From 5bd2d9d8eb2475ecc60707de8150982b151a7713 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Mon, 23 Dec 2024 13:52:50 +0100 Subject: [PATCH] use postgres and localstack for sql catalog test --- Cargo.lock | 4 + catalogs/iceberg-sql-catalog/Cargo.toml | 4 + catalogs/iceberg-sql-catalog/src/lib.rs | 262 ++++++++++++++++++------ 3 files changed, 211 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 92819b0..c3cfcd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3040,11 +3040,15 @@ name = "iceberg-sql-catalog" version = "0.6.1" dependencies = [ "async-trait", + "datafusion", + "datafusion_iceberg", "futures", "iceberg-rust", "object_store", "serde_json", "sqlx", + "testcontainers", + "testcontainers-modules", "thiserror 2.0.4", "tokio", "url", diff --git a/catalogs/iceberg-sql-catalog/Cargo.toml b/catalogs/iceberg-sql-catalog/Cargo.toml index 8497486..e0d04a6 100644 --- a/catalogs/iceberg-sql-catalog/Cargo.toml +++ b/catalogs/iceberg-sql-catalog/Cargo.toml @@ -28,3 +28,7 @@ uuid = { version = "1.7.0", features = ["v4"] } [dev-dependencies] tokio = "1" sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } +testcontainers-modules = { version = "0.8.0", features = ["localstack", "postgres"] } +testcontainers = "0.20.0" +datafusion_iceberg = { path = "../../datafusion_iceberg" , version = "0.6.1" } +datafusion.workspace = true diff --git a/catalogs/iceberg-sql-catalog/src/lib.rs b/catalogs/iceberg-sql-catalog/src/lib.rs index 23bc230..ac6a005 100644 --- a/catalogs/iceberg-sql-catalog/src/lib.rs +++ b/catalogs/iceberg-sql-catalog/src/lib.rs @@ -716,97 +716,241 @@ impl CatalogList for SqlCatalogList { #[cfg(test)] pub mod tests { + use datafusion::{ + arrow::array::{Float64Array, Int64Array}, + common::tree_node::{TransformedResult, TreeNode}, + execution::SessionStateBuilder, + prelude::SessionContext, + }; + use datafusion_iceberg::{ + catalog::catalog::IcebergCatalog, + 
planner::{iceberg_transform, IcebergQueryPlanner}, + }; use iceberg_rust::{ catalog::{identifier::Identifier, namespace::Namespace, Catalog}, object_store::ObjectStoreBuilder, spec::{ schema::Schema, types::{PrimitiveType, StructField, StructType, Type}, + util::strip_prefix, }, table::Table, }; + use testcontainers::{core::ExecCommand, runners::AsyncRunner, ImageExt}; + use testcontainers_modules::{localstack::LocalStack, postgres::Postgres}; + use tokio::time::sleep; - use std::sync::Arc; + use std::{sync::Arc, time::Duration}; use crate::SqlCatalog; #[tokio::test] async fn test_create_update_drop_table() { - let object_store = ObjectStoreBuilder::memory(); - let catalog: Arc<dyn Catalog> = Arc::new( - SqlCatalog::new("sqlite://", "test", object_store) + let localstack = LocalStack::default() + .with_env_var("SERVICES", "s3") + .with_env_var("AWS_ACCESS_KEY_ID", "user") + .with_env_var("AWS_SECRET_ACCESS_KEY", "password") + .start() + .await + .unwrap(); + + let command = localstack + .exec(ExecCommand::new(vec![ + "awslocal", + "s3api", + "create-bucket", + "--bucket", + "warehouse", + ])) + .await + .unwrap(); + + let postgres = Postgres::default() + .with_db_name("postgres") + .with_user("postgres") + .with_password("postgres") + .start() + .await + .unwrap(); + + let postgres_host = postgres.get_host().await.unwrap(); + let postgres_port = postgres.get_host_port_ipv4(5432).await.unwrap(); + + while command.exit_code().await.unwrap().is_none() { + sleep(Duration::from_millis(100)).await; + } + + let localstack_host = localstack.get_host().await.unwrap(); + let localstack_port = localstack.get_host_port_ipv4(4566).await.unwrap(); + + let object_store = ObjectStoreBuilder::s3() + .with_config("aws_access_key_id".parse().unwrap(), "user") + .with_config("aws_secret_access_key".parse().unwrap(), "password") + .with_config( + "endpoint".parse().unwrap(), + format!("http://{}:{}", localstack_host, localstack_port), + ) + .with_config("region".parse().unwrap(), "us-east-1") +
.with_config("allow_http".parse().unwrap(), "true"); + + let iceberg_catalog = Arc::new( + SqlCatalog::new( + &format!( + "postgres://postgres:postgres@{}:{}/postgres", + postgres_host, postgres_port + ), + "warehouse", + object_store, + ) + .await + .unwrap(), + ); + + let catalog = Arc::new( + IcebergCatalog::new(iceberg_catalog.clone(), None) .await .unwrap(), ); - let identifier = Identifier::parse("load_table.table3", None).unwrap(); - let schema = Schema::builder() - .with_schema_id(0) - .with_identifier_field_ids(vec![1, 2]) - .with_fields( - StructType::builder() - .with_struct_field(StructField { - id: 1, - name: "one".to_string(), - required: false, - field_type: Type::Primitive(PrimitiveType::String), - doc: None, - }) - .with_struct_field(StructField { - id: 2, - name: "two".to_string(), - required: false, - field_type: Type::Primitive(PrimitiveType::String), - doc: None, - }) - .build() - .unwrap(), - ) - .build() - .unwrap(); - let mut table = Table::builder() - .with_name(identifier.name()) - .with_location("/") - .with_schema(schema) - .build(identifier.namespace(), catalog.clone()) + let state = SessionStateBuilder::new() + .with_default_features() + .with_query_planner(Arc::new(IcebergQueryPlanner {})) + .build(); + + let ctx = SessionContext::new_with_state(state); + + ctx.register_catalog("warehouse", catalog); + + let sql = "CREATE EXTERNAL TABLE lineitem ( + L_ORDERKEY BIGINT NOT NULL, + L_PARTKEY BIGINT NOT NULL, + L_SUPPKEY BIGINT NOT NULL, + L_LINENUMBER INT NOT NULL, + L_QUANTITY DOUBLE NOT NULL, + L_EXTENDED_PRICE DOUBLE NOT NULL, + L_DISCOUNT DOUBLE NOT NULL, + L_TAX DOUBLE NOT NULL, + L_RETURNFLAG CHAR NOT NULL, + L_LINESTATUS CHAR NOT NULL, + L_SHIPDATE DATE NOT NULL, + L_COMMITDATE DATE NOT NULL, + L_RECEIPTDATE DATE NOT NULL, + L_SHIPINSTRUCT VARCHAR NOT NULL, + L_SHIPMODE VARCHAR NOT NULL, + L_COMMENT VARCHAR NOT NULL ) STORED AS CSV LOCATION '../../datafusion_iceberg/testdata/tpch/lineitem.csv' OPTIONS ('has_header' 'false');"; + + 
let plan = ctx.state().create_logical_plan(sql).await.unwrap(); + + let transformed = plan.transform(iceberg_transform).data().unwrap(); + + ctx.execute_logical_plan(transformed) .await - .expect("Failed to create table"); - - let exists = Arc::clone(&catalog) - .tabular_exists(&identifier) + .unwrap() + .collect() + .await + .expect("Failed to execute query plan."); + + let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem ( + L_ORDERKEY BIGINT NOT NULL, + L_PARTKEY BIGINT NOT NULL, + L_SUPPKEY BIGINT NOT NULL, + L_LINENUMBER INT NOT NULL, + L_QUANTITY DOUBLE NOT NULL, + L_EXTENDED_PRICE DOUBLE NOT NULL, + L_DISCOUNT DOUBLE NOT NULL, + L_TAX DOUBLE NOT NULL, + L_RETURNFLAG CHAR NOT NULL, + L_LINESTATUS CHAR NOT NULL, + L_SHIPDATE DATE NOT NULL, + L_COMMITDATE DATE NOT NULL, + L_RECEIPTDATE DATE NOT NULL, + L_SHIPINSTRUCT VARCHAR NOT NULL, + L_SHIPMODE VARCHAR NOT NULL, + L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION 's3://warehouse/tpch/lineitem' PARTITIONED BY ( \"month(L_SHIPDATE)\" );"; + + let plan = ctx.state().create_logical_plan(sql).await.unwrap(); + + let transformed = plan.transform(iceberg_transform).data().unwrap(); + + ctx.execute_logical_plan(transformed) .await - .expect("Table doesn't exist"); - assert!(exists); + .unwrap() + .collect() + .await + .expect("Failed to execute query plan."); - let tables = catalog + let tables = iceberg_catalog .clone() .list_tabulars( - &Namespace::try_new(&["load_table".to_owned()]) - .expect("Failed to create namespace"), + &Namespace::try_new(&["tpch".to_owned()]).expect("Failed to create namespace"), ) .await .expect("Failed to list Tables"); - assert_eq!(tables[0].to_string(), "load_table.table3".to_owned()); + assert_eq!(tables[0].to_string(), "tpch.lineitem".to_owned()); - let namespaces = catalog - .clone() - .list_namespaces(None) - .await - .expect("Failed to list namespaces"); - assert_eq!(namespaces[0].to_string(), "load_table"); + let sql = "insert into warehouse.tpch.lineitem select * from 
lineitem;"; + + let plan = ctx.state().create_logical_plan(sql).await.unwrap(); - let transaction = table.new_transaction(None); - transaction.commit().await.expect("Transaction failed."); + let transformed = plan.transform(iceberg_transform).data().unwrap(); - catalog - .drop_table(&identifier) + ctx.execute_logical_plan(transformed) .await - .expect("Failed to drop table."); + .unwrap() + .collect() + .await + .expect("Failed to execute query plan."); + + let batches = ctx + .sql("select sum(L_QUANTITY), L_PARTKEY from warehouse.tpch.lineitem group by L_PARTKEY;") + .await + .expect("Failed to create plan for select") + .collect() + .await + .expect("Failed to execute select query"); + + let mut once = false; + + for batch in batches { + if batch.num_rows() != 0 { + let (amounts, product_ids) = ( + batch + .column(0) + .as_any() + .downcast_ref::<Float64Array>() + .unwrap(), + batch + .column(1) + .as_any() + .downcast_ref::<Int64Array>() + .unwrap(), + ); + for (product_id, amount) in product_ids.iter().zip(amounts) { + if product_id.unwrap() == 24027 { + assert_eq!(amount.unwrap(), 24.0) + } else if product_id.unwrap() == 63700 { + assert_eq!(amount.unwrap(), 8.0) + } + } + once = true + } + } + + assert!(once); - let exists = Arc::clone(&catalog) - .tabular_exists(&identifier) + let object_store = + iceberg_catalog.object_store(iceberg_rust::object_store::Bucket::S3("warehouse")); + + let version_hint = object_store + .get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into()) + .await + .unwrap() + .bytes() .await - .expect("Table exists failed"); - assert!(!exists); + .unwrap(); + + assert!(std::str::from_utf8(&version_hint) + .unwrap() + .ends_with(".metadata.json")); } }