From ba81d1399685cf6d43036606d6d49ee8c868e715 Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Sat, 4 Jan 2025 10:23:35 +0100
Subject: [PATCH] Switch to pyarrow for Parquet file generation for the types
 integration table

---
 Cargo.lock                                    |  2 +
 crates/integration_tests/Cargo.toml           |  2 +-
 .../testdata/pyiceberg/Dockerfile             |  2 +-
 .../testdata/pyiceberg/provision.py           | 79 ++++++++++---------
 crates/integration_tests/tests/datafusion.rs  | 22 +++---
 5 files changed, 58 insertions(+), 49 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d0b27f717..d3a9d3255 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3080,9 +3080,11 @@ version = "0.4.0"
 dependencies = [
  "arrow-array",
  "arrow-schema",
+ "datafusion",
  "futures",
  "iceberg",
  "iceberg-catalog-rest",
+ "iceberg-datafusion",
  "iceberg_test_utils",
  "parquet",
  "tokio",
diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml
index e0411b316..172a8d3a5 100644
--- a/crates/integration_tests/Cargo.toml
+++ b/crates/integration_tests/Cargo.toml
@@ -27,7 +27,7 @@ rust-version = { workspace = true }
 [dependencies]
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
-datafusion = "43"
+datafusion = "44"
 futures = { workspace = true }
 iceberg = { workspace = true }
 iceberg-catalog-rest = { workspace = true }
diff --git a/crates/integration_tests/testdata/pyiceberg/Dockerfile b/crates/integration_tests/testdata/pyiceberg/Dockerfile
index b4f8be2f6..b37f6fc2d 100644
--- a/crates/integration_tests/testdata/pyiceberg/Dockerfile
+++ b/crates/integration_tests/testdata/pyiceberg/Dockerfile
@@ -15,7 +15,7 @@
 
 FROM python:3.9-bullseye
 
-RUN pip install pyiceberg[pyarrow]==0.8 datafusion==43.1.0
+RUN pip install pyiceberg[pyarrow]==0.8
 
 COPY provision.py .
 
diff --git a/crates/integration_tests/testdata/pyiceberg/provision.py b/crates/integration_tests/testdata/pyiceberg/provision.py
index d41dfc751..92c739290 100644
--- a/crates/integration_tests/testdata/pyiceberg/provision.py
+++ b/crates/integration_tests/testdata/pyiceberg/provision.py
@@ -16,46 +16,53 @@
 # under the License.
 
 import os
-from datafusion import SessionContext
 from pyiceberg.catalog import load_catalog
 import pyarrow.parquet as pq
+import pyarrow as pa
+from datetime import datetime, timedelta
 
 # Generate a table with various types in memory and dump to a Parquet file
-ctx = SessionContext()
-ctx.sql("""
-CREATE TABLE types_test (
-    cboolean BOOLEAN,
-    cint8 TINYINT,
-    cint16 SMALLINT,
-    cint32 INT,
-    cint64 BIGINT,
-    cfloat32 REAL,
-    cfloat64 DOUBLE PRECISION,
-    cdecimal DECIMAL(8, 2),
-    cdate32 DATE,
-    ctimestamp TIMESTAMP,
-    ctimestamptz TIMESTAMPTZ,
-    cutf8 TEXT,
-    cbinary BYTEA
-) AS SELECT
-    s % 2 = 1 as cboolean,
-    (s % 256 - 128) as cint8,
-    s as cint16,
-    s as cint32,
-    s as cint64,
-    s as cfloat32,
-    s as cfloat64,
-    s::NUMERIC / 100 as cnumeric,
-    s as cdate,
-    s * 1000 as ctimestamp,
-    s * 1000 as ctimestampz,
-    s::TEXT as cutf8,
-    s::TEXT cbinary
-FROM unnest(generate_series(0, 1000)) AS q(s);
-""")
-a = ctx.sql("COPY types_test TO 'types_test.parquet'")
-# File loading fails in the container without this line???
-print(f"Created a Parquet file with {a} rows")
+rows = 1001
+columns = [
+    pa.array([(i % 2 == 1) for i in range(rows)]),
+    pa.array([(i % 256 - 128) for i in range(rows)]),
+    pa.array([i for i in range(rows)]),
+    pa.array([i for i in range(rows)]),
+    pa.array([i for i in range(rows)]),
+    pa.array([float(i) for i in range(rows)]),
+    pa.array([float(i) for i in range(rows)]),
+    pa.array([round(i / 100, 2) for i in range(rows)]),
+    pa.array([(datetime(1970, 1, 1) + timedelta(days=i)).date() for i in range(rows)]),
+    pa.array([(datetime(1970, 1, 1) + timedelta(seconds=i)) for i in range(rows)]),
+    pa.array([(datetime(1970, 1, 1) + timedelta(seconds=i)) for i in range(rows)]),
+    pa.array([str(i) for i in range(rows)]),
+    pa.array([str(i).encode("utf-8") for i in range(rows)]),
+]
+schema = pa.schema([
+    ('cboolean', pa.bool_()),
+    ('cint8', pa.int8()),
+    ('cint16', pa.int16()),
+    ('cint32', pa.int32()),
+    ('cint64', pa.int64()),
+    ('cfloat32', pa.float32()),
+    ('cfloat64', pa.float64()),
+    ('cdecimal128', pa.decimal128(8, 2)),
+    ('cdate32', pa.date32()),
+    ('ctimestamp', pa.timestamp('us')),
+    ('ctimestamptz', pa.timestamp('us', tz='UTC')),
+    ('cutf8', pa.utf8()),
+    ('cbinary', pa.binary()),
+])
+
+# Convert to a PyArrow table
+table = pa.Table.from_arrays(columns, schema=schema)
+
+# Write to a Parquet file
+pq.write_table(table, "types_test.parquet")
+
+# Output the result
+print(f"Created a Parquet file with {rows} rows and schema {table.schema}.")
+
 
 # Load the Parquet file
 parquet_file = pq.read_table("./types_test.parquet")
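A note on the pyarrow construction above: the arrays are built from plain Python lists, so pyarrow first infers int64 and double for the numeric columns, and it is the explicit schema passed to pa.Table.from_arrays that casts them down to int8/int16/int32, float32, and decimal128(8, 2). A minimal sketch of that behavior, assuming only pyarrow is installed (the names here are illustrative, not part of the patch):

    import pyarrow as pa

    ints = pa.array([0, 1, 2])          # inferred as int64
    floats = pa.array([0.0, 1.0, 2.0])  # inferred as double
    target = pa.schema([("cint16", pa.int16()), ("cfloat32", pa.float32())])

    # from_arrays casts each array to the matching schema field type
    table = pa.Table.from_arrays([ints, floats], schema=target)
    assert table.schema == target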
diff --git a/crates/integration_tests/tests/datafusion.rs b/crates/integration_tests/tests/datafusion.rs
index 533fd7da7..f3757a0fe 100644
--- a/crates/integration_tests/tests/datafusion.rs
+++ b/crates/integration_tests/tests/datafusion.rs
@@ -81,9 +81,9 @@ async fn test_basic_queries() -> Result<(), DataFusionError> {
             PARQUET_FIELD_ID_META_KEY.to_string(),
             "7".to_string(),
         )])),
-        Field::new("cdecimal", DataType::Decimal128(8, 2), true).with_metadata(HashMap::from(
-            [(PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string(),)]
-        )),
+        Field::new("cdecimal128", DataType::Decimal128(8, 2), true).with_metadata(
+            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string(),)])
+        ),
         Field::new("cdate32", DataType::Date32, true).with_metadata(HashMap::from([(
             PARQUET_FIELD_ID_META_KEY.to_string(),
             "9".to_string(),
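The cdecimal -> cdecimal128 rename has to stay in sync in three places: the pa.schema in provision.py, this expected Arrow schema, and the SQL in the hunk below. (The PARQUET:field_id metadata asserted on each Field is attached on the Iceberg write path by pyiceberg, so it should not appear in the raw types_test.parquet produced above.) When the expected schema drifts from the generator, a quick check on the Python side is to read the schema straight off the generated file; a sketch, assuming it runs in the directory the provisioner wrote to:

    import pyarrow.parquet as pq

    # Prints field names and types, e.g. "cdecimal128: decimal128(8, 2)"
    print(pq.read_schema("types_test.parquet"))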
"+----------+-------+--------+--------+--------+----------+----------+----------+------------+----------------------------+-----------------------------+-------+---------+", + "+----------+-------+--------+--------+--------+----------+----------+-------------+------------+---------------------+----------------------+-------+---------+", + "| cboolean | cint8 | cint16 | cint32 | cint64 | cfloat32 | cfloat64 | cdecimal128 | cdate32 | ctimestamp | ctimestamptz | cutf8 | cbinary |", + "+----------+-------+--------+--------+--------+----------+----------+-------------+------------+---------------------+----------------------+-------+---------+", + "| false | -128 | 0 | 0 | 0 | 0.0 | 0.0 | 0.00 | 1970-01-01 | 1970-01-01T00:00:00 | 1970-01-01T00:00:00Z | 0 | 30 |", + "| true | -127 | 1 | 1 | 1 | 1.0 | 1.0 | 0.01 | 1970-01-02 | 1970-01-01T00:00:01 | 1970-01-01T00:00:01Z | 1 | 31 |", + "| false | -126 | 2 | 2 | 2 | 2.0 | 2.0 | 0.02 | 1970-01-03 | 1970-01-01T00:00:02 | 1970-01-01T00:00:02Z | 2 | 32 |", + "+----------+-------+--------+--------+--------+----------+----------+-------------+------------+---------------------+----------------------+-------+---------+", ]; assert_batches_eq!(expected, &batches); // TODO: this isn't OK, and should be fixed with https://github.com/apache/iceberg-rust/issues/813 let err = ctx - .sql("SELECT cdecimal FROM types_table WHERE cint16 <= 2") + .sql("SELECT cdecimal128 FROM types_table WHERE cint16 <= 2") .await? .collect() .await