diff --git a/Cargo.lock b/Cargo.lock
index d0b27f717..d3a9d3255 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3080,9 +3080,11 @@ version = "0.4.0"
 dependencies = [
  "arrow-array",
  "arrow-schema",
+ "datafusion",
  "futures",
  "iceberg",
  "iceberg-catalog-rest",
+ "iceberg-datafusion",
  "iceberg_test_utils",
  "parquet",
  "tokio",
diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml
index a047d7580..172a8d3a5 100644
--- a/crates/integration_tests/Cargo.toml
+++ b/crates/integration_tests/Cargo.toml
@@ -27,9 +27,11 @@ rust-version = { workspace = true }
 [dependencies]
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
+datafusion = "44"
 futures = { workspace = true }
 iceberg = { workspace = true }
 iceberg-catalog-rest = { workspace = true }
+iceberg-datafusion = { workspace = true }
 iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
 parquet = { workspace = true }
 tokio = { workspace = true }
diff --git a/crates/integration_tests/testdata/docker-compose.yaml b/crates/integration_tests/testdata/docker-compose.yaml
index dafd6b497..f27f3c9b8 100644
--- a/crates/integration_tests/testdata/docker-compose.yaml
+++ b/crates/integration_tests/testdata/docker-compose.yaml
@@ -83,3 +83,19 @@ services:
     links:
       - rest:rest
       - minio:minio
+
+  pyiceberg:
+    build: pyiceberg/
+    networks:
+      rest_bridge:
+    depends_on:
+      - rest
+      - minio
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+      - PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE=true
+    links:
+      - rest:rest
+      - minio:minio
diff --git a/crates/integration_tests/testdata/pyiceberg/Dockerfile b/crates/integration_tests/testdata/pyiceberg/Dockerfile
new file mode 100644
index 000000000..f1abb90c4
--- /dev/null
+++ b/crates/integration_tests/testdata/pyiceberg/Dockerfile
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM python:3.9-bullseye
+
+RUN pip install pyiceberg[pyarrow]==0.8.1
+
+COPY provision.py .
+
+ENTRYPOINT python3 provision.py
diff --git a/crates/integration_tests/testdata/pyiceberg/provision.py b/crates/integration_tests/testdata/pyiceberg/provision.py
new file mode 100644
index 000000000..2d6ce9f5b
--- /dev/null
+++ b/crates/integration_tests/testdata/pyiceberg/provision.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from pyiceberg.catalog import load_catalog
+import pyarrow as pa
+from datetime import datetime, timedelta
+
+# Generate a table with various types in memory and dump to a Parquet file
+rows = 1001
+columns = [
+    pa.array([(i % 2 == 1) for i in range(rows)]),
+    pa.array([(i % 256 - 128) for i in range(rows)]),
+    pa.array([i for i in range(rows)]),
+    pa.array([i for i in range(rows)]),
+    pa.array([i for i in range(rows)]),
+    pa.array([float(i) for i in range(rows)]),
+    pa.array([float(i) for i in range(rows)]),
+    pa.array([round(i / 100, 2) for i in range(rows)]),
+    pa.array([(datetime(1970, 1, 1) + timedelta(days=i)).date() for i in range(rows)]),
+    pa.array([(datetime(1970, 1, 1) + timedelta(seconds=i)) for i in range(rows)]),
+    pa.array([(datetime(1970, 1, 1) + timedelta(seconds=i)) for i in range(rows)]),
+    pa.array([str(i) for i in range(rows)]),
+    pa.array([str(i).encode("utf-8") for i in range(rows)]),
+]
+schema = pa.schema([
+    ('cboolean', pa.bool_()),
+    ('cint8', pa.int8()),
+    ('cint16', pa.int16()),
+    ('cint32', pa.int32()),
+    ('cint64', pa.int64()),
+    ('cfloat32', pa.float32()),
+    ('cfloat64', pa.float64()),
+    ('cdecimal128', pa.decimal128(8, 2)),
+    ('cdate32', pa.date32()),
+    ('ctimestamp', pa.timestamp('us')),
+    ('ctimestamptz', pa.timestamp('us', tz='UTC')),
+    ('cutf8', pa.utf8()),
+    ('cbinary', pa.binary()),
+])
+
+# Convert to a PyArrow table
+table = pa.Table.from_arrays(columns, schema=schema)
+
+# Connect to the REST catalog
+catalog = load_catalog(
+    "rest",
+    **{
+        "type": "rest",
+        "uri": "http://rest:8181",
+        "s3.endpoint": "http://minio:9000",
+        "s3.access-key-id": os.environ["AWS_ACCESS_KEY_ID"],
+        "s3.secret-access-key": os.environ["AWS_SECRET_ACCESS_KEY"],
+    },
+)
+
+# Create a corresponding Iceberg table and append the file to it
+iceberg_table = catalog.create_table_if_not_exists(
+    identifier=f"default.types_test",
+    schema=schema,
+)
+iceberg_table.append(table)
diff --git a/crates/integration_tests/tests/datafusion.rs b/crates/integration_tests/tests/datafusion.rs
new file mode 100644
index 000000000..f3757a0fe
--- /dev/null
+++ b/crates/integration_tests/tests/datafusion.rs
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow_schema::TimeUnit;
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::assert_batches_eq;
+use datafusion::catalog::TableProvider;
+use datafusion::error::DataFusionError;
+use datafusion::prelude::SessionContext;
+use iceberg::{Catalog, TableIdent};
+use iceberg_datafusion::IcebergTableProvider;
+use iceberg_integration_tests::set_test_fixture;
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+#[tokio::test]
+async fn test_basic_queries() -> Result<(), DataFusionError> {
+    let fixture = set_test_fixture("datafusion_basic_read").await;
+
+    let catalog = fixture.rest_catalog;
+
+    let table = catalog
+        .load_table(&TableIdent::from_strs(["default", "types_test"]).unwrap())
+        .await
+        .unwrap();
+
+    let ctx = SessionContext::new();
+
+    let table_provider = Arc::new(
+        IcebergTableProvider::try_new_from_table(table)
+            .await
+            .unwrap(),
+    );
+
+    let schema = table_provider.schema();
+
+    assert_eq!(
+        schema.as_ref(),
+        &Schema::new(vec![
+            Field::new("cboolean", DataType::Boolean, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("cint8", DataType::Int32, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+            Field::new("cint16", DataType::Int32, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "3".to_string(),
+            )])),
+            Field::new("cint32", DataType::Int32, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "4".to_string(),
+            )])),
+            Field::new("cint64", DataType::Int64, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "5".to_string(),
+            )])),
+            Field::new("cfloat32", DataType::Float32, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "6".to_string(),
+            )])),
+            Field::new("cfloat64", DataType::Float64, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "7".to_string(),
+            )])),
+            Field::new("cdecimal128", DataType::Decimal128(8, 2), true).with_metadata(
+                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string(),)])
+            ),
+            Field::new("cdate32", DataType::Date32, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "9".to_string(),
+            )])),
+            Field::new(
+                "ctimestamp",
+                DataType::Timestamp(TimeUnit::Microsecond, None),
+                true
+            )
+            .with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "10".to_string(),
+            )])),
+            Field::new(
+                "ctimestamptz",
+                DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("+00:00"))),
+                true
+            )
+            .with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "11".to_string(),
+            )])),
+            Field::new("cutf8", DataType::Utf8, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "12".to_string(),
+            )])),
+            Field::new("cbinary", DataType::LargeBinary, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "13".to_string(),
+            )])),
+        ])
+    );
+
+    ctx.register_table("types_table", table_provider)?;
+
+    let batches = ctx
+        .sql("SELECT * FROM types_table LIMIT 3")
+        .await?
+        .collect()
+        .await?;
+    let expected = [
+        "+----------+-------+--------+--------+--------+----------+----------+-------------+------------+---------------------+----------------------+-------+---------+",
+        "| cboolean | cint8 | cint16 | cint32 | cint64 | cfloat32 | cfloat64 | cdecimal128 | cdate32    | ctimestamp          | ctimestamptz         | cutf8 | cbinary |",
+        "+----------+-------+--------+--------+--------+----------+----------+-------------+------------+---------------------+----------------------+-------+---------+",
+        "| false    | -128  | 0      | 0      | 0      | 0.0      | 0.0      | 0.00        | 1970-01-01 | 1970-01-01T00:00:00 | 1970-01-01T00:00:00Z | 0     | 30      |",
+        "| true     | -127  | 1      | 1      | 1      | 1.0      | 1.0      | 0.01        | 1970-01-02 | 1970-01-01T00:00:01 | 1970-01-01T00:00:01Z | 1     | 31      |",
+        "| false    | -126  | 2      | 2      | 2      | 2.0      | 2.0      | 0.02        | 1970-01-03 | 1970-01-01T00:00:02 | 1970-01-01T00:00:02Z | 2     | 32      |",
+        "+----------+-------+--------+--------+--------+----------+----------+-------------+------------+---------------------+----------------------+-------+---------+",
+    ];
+    assert_batches_eq!(expected, &batches);
+
+    // TODO: this isn't OK, and should be fixed with https://github.com/apache/iceberg-rust/issues/813
+    let err = ctx
+        .sql("SELECT cdecimal128 FROM types_table WHERE cint16 <= 2")
+        .await?
+        .collect()
+        .await
+        .unwrap_err();
+    assert!(err
+        .to_string()
+        .contains("Invalid comparison operation: Int16 <= Int32"));
+
+    Ok(())
+}
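
A condensed sketch (not part of the patch) of how the pieces above fit together outside the test harness. It reuses only APIs that already appear in this diff — set_test_fixture, the fixture's rest_catalog, IcebergTableProvider::try_new_from_table, and DataFusion's SessionContext — while the main entry point and the sample query are illustrative assumptions.

    use std::sync::Arc;

    use datafusion::prelude::SessionContext;
    use iceberg::{Catalog, TableIdent};
    use iceberg_datafusion::IcebergTableProvider;
    use iceberg_integration_tests::set_test_fixture;

    #[tokio::main]
    async fn main() {
        // Bring up the docker-compose fixture (REST catalog, MinIO, pyiceberg provisioner).
        let fixture = set_test_fixture("datafusion_basic_read").await;

        // Load the table that provision.py created through the REST catalog.
        let table = fixture
            .rest_catalog
            .load_table(&TableIdent::from_strs(["default", "types_test"]).unwrap())
            .await
            .unwrap();

        // Wrap it in the new iceberg-datafusion TableProvider and register it with DataFusion.
        let provider = Arc::new(IcebergTableProvider::try_new_from_table(table).await.unwrap());
        let ctx = SessionContext::new();
        ctx.register_table("types_table", provider).unwrap();

        // From here on, ordinary DataFusion SQL runs against the Iceberg data.
        let batches = ctx
            .sql("SELECT cint64, cutf8 FROM types_table LIMIT 5")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        println!("{batches:?}");
    }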