Switch to pyarrow for Parquet file generation for the types integration table
gruuya committed Jan 4, 2025
1 parent 451c5cc commit ba81d13
Showing 5 changed files with 58 additions and 49 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion crates/integration_tests/Cargo.toml
@@ -27,7 +27,7 @@ rust-version = { workspace = true }
[dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
datafusion = "43"
datafusion = "44"
futures = { workspace = true }
iceberg = { workspace = true }
iceberg-catalog-rest = { workspace = true }
2 changes: 1 addition & 1 deletion crates/integration_tests/testdata/pyiceberg/Dockerfile
@@ -15,7 +15,7 @@

FROM python:3.9-bullseye

RUN pip install pyiceberg[pyarrow]==0.8 datafusion==43.1.0
RUN pip install pyiceberg[pyarrow]==0.8

COPY provision.py .

79 changes: 43 additions & 36 deletions crates/integration_tests/testdata/pyiceberg/provision.py
@@ -16,46 +16,53 @@
# under the License.

import os
from datafusion import SessionContext
from pyiceberg.catalog import load_catalog
import pyarrow.parquet as pq
import pyarrow as pa
from datetime import datetime, timedelta

# Generate a table with various types in memory and dump to a Parquet file
ctx = SessionContext()
ctx.sql("""
CREATE TABLE types_test (
cboolean BOOLEAN,
cint8 TINYINT,
cint16 SMALLINT,
cint32 INT,
cint64 BIGINT,
cfloat32 REAL,
cfloat64 DOUBLE PRECISION,
cdecimal DECIMAL(8, 2),
cdate32 DATE,
ctimestamp TIMESTAMP,
ctimestamptz TIMESTAMPTZ,
cutf8 TEXT,
cbinary BYTEA
) AS SELECT
s % 2 = 1 as cboolean,
(s % 256 - 128) as cint8,
s as cint16,
s as cint32,
s as cint64,
s as cfloat32,
s as cfloat64,
s::NUMERIC / 100 as cnumeric,
s as cdate,
s * 1000 as ctimestamp,
s * 1000 as ctimestampz,
s::TEXT as cutf8,
s::TEXT cbinary
FROM unnest(generate_series(0, 1000)) AS q(s);
""")
a = ctx.sql("COPY types_test TO 'types_test.parquet'")
# File loading fails in the container without this line???
print(f"Created a Parquet file with {a} rows")
rows = 1001
columns = [
pa.array([(i % 2 == 1) for i in range(rows)]),
pa.array([(i % 256 - 128) for i in range(rows)]),
pa.array([i for i in range(rows)]),
pa.array([i for i in range(rows)]),
pa.array([i for i in range(rows)]),
pa.array([float(i) for i in range(rows)]),
pa.array([float(i) for i in range(rows)]),
pa.array([round(i / 100, 2) for i in range(rows)]),
pa.array([(datetime(1970, 1, 1) + timedelta(days=i)).date() for i in range(rows)]),
pa.array([(datetime(1970, 1, 1) + timedelta(seconds=i)) for i in range(rows)]),
pa.array([(datetime(1970, 1, 1) + timedelta(seconds=i)) for i in range(rows)]),
pa.array([str(i) for i in range(rows)]),
pa.array([str(i).encode("utf-8") for i in range(rows)]),
]
schema = pa.schema([
('cboolean', pa.bool_()),
('cint8', pa.int8()),
('cint16', pa.int16()),
('cint32', pa.int32()),
('cint64', pa.int64()),
('cfloat32', pa.float32()),
('cfloat64', pa.float64()),
('cdecimal128', pa.decimal128(8, 2)),
('cdate32', pa.date32()),
('ctimestamp', pa.timestamp('us')),
('ctimestamptz', pa.timestamp('us', tz='UTC')),
('cutf8', pa.utf8()),
('cbinary', pa.binary()),
])

# Convert to a PyArrow table
table = pa.Table.from_arrays(columns, schema=schema)

# Write to a Parquet file
pq.write_table(table, "types_test.parquet")

# Output the result
print(f"Created a Parquet file with {rows} rows and schema {table.schema}.")


# Load the Parquet file
parquet_file = pq.read_table("./types_test.parquet")
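
The remainder of provision.py is collapsed in this view; judging by the `load_catalog` import above, it presumably goes on to register the generated data with a catalog. As a rough, hypothetical sketch (not the collapsed code) of how a pyarrow table like this can be turned into an Iceberg table with pyiceberg 0.8 — the catalog name, endpoints, and credentials here are invented for illustration:

from pyiceberg.catalog import load_catalog
import pyarrow.parquet as pq

# Hypothetical sketch only: catalog name, URIs, and credentials are
# illustrative assumptions, not values taken from this commit.
catalog = load_catalog(
    "rest",
    **{
        "uri": "http://rest:8181",
        "s3.endpoint": "http://minio:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    },
)

# Read the generated file back and load it into an Iceberg table.
arrow_table = pq.read_table("./types_test.parquet")

# create_table accepts a pyarrow schema and converts it to an Iceberg schema;
# append writes the pyarrow table as a new snapshot.
iceberg_table = catalog.create_table("default.types_test", schema=arrow_table.schema)
iceberg_table.append(arrow_table)
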
22 changes: 11 additions & 11 deletions crates/integration_tests/tests/datafusion.rs
@@ -81,9 +81,9 @@ async fn test_basic_queries() -> Result<(), DataFusionError> {
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)])),
Field::new("cdecimal", DataType::Decimal128(8, 2), true).with_metadata(HashMap::from(
[(PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string(),)]
)),
Field::new("cdecimal128", DataType::Decimal128(8, 2), true).with_metadata(
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string(),)])
),
Field::new("cdate32", DataType::Date32, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"9".to_string(),
@@ -125,19 +125,19 @@ async fn test_basic_queries() -> Result<(), DataFusionError> {
.collect()
.await?;
let expected = [
"+----------+-------+--------+--------+--------+----------+----------+----------+------------+----------------------------+-----------------------------+-------+---------+",
"| cboolean | cint8 | cint16 | cint32 | cint64 | cfloat32 | cfloat64 | cdecimal | cdate32 | ctimestamp | ctimestamptz | cutf8 | cbinary |",
"+----------+-------+--------+--------+--------+----------+----------+----------+------------+----------------------------+-----------------------------+-------+---------+",
"| false | -128 | 0 | 0 | 0 | 0.0 | 0.0 | 0.00 | 1970-01-01 | 1970-01-01T00:00:00 | 1970-01-01T00:00:00Z | 0 | 30 |",
"| true | -127 | 1 | 1 | 1 | 1.0 | 1.0 | 0.01 | 1970-01-02 | 1970-01-01T00:00:00.000001 | 1970-01-01T00:00:00.000001Z | 1 | 31 |",
"| false | -126 | 2 | 2 | 2 | 2.0 | 2.0 | 0.02 | 1970-01-03 | 1970-01-01T00:00:00.000002 | 1970-01-01T00:00:00.000002Z | 2 | 32 |",
"+----------+-------+--------+--------+--------+----------+----------+----------+------------+----------------------------+-----------------------------+-------+---------+",
"+----------+-------+--------+--------+--------+----------+----------+-------------+------------+---------------------+----------------------+-------+---------+",
"| cboolean | cint8 | cint16 | cint32 | cint64 | cfloat32 | cfloat64 | cdecimal128 | cdate32 | ctimestamp | ctimestamptz | cutf8 | cbinary |",
"+----------+-------+--------+--------+--------+----------+----------+-------------+------------+---------------------+----------------------+-------+---------+",
"| false | -128 | 0 | 0 | 0 | 0.0 | 0.0 | 0.00 | 1970-01-01 | 1970-01-01T00:00:00 | 1970-01-01T00:00:00Z | 0 | 30 |",
"| true | -127 | 1 | 1 | 1 | 1.0 | 1.0 | 0.01 | 1970-01-02 | 1970-01-01T00:00:01 | 1970-01-01T00:00:01Z | 1 | 31 |",
"| false | -126 | 2 | 2 | 2 | 2.0 | 2.0 | 0.02 | 1970-01-03 | 1970-01-01T00:00:02 | 1970-01-01T00:00:02Z | 2 | 32 |",
"+----------+-------+--------+--------+--------+----------+----------+-------------+------------+---------------------+----------------------+-------+---------+",
];
assert_batches_eq!(expected, &batches);

// TODO: this isn't OK, and should be fixed with https://github.com/apache/iceberg-rust/issues/813
let err = ctx
.sql("SELECT cdecimal FROM types_table WHERE cint16 <= 2")
.sql("SELECT cdecimal128 FROM types_table WHERE cint16 <= 2")
.await?
.collect()
.await
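
The expected timestamp rows change because the new provisioning script advances each row by a whole second via `timedelta(seconds=i)`, whereas the old SQL-generated data produced sub-second offsets. A quick sanity check of the first few values the test now expects:

from datetime import datetime, timedelta

epoch = datetime(1970, 1, 1)
for i in range(3):
    # Prints 1970-01-01T00:00:00, 1970-01-01T00:00:01, 1970-01-01T00:00:02,
    # matching the ctimestamp column asserted above.
    print((epoch + timedelta(seconds=i)).isoformat())
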
