Enterprise-Scale Data Engineering with Python: Evaluation of Bodo with the TPCxBB Q26 Benchmark

Zhuchang Zhan

Enterprises are always looking for simpler, faster, more scalable, and more cost-effective data processing solutions as the size and complexity of their data grow. The productivity of data scientists and data engineers is also a key metric for organizations looking to advance their data-centric applications. Platforms built for basic analytics queries on 100GB to 1TB datasets may not work for complex machine learning and AI applications on 10TB+ datasets. Evaluating compute platforms is therefore challenging, since several factors must be weighed together.

One of our customers, a large Fortune 10 enterprise, recently evaluated Bodo for data engineering workloads in their new data infrastructure. We share the results here, which can be informative for anyone interested in large-scale data engineering. In summary, as measured by the customer, Bodo is about 10x faster than highly optimized Spark on the same cluster setup, and much simpler to use.

Bodo Code


import bodo
import pandas as pd
import numpy as np

@bodo.jit
def tpcx_bb_q26():
    store_sales = pd.read_parquet('s3://...')
    item = pd.read_parquet('s3://...')
    item2 = item[item['i_category'] == 'Books']
    sale_items = pd.merge(
        store_sales, item2, left_on='ss_item_sk', right_on='i_item_sk'
    )
    count1 = sale_items.groupby('ss_customer_sk')['ss_item_sk'].count()
    gp1 = sale_items.groupby('ss_customer_sk')['i_class_id']
    def id1(x): return (x == 1).sum()
    def id2(x): return (x == 2).sum()
    ...
    def id15(x): return (x == 15).sum()
    customer_i_class = gp1.agg((id1, id2, ... id15))
    customer_i_class['ss_item_count'] = count1
    customer_i_class = customer_i_class[customer_i_class.ss_item_count > 5]
    customer_i_class = customer_i_class.drop(['ss_item_count'], axis=1)
    customer_i_class = customer_i_class.sort_values('ss_customer_sk')
    return customer_i_class

Spark Code


import numpy as np
import pyspark
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel

def tpcx_bb_q26():
    spark = SparkSession.builder.appName('ipython').getOrCreate()
    store_sales = spark.read.parquet('s3://...')
    store_sales.createOrReplaceTempView('store_sales')
    item = spark.read.parquet('s3://...')
    item.createOrReplaceTempView('item')
    result = spark.sql(
        """
        SELECT
            ss.ss_customer_sk AS cid,
            CAST( count(CASE WHEN i.i_class_id=1 THEN 1 ELSE NULL END) AS DOUBLE ) AS id1,
            CAST( count(CASE WHEN i.i_class_id=2 THEN 1 ELSE NULL END) AS DOUBLE ) AS id2,
            ...
            CAST( count(CASE WHEN i.i_class_id=15 THEN 1 ELSE NULL END) AS DOUBLE ) AS id15
        FROM store_sales ss
        INNER JOIN item i
        ON
        (
            ss.ss_item_sk = i.i_item_sk
            AND i.i_category IN ('Books')
            AND ss.ss_customer_sk IS NOT NULL
        )
        GROUP BY ss.ss_customer_sk
        HAVING count(ss.ss_item_sk) > 5
        ORDER BY cid
        """
    )
    customer_i_class = result.toPandas().values.astype(np.float64).sum()
    return customer_i_class

TPCxBB Q26 Benchmark

The customer chose the industry-standard TPCxBB benchmark suite since it can represent data engineering workloads for various applications, including machine learning pipelines. It is designed to represent advanced analytics use-cases that utilize structured, semi-structured and unstructured data. In our experience, even TPCxBB is too simple to represent complex data pipelines in ML applications, and therefore cannot fully exercise Bodo’s optimization capabilities. But it can serve as a good starting point for evaluation.

The TPCxBB benchmarks represent workloads in a retail setting with various data sources (such as online and in-store transactions, logs, and reviews). Still, the benchmark patterns apply to all verticals. In addition, the data generator is scalable and can generate data at any scale factor, which is very useful for scalability tests.

This evaluation focuses on Query 26 of the TPCxBB benchmark suite, which clusters retail customers into groups based on their purchase history. This particular benchmark is interesting since it represents the data preparation and feature engineering stages of an ML pipeline well. Further, focusing on a single benchmark allows examining many factors (memory consumption, energy cost, etc.) in more depth. Both the Spark and Bodo versions of the code are shown above. The Bodo version uses native Python/Pandas APIs and is a straightforward translation of the original SQL code. The Spark version uses SQL for speed; for simplicity, it could also be written with the equivalent PySpark DataFrame API, which is a wrapper around the same Spark SQL engine.
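To make the shape of the Q26 feature table concrete, here is a minimal sketch in plain pandas on a tiny hand-made dataset. It is not the benchmark code and has not been run under `bodo.jit`; the toy rows and the lowered threshold `MIN_ITEMS` are assumptions for illustration only (the real query keeps customers with more than 5 purchases). It uses `pd.crosstab` to build one count column per class ID instead of one helper function per ID.

```python
import pandas as pd

# Toy stand-ins for the benchmark tables (hypothetical rows, not TPCxBB data).
store_sales = pd.DataFrame({
    "ss_customer_sk": [1, 1, 1, 1, 2, 2, 3],
    "ss_item_sk":     [10, 11, 11, 12, 10, 13, 11],
})
item = pd.DataFrame({
    "i_item_sk":  [10, 11, 12, 13],
    "i_category": ["Books", "Books", "Music", "Books"],
    "i_class_id": [1, 2, 1, 2],
})

# Assumption: the benchmark uses a threshold of 5; lowered here so that
# the toy data still produces a non-empty result.
MIN_ITEMS = 2

# Keep only items in the category of interest, then join sales to items.
books = item[item["i_category"] == "Books"]
sale_items = store_sales.merge(books, left_on="ss_item_sk", right_on="i_item_sk")

# Number of matching purchases per customer.
counts = sale_items.groupby("ss_customer_sk")["ss_item_sk"].count()

# One row per customer, one column per class ID, cells are purchase counts.
features = pd.crosstab(sale_items["ss_customer_sk"], sale_items["i_class_id"])
features.columns = [f"id{c}" for c in features.columns]

# Keep customers above the purchase threshold, ordered by customer key.
features = features.loc[counts > MIN_ITEMS].sort_index()
print(features)
```

Each surviving row is a per-customer feature vector of purchase counts by item class, which is the kind of input a downstream clustering step would consume.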

Cluster Setup

The customer environment is a custom AWS infrastructure setup that meets their specific security and governance requirements. Setting up Bodo in this environment proved to be straightforward since Bodo uses a “clean” engine design that has minimal infrastructure requirements. As the baseline for comparison, the customer chose the Elastic MapReduce (EMR) service of AWS, which provides an optimized version of Apache Spark. Furthermore, their data engineering experts examined Spark’s dozens of configuration options and tuned them manually where necessary. Bodo, on the other hand, requires no tuning and can be run efficiently out of the box by anyone experienced with Python.
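To give a sense of what manual Spark tuning involves, the sketch below collects a few settings of the kind that are commonly adjusted by hand and renders them as `spark-submit --conf` arguments. The property names are standard Spark properties, but the values are hypothetical placeholders and the helper `to_submit_flags` is made up for this illustration; none of it reflects the customer's actual configuration.

```python
# Settings that are commonly hand-tuned for large Spark jobs.
# Keys are standard Spark properties; values are hypothetical placeholders.
spark_conf = {
    "spark.executor.cores": "5",
    "spark.executor.memory": "24g",
    "spark.sql.shuffle.partitions": "9000",
    "spark.default.parallelism": "9000",
}


def to_submit_flags(conf):
    """Render a settings dict as a list of spark-submit --conf arguments."""
    flags = []
    for key, value in sorted(conf.items()):
        flags += ["--conf", f"{key}={value}"]
    return flags


flags = to_submit_flags(spark_conf)
print(" ".join(flags))
```

Appropriate values usually depend on the instance type, the data volume, and the shape of the query, which is why this step tends to need expert attention.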

Both the Bodo and the Spark executions use the same cluster resources. The cluster has 125 instances of c5n.18xlarge type, with a total of 4,500 physical CPU cores. All nodes are in a single AWS Placement Group so that they are physically close to each other in the data center, which reduces networking overhead. The input dataset is scale factor 40,000 of TPCxBB, which has 52 billion rows and is 2.5TB in Parquet format on disk. In this setup, Bodo takes advantage of the Elastic Fabric Adapter (EFA) feature of AWS, which provides HPC networking (a high level of inter-node communication without TCP protocol overheads) at no extra cost.
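The cluster figures above can be sanity-checked with a few lines of arithmetic. This sketch assumes a c5n.18xlarge exposes 72 vCPUs (two hardware threads per physical core), which comes from AWS's published instance specifications rather than from this article, and it treats 2.5TB as decimal terabytes.

```python
# Figures quoted in the text.
INSTANCES = 125
ROWS = 52e9              # 52 billion rows
DATASET_BYTES = 2.5e12   # 2.5 TB, assuming decimal terabytes

# Assumption (AWS instance specs, not stated in the article):
# 72 vCPUs per c5n.18xlarge, two hardware threads per physical core.
VCPUS_PER_INSTANCE = 72
physical_cores_per_instance = VCPUS_PER_INSTANCE // 2

total_cores = INSTANCES * physical_cores_per_instance
rows_per_core = ROWS / total_cores
bytes_per_row = DATASET_BYTES / ROWS
gb_per_instance = DATASET_BYTES / INSTANCES / 1e9

print(f"total physical cores: {total_cores}")
print(f"rows per core:        {rows_per_core:,.0f}")
print(f"bytes per row:        {bytes_per_row:.1f}")
print(f"GB per instance:      {gb_per_instance:.1f}")
```

Under these assumptions the total matches the 4,500 physical cores quoted above, and each core is responsible for roughly 11.6 million rows of compressed Parquet input.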

[Figure: cluster setup]

Benchmark Results

As the graph below illustrates, Bodo is about 10x faster than EMR Spark for this use case. Since exactly the same compute resources are used, this translates to about 90% savings in AWS infrastructure cost. Furthermore, the team found Bodo much easier for data scientists and data engineers to use, since it provides standard Python APIs, and easier for the IT team to set up, since it needs minimal configuration.

[Figure: benchmark results chart comparing Bodo and EMR Spark]

The benchmarking team also gathered many other data points. Notably, Bodo scales linearly with the data size and the number of nodes, while Spark struggles with datasets larger than 1TB and requires additional tuning to run efficiently on more than 1,000 cores. Using EFA matters for performance, but Bodo is still much faster than Spark with EFA turned off (Bodo without EFA is 5x faster than Spark). Furthermore, as measured by the customer, Bodo typically uses much less memory than Spark. We encourage readers to replicate this benchmark, test Bodo on their own applications, and let us know their findings!
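The link between a speedup and an infrastructure saving is simple arithmetic when the cluster is held fixed. The sketch below assumes cost is proportional to wall-clock runtime on identical resources, which ignores startup time, storage, and pricing details; the function name `cost_savings` is made up for this illustration.

```python
def cost_savings(speedup: float) -> float:
    """Fraction of cost saved when the same cluster finishes `speedup` times faster.

    Assumes cost is proportional to runtime on identical resources.
    """
    if speedup <= 0:
        raise ValueError("speedup must be positive")
    return 1.0 - 1.0 / speedup


# Speedups quoted in the text: about 10x with EFA, 5x with EFA turned off.
for label, speedup in [("with EFA", 10.0), ("without EFA", 5.0)]:
    print(f"{label}: {speedup:.0f}x faster -> {cost_savings(speedup):.0%} saved")
```

Under that assumption, a 10x speedup corresponds to the roughly 90% saving mentioned earlier, and the 5x speedup measured without EFA would correspond to about 80%.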

The Future of Data Platforms

Data processing platforms are becoming a critical part of modern data-driven enterprises, and their efficiency translates directly into business value and cost savings. Apache Spark enabled a generation of data platforms and use cases by improving the simplicity and performance of data processing by 10x or more over Hadoop. With Bodo’s new technology, we aim to advance data analytics by providing 10x or more improvement in simplicity and performance over Spark. We hope that you share our excitement about the future to come with Bodo. We welcome you to check out the free Bodo Community Edition to see how it works, and to let us know if you would like free trial licenses for large-scale benchmarking. Join our Slack or Discourse to talk to our team directly.