Commit
Add an e2e test for writing an Iceberg table with pyiceberg and reading it with DataFusion
gruuya committed Dec 18, 2024
1 parent f9de01b commit 1b03c42
Showing 5 changed files with 195 additions and 0 deletions.
2 changes: 2 additions & 0 deletions crates/integration_tests/Cargo.toml
@@ -31,5 +31,7 @@ futures = { workspace = true }
iceberg = { workspace = true }
iceberg-catalog-rest = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
iceberg-datafusion = { workspace = true }
datafusion = "43"
parquet = { workspace = true }
tokio = { workspace = true }
16 changes: 16 additions & 0 deletions crates/integration_tests/testdata/docker-compose.yaml
@@ -83,3 +83,19 @@ services:
links:
- rest:rest
- minio:minio

pyiceberg:
build: pyiceberg/
networks:
rest_bridge:
depends_on:
- rest
- minio
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
- PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE=true
links:
- rest:rest
- minio:minio
22 changes: 22 additions & 0 deletions crates/integration_tests/testdata/pyiceberg/Dockerfile
@@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

FROM python:3.9-bullseye

RUN pip install pyiceberg==0.8 pyarrow==18.0 datafusion==43.1.0

COPY load_types_table.py .

ENTRYPOINT python3 load_types_table.py
79 changes: 79 additions & 0 deletions crates/integration_tests/testdata/pyiceberg/load_types_table.py
@@ -0,0 +1,79 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from datafusion import SessionContext
from pyiceberg.catalog import load_catalog
import pyarrow.parquet as pq

# Generate a table with various types in memory and dump to a Parquet file
ctx = SessionContext()
ctx.sql("""
CREATE TABLE types_test (
cboolean BOOLEAN,
cint8 TINYINT,
cint16 SMALLINT,
cint32 INT,
cint64 BIGINT,
cfloat32 REAL,
cfloat64 DOUBLE PRECISION,
cdecimal DECIMAL(8, 2),
cdate32 DATE,
ctimestamp TIMESTAMP,
ctimestamptz TIMESTAMPTZ,
cutf8 TEXT,
cbinary BYTEA
) AS SELECT
s % 2 = 1 as cboolean,
(s % 256 - 128) as cint8,
s as cint16,
s as cint32,
s as cint64,
s as cfloat32,
s as cfloat64,
s::NUMERIC / 100 as cnumeric,
s as cdate,
s * 1000 as ctimestamp,
s * 1000 as ctimestampz,
s::TEXT as cutf8,
s::TEXT cbinary
FROM unnest(generate_series(0, 1000)) AS q(s);
""")
a = ctx.sql("COPY types_test TO 'types_test.parquet'")
# File loading fails in the container without this line???
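# (likely explanation, editor's guess: ctx.sql() returns a lazy DataFrame, so
# formatting it in the print below is what forces the COPY to actually execute
# and write the file)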
print(f"Created a Parquet file with {a} rows")

# Load the Parquet file
parquet_file = pq.read_table("./types_test.parquet")

# Connect to the REST catalog
catalog = load_catalog(
"rest",
**{
"type": "rest",
"uri": "http://rest:8181",
"s3.endpoint": "http://minio:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
},
)

# Create a corresponding Iceberg table and append the file to it
iceberg_table = catalog.create_table_if_not_exists(
identifier=f"default.types_test",
schema=parquet_file.schema,
)
iceberg_table.append(df=parquet_file)
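
For completeness, a hedged verification sketch (not part of this commit): once the script above has run, the appended table can be scanned back into Arrow through pyiceberg, using the same catalog settings, to confirm the write landed.

from pyiceberg.catalog import load_catalog

# Re-open the REST catalog with the same settings as the loader script above.
catalog = load_catalog(
    "rest",
    **{
        "type": "rest",
        "uri": "http://rest:8181",
        "s3.endpoint": "http://minio:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    },
)

# Scan the freshly appended table back into a pyarrow Table and report the row count.
roundtrip = catalog.load_table("default.types_test").scan().to_arrow()
print(f"default.types_test now holds {roundtrip.num_rows} rows")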
76 changes: 76 additions & 0 deletions crates/integration_tests/tests/datafusion.rs
@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

pub use datafusion::assert_batches_eq;
pub use datafusion::error::DataFusionError;
pub use datafusion::prelude::SessionContext;
use iceberg::{Catalog, TableIdent};
use iceberg_datafusion::IcebergTableProvider;
use iceberg_integration_tests::set_test_fixture;

#[tokio::test]
async fn test_basic_queries() -> Result<(), DataFusionError> {
let fixture = set_test_fixture("datafusion_basic_read").await;

let catalog = fixture.rest_catalog;

let table = catalog
.load_table(&TableIdent::from_strs(["default", "types_test"]).unwrap())
.await
.unwrap();

let ctx = SessionContext::new();

let table_provider = Arc::new(
IcebergTableProvider::try_new_from_table(table)
.await
.unwrap(),
);

ctx.register_table("types_table", table_provider)?;

let batches = ctx
.sql("SELECT * FROM types_table LIMIT 3")
.await?
.collect()
.await?;
let expected = [
"+----------+-------+--------+--------+--------+----------+----------+----------+------------+----------------------------+-----------------------------+-------+---------+",
"| cboolean | cint8 | cint16 | cint32 | cint64 | cfloat32 | cfloat64 | cdecimal | cdate32 | ctimestamp | ctimestamptz | cutf8 | cbinary |",
"+----------+-------+--------+--------+--------+----------+----------+----------+------------+----------------------------+-----------------------------+-------+---------+",
"| false | -128 | 0 | 0 | 0 | 0.0 | 0.0 | 0.00 | 1970-01-01 | 1970-01-01T00:00:00 | 1970-01-01T00:00:00Z | 0 | 30 |",
"| true | -127 | 1 | 1 | 1 | 1.0 | 1.0 | 0.01 | 1970-01-02 | 1970-01-01T00:00:00.000001 | 1970-01-01T00:00:00.000001Z | 1 | 31 |",
"| false | -126 | 2 | 2 | 2 | 2.0 | 2.0 | 0.02 | 1970-01-03 | 1970-01-01T00:00:00.000002 | 1970-01-01T00:00:00.000002Z | 2 | 32 |",
"+----------+-------+--------+--------+--------+----------+----------+----------+------------+----------------------------+-----------------------------+-------+---------+",
];
assert_batches_eq!(expected, &batches);

// TODO: this isn't OK, and should be fixed with https://github.com/apache/iceberg-rust/issues/813
let err = ctx
.sql("SELECT cdecimal FROM types_table WHERE cint16 <= 2")
.await?
.collect()
.await
.unwrap_err();
assert!(err
.to_string()
.contains("Invalid comparison operation: Int16 <= Int32"));

Ok(())
}
