Add asserts for the pyiceberg resulting arrow schema
gruuya committed Dec 18, 2024
1 parent c261ddf commit b5d23ff
Showing 3 changed files with 82 additions and 7 deletions.
4 changes: 2 additions & 2 deletions crates/integration_tests/Cargo.toml
@@ -27,11 +27,11 @@ rust-version = { workspace = true }
[dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
datafusion = "43"
futures = { workspace = true }
iceberg = { workspace = true }
iceberg-catalog-rest = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
iceberg-datafusion = { workspace = true }
datafusion = "43"
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
parquet = { workspace = true }
tokio = { workspace = true }
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import os
from datafusion import SessionContext
from pyiceberg.catalog import load_catalog
import pyarrow.parquet as pq
@@ -66,8 +67,8 @@
"type": "rest",
"uri": "http://rest:8181",
"s3.endpoint": "http://minio:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
"s3.access-key-id": os.environ["AWS_ACCESS_KEY_ID"],
"s3.secret-access-key": os.environ["AWS_SECRET_ACCESS_KEY"],
},
)

80 changes: 77 additions & 3 deletions crates/integration_tests/tests/datafusion.rs
@@ -15,14 +15,19 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;

pub use datafusion::assert_batches_eq;
pub use datafusion::error::DataFusionError;
pub use datafusion::prelude::SessionContext;
use arrow_schema::TimeUnit;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::assert_batches_eq;
use datafusion::catalog::TableProvider;
use datafusion::error::DataFusionError;
use datafusion::prelude::SessionContext;
use iceberg::{Catalog, TableIdent};
use iceberg_datafusion::IcebergTableProvider;
use iceberg_integration_tests::set_test_fixture;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

#[tokio::test]
async fn test_basic_queries() -> Result<(), DataFusionError> {
@@ -43,6 +48,75 @@ async fn test_basic_queries() -> Result<(), DataFusionError> {
.unwrap(),
);

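// The provider's Arrow schema should mirror both the column types and the
// Parquet field-id metadata of the table created by the pyiceberg provisioning script.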
let schema = table_provider.schema();

assert_eq!(
schema.as_ref(),
&Schema::new(vec![
Field::new("cboolean", DataType::Boolean, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
Field::new("cint8", DataType::Int32, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
Field::new("cint16", DataType::Int32, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string(),
)])),
Field::new("cint32", DataType::Int32, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"4".to_string(),
)])),
Field::new("cint64", DataType::Int64, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"5".to_string(),
)])),
Field::new("cfloat32", DataType::Float32, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"6".to_string(),
)])),
Field::new("cfloat64", DataType::Float64, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)])),
Field::new("cdecimal", DataType::Decimal128(8, 2), true).with_metadata(HashMap::from(
[(PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string(),)]
)),
Field::new("cdate32", DataType::Date32, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"9".to_string(),
)])),
Field::new(
"ctimestamp",
DataType::Timestamp(TimeUnit::Microsecond, None),
true
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"10".to_string(),
)])),
Field::new(
"ctimestamptz",
DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("+00:00"))),
true
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"11".to_string(),
)])),
Field::new("cutf8", DataType::Utf8, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"12".to_string(),
)])),
Field::new("cbinary", DataType::LargeBinary, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"13".to_string(),
)])),
])
);

ctx.register_table("types_table", table_provider)?;

let batches = ctx

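For reference, here is a minimal sketch (not part of this commit; the `field_id` helper name is hypothetical) of how the `PARQUET_FIELD_ID_META_KEY` metadata asserted above can be read back from an Arrow field:

use std::collections::HashMap;

use arrow_schema::{DataType, Field};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

/// Hypothetical helper: parse the Iceberg field id stored in an Arrow field's metadata.
fn field_id(field: &Field) -> Option<i32> {
    field
        .metadata()
        .get(PARQUET_FIELD_ID_META_KEY)
        .and_then(|id| id.parse::<i32>().ok())
}

fn main() {
    // Mirror how the assert above attaches the field id to a column.
    let field = Field::new("cboolean", DataType::Boolean, true).with_metadata(HashMap::from([(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        "1".to_string(),
    )]));
    assert_eq!(field_id(&field), Some(1));
}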