Commit 226afc6

Add H2O.ai Database-like Ops benchmark to dfbench (groupby support) (#13996)

* Add H2O.ai Database-like Ops benchmark to dfbench

* Fix query and fmt

* Change venv

* Make sure the venv Python version supports falsa

* Fix default path

* Support groupby only for now

* fix

* Address comments

* fix

* Support higher Python versions

* Support higher Python versions such as Python 3.13

* Addressed new comments

* Add specific query example
zhuqi-lucas authored Jan 12, 2025
1 parent 17446ad commit 226afc6
Showing 8 changed files with 379 additions and 154 deletions.
55 changes: 37 additions & 18 deletions benchmarks/README.md
@@ -32,7 +32,7 @@ DataFusion is included in the benchmark setups for several popular
benchmarks that compare performance with other engines. For example:

* [ClickBench] scripts are in the [ClickBench repo](https://github.com/ClickHouse/ClickBench/tree/main/datafusion)
* [H2o.ai `db-benchmark`] scripts are in [db-benchmark](https://github.com/apache/datafusion/tree/main/benchmarks/src/h2o.rs)

[ClickBench]: https://github.com/ClickHouse/ClickBench/tree/main
[H2o.ai `db-benchmark`]: https://github.com/h2oai/db-benchmark
@@ -405,31 +405,50 @@ cargo run --release --bin external_aggr -- benchmark -n 4 --iterations 3 -p '...'
```


## h2o benchmarks for groupby
### Generate data for h2o benchmarks
There are three options for generating data for h2o benchmarks: `small`, `medium`, and `big`. The data is generated in the `data` directory.

1. Generate small data (1e7 rows)
```bash
./bench.sh data h2o_small
```

2. Generate medium data (1e8 rows)
```bash
./bench.sh data h2o_medium
```


3. Generate large data (1e9 rows)
```bash
./bench.sh data h2o_big
```
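The size keywords above map directly to row counts. A minimal sketch of that mapping (the helper name `h2o_rows` is illustrative, not part of bench.sh):

```shell
# Illustrative mapping of bench.sh size keywords to falsa row counts,
# taken from the dataset descriptions above.
h2o_rows() {
    case "$1" in
        h2o_small)  echo "1e7" ;;
        h2o_medium) echo "1e8" ;;
        h2o_big)    echo "1e9" ;;
        *)          return 1 ;;
    esac
}

h2o_rows h2o_small   # prints 1e7
```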

### Run h2o benchmarks
There are three options for running h2o benchmarks: `small`, `medium`, and `big`.
1. Run small data benchmark
```bash
./bench.sh run h2o_small
```
2. Run medium data benchmark
```bash
./bench.sh run h2o_medium
```

3. Run large data benchmark
```bash
./bench.sh run h2o_big
```

4. Run a specific query with a specific data path

For example, to run query 1 with the small data generated above:
```bash
cargo run --release --bin dfbench -- h2o --path ./benchmarks/data/h2o/G1_1e7_1e7_100_0.csv --query 1
```
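The file name in the example above follows the naming convention used by the falsa generator, which encodes the row count in the file name. A sketch of that convention, assuming it holds for all three sizes (the helper name `h2o_file_name` is illustrative, not part of bench.sh):

```shell
# Derives the generated data file name from a size keyword and format,
# mirroring the G1_<rows>_<rows>_100_0.<format> pattern seen above.
h2o_file_name() {
    local size="$1" fmt="$2"
    case "$size" in
        SMALL)  echo "G1_1e7_1e7_100_0.${fmt}" ;;
        MEDIUM) echo "G1_1e8_1e8_100_0.${fmt}" ;;
        BIG)    echo "G1_1e9_1e9_100_0.${fmt}" ;;
        *)      return 1 ;;
    esac
}

h2o_file_name SMALL csv    # prints G1_1e7_1e7_100_0.csv
```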

[1]: http://www.tpc.org/tpch/
146 changes: 146 additions & 0 deletions benchmarks/bench.sh
@@ -80,6 +80,9 @@ clickbench_1: ClickBench queries against a single parquet file
clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
external_aggr: External aggregation benchmark
h2o_small: h2oai benchmark with small dataset (1e7 rows), default file format is csv
h2o_medium: h2oai benchmark with medium dataset (1e8 rows), default file format is csv
h2o_big: h2oai benchmark with large dataset (1e9 rows), default file format is csv
**********
* Supported Configuration (Environment Variables)
@@ -142,6 +145,9 @@ main() {
all)
data_tpch "1"
data_tpch "10"
data_h2o "SMALL"
data_h2o "MEDIUM"
data_h2o "BIG"
data_clickbench_1
data_clickbench_partitioned
data_imdb
@@ -172,6 +178,15 @@ main() {
imdb)
data_imdb
;;
h2o_small)
data_h2o "SMALL" "CSV"
;;
h2o_medium)
data_h2o "MEDIUM" "CSV"
;;
h2o_big)
data_h2o "BIG" "CSV"
;;
external_aggr)
# same data as for tpch
data_tpch "1"
@@ -221,6 +236,9 @@ main() {
run_clickbench_1
run_clickbench_partitioned
run_clickbench_extended
run_h2o "SMALL" "PARQUET" "groupby"
run_h2o "MEDIUM" "PARQUET" "groupby"
run_h2o "BIG" "PARQUET" "groupby"
run_imdb
run_external_aggr
;;
@@ -254,6 +272,15 @@ main() {
imdb)
run_imdb
;;
h2o_small)
run_h2o "SMALL" "CSV" "groupby"
;;
h2o_medium)
run_h2o "MEDIUM" "CSV" "groupby"
;;
h2o_big)
run_h2o "BIG" "CSV" "groupby"
;;
external_aggr)
run_external_aggr
;;
@@ -541,6 +568,125 @@ run_imdb() {
$CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}"
}

data_h2o() {
    # Default values for size and data format
    SIZE=${1:-"SMALL"}
    DATA_FORMAT=${2:-"CSV"}

    # Returns success if version $1 >= version $2
    version_ge() {
        [ "$(printf '%s\n' "$1" "$2" | sort -V | head -n1)" = "$2" ]
    }

    export PYO3_USE_ABI3_FORWARD_COMPATIBILITY=1

    # Find the highest available Python version (3.10 or higher)
    REQUIRED_VERSION="3.10"
    PYTHON_CMD=$(command -v python3 || true)

    if [ -n "$PYTHON_CMD" ]; then
        PYTHON_VERSION=$($PYTHON_CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
        if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then
            echo "Found Python version $PYTHON_VERSION, which is suitable."
        else
            echo "Python version $PYTHON_VERSION found, but version $REQUIRED_VERSION or higher is required."
            PYTHON_CMD=""
        fi
    fi

    # Search for a suitable Python version if the default is unsuitable
    if [ -z "$PYTHON_CMD" ]; then
        # Loop through all available python3 commands on the system
        for CMD in $(compgen -c | grep -E '^python3(\.[0-9]+)?$'); do
            if command -v "$CMD" &> /dev/null; then
                PYTHON_VERSION=$($CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
                if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then
                    PYTHON_CMD="$CMD"
                    echo "Found suitable Python version: $PYTHON_VERSION ($CMD)"
                    break
                fi
            fi
        done
    fi

    # If no suitable Python version was found, exit with an error
    if [ -z "$PYTHON_CMD" ]; then
        echo "Python 3.10 or higher is required. Please install it."
        return 1
    fi

    echo "Using Python command: $PYTHON_CMD"

    # Install falsa and other dependencies
    echo "Installing falsa..."

    # Set virtual environment directory
    VIRTUAL_ENV="${PWD}/venv"

    # Create a virtual environment using the detected Python command
    $PYTHON_CMD -m venv "$VIRTUAL_ENV"

    # Activate the virtual environment and install dependencies
    source "$VIRTUAL_ENV/bin/activate"

    # Install or upgrade 'falsa' to the latest release
    pip install --quiet --upgrade falsa

    # Create the data directory if it doesn't exist
    H2O_DIR="${DATA_DIR}/h2o"
    mkdir -p "${H2O_DIR}"

    # Generate h2o test data
    echo "Generating h2o test data in ${H2O_DIR} with size=${SIZE} and format=${DATA_FORMAT}"
    falsa groupby --path-prefix="${H2O_DIR}" --size "${SIZE}" --data-format "${DATA_FORMAT}"

    # Deactivate the virtual environment after completion
    deactivate
}

# TODO: only groupby is supported for now; once
# https://github.com/mrpowers-io/falsa/issues/21 is resolved, join support can be added
run_h2o() {
    # Default values for size and data format
    SIZE=${1:-"SMALL"}
    DATA_FORMAT=${2:-"CSV"}
    DATA_FORMAT=$(echo "$DATA_FORMAT" | tr '[:upper:]' '[:lower:]')
    RUN_TYPE=${3:-"groupby"}

    # Data directory and results file path
    H2O_DIR="${DATA_DIR}/h2o"
    RESULTS_FILE="${RESULTS_DIR}/h2o.json"

    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running h2o benchmark..."

    # Set the file name based on the size
    case "$SIZE" in
        "SMALL")
            FILE_NAME="G1_1e7_1e7_100_0.${DATA_FORMAT}" # small dataset
            ;;
        "MEDIUM")
            FILE_NAME="G1_1e8_1e8_100_0.${DATA_FORMAT}" # medium dataset
            ;;
        "BIG")
            FILE_NAME="G1_1e9_1e9_100_0.${DATA_FORMAT}" # big dataset
            ;;
        *)
            echo "Invalid size. Valid options are SMALL, MEDIUM, or BIG."
            return 1
            ;;
    esac

    # Set the query file name based on the run type
    QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_TYPE}.sql"

    # Run the benchmark using the dynamically constructed file path and query file
    $CARGO_COMMAND --bin dfbench -- h2o \
        --iterations 3 \
        --path "${H2O_DIR}/${FILE_NAME}" \
        --queries-path "${QUERY_FILE}" \
        -o "${RESULTS_FILE}"
}

# Runs the external aggregation benchmark
run_external_aggr() {
# Use TPC-H SF1 dataset
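The Python detection in `data_h2o` hinges on `version_ge`, which compares versions by letting `sort -V` pick the smaller one. A standalone sketch of the same check:

```shell
# version_ge returns success when version $1 >= version $2 (same logic as data_h2o):
# sort -V orders the two versions numerically, and head -n1 yields the smaller;
# if the smaller one is $2, then $1 must be >= $2.
version_ge() {
    [ "$(printf '%s\n' "$1" "$2" | sort -V | head -n1)" = "$2" ]
}

version_ge "3.13" "3.10" && echo "3.13 is new enough"   # prints: 3.13 is new enough
version_ge "3.9" "3.10" || echo "3.9 is too old"        # prints: 3.9 is too old
```

Note that a plain string comparison would get this wrong (`"3.9" > "3.10"` lexically), which is why the version-aware `sort -V` is used.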
10 changes: 10 additions & 0 deletions benchmarks/queries/h2o/groupby.sql
@@ -0,0 +1,10 @@
SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1;
SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2;
SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3;
SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4;
SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6;
SELECT id4, id5, MEDIAN(v3) AS median_v3, STDDEV(v3) AS sd_v3 FROM x GROUP BY id4, id5;
SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3;
SELECT id6, largest2_v3 FROM (SELECT id6, v3 AS largest2_v3, ROW_NUMBER() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS order_v3 FROM x WHERE v3 IS NOT NULL) sub_query WHERE order_v3 <= 2;
SELECT id2, id4, POWER(CORR(v1, v2), 2) AS r2 FROM x GROUP BY id2, id4;
SELECT id1, id2, id3, id4, id5, id6, SUM(v3) AS v3, COUNT(*) AS count FROM x GROUP BY id1, id2, id3, id4, id5, id6;
5 changes: 5 additions & 0 deletions benchmarks/queries/h2o/join.sql
@@ -0,0 +1,5 @@
SELECT x.id1, x.id2, x.id3, x.id4 as xid4, small.id4 as smallid4, x.id5, x.id6, x.v1, small.v2 FROM x INNER JOIN small ON x.id1 = small.id1;
SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x INNER JOIN medium ON x.id2 = medium.id2;
SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id2 = medium.id2;
SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x JOIN medium ON x.id5 = medium.id5;
SELECT x.id1 as xid1, large.id1 as largeid1, x.id2 as xid2, large.id2 as largeid2, x.id3, x.id4 as xid4, large.id4 as largeid4, x.id5 as xid5, large.id5 as largeid5, x.id6 as xid6, large.id6 as largeid6, x.v1, large.v2 FROM x JOIN large ON x.id3 = large.id3;
6 changes: 5 additions & 1 deletion benchmarks/src/bin/dfbench.rs
@@ -33,7 +33,9 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

use datafusion_benchmarks::{
    clickbench, h2o, imdb, parquet_filter, sort, sort_tpch, tpch,
};

#[derive(Debug, StructOpt)]
#[structopt(about = "benchmark command")]
@@ -45,6 +47,7 @@ enum Options {
Sort(sort::RunOpt),
SortTpch(sort_tpch::RunOpt),
Imdb(imdb::RunOpt),
H2o(h2o::RunOpt),
}

// Main benchmark runner entrypoint
@@ -60,5 +63,6 @@ pub async fn main() -> Result<()> {
Options::Sort(opt) => opt.run().await,
Options::SortTpch(opt) => opt.run().await,
Options::Imdb(opt) => opt.run().await,
Options::H2o(opt) => opt.run().await,
}
}