use postgres and localstack for sql catalog test
JanKaul committed Dec 23, 2024
1 parent b0cc54c commit 5bd2d9d
Showing 3 changed files with 211 additions and 59 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions catalogs/iceberg-sql-catalog/Cargo.toml
@@ -28,3 +28,7 @@ uuid = { version = "1.7.0", features = ["v4"] }
[dev-dependencies]
tokio = "1"
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false }
testcontainers-modules = { version = "0.8.0", features = ["localstack", "postgres"] }
testcontainers = "0.20.0"
datafusion_iceberg = { path = "../../datafusion_iceberg" , version = "0.6.1" }
datafusion.workspace = true
262 changes: 203 additions & 59 deletions catalogs/iceberg-sql-catalog/src/lib.rs
@@ -716,97 +716,241 @@ impl CatalogList for SqlCatalogList

#[cfg(test)]
pub mod tests {
use datafusion::{
arrow::array::{Float64Array, Int64Array},
common::tree_node::{TransformedResult, TreeNode},
execution::SessionStateBuilder,
prelude::SessionContext,
};
use datafusion_iceberg::{
catalog::catalog::IcebergCatalog,
planner::{iceberg_transform, IcebergQueryPlanner},
};
use iceberg_rust::{
catalog::{namespace::Namespace, Catalog},
object_store::ObjectStoreBuilder,
spec::util::strip_prefix,
};
use testcontainers::{core::ExecCommand, runners::AsyncRunner, ImageExt};
use testcontainers_modules::{localstack::LocalStack, postgres::Postgres};
use tokio::time::sleep;

use std::{sync::Arc, time::Duration};

use crate::SqlCatalog;

#[tokio::test]
async fn test_create_update_drop_table() {
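// Start a LocalStack container that emulates S3 for the warehouse object store.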
let localstack = LocalStack::default()
.with_env_var("SERVICES", "s3")
.with_env_var("AWS_ACCESS_KEY_ID", "user")
.with_env_var("AWS_SECRET_ACCESS_KEY", "password")
.start()
.await
.unwrap();

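// Create the warehouse bucket in LocalStack using the awslocal CLI.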
let command = localstack
.exec(ExecCommand::new(vec![
"awslocal",
"s3api",
"create-bucket",
"--bucket",
"warehouse",
]))
.await
.unwrap();

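// Start a Postgres container to back the SQL catalog.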
let postgres = Postgres::default()
.with_db_name("postgres")
.with_user("postgres")
.with_password("postgres")
.start()
.await
.unwrap();

let postgres_host = postgres.get_host().await.unwrap();
let postgres_port = postgres.get_host_port_ipv4(5432).await.unwrap();

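// Poll until the create-bucket command has finished before using the bucket.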
while command.exit_code().await.unwrap().is_none() {
sleep(Duration::from_millis(100)).await;
}

let localstack_host = localstack.get_host().await.unwrap();
let localstack_port = localstack.get_host_port_ipv4(4566).await.unwrap();

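// Configure an S3 object store that points at the LocalStack endpoint.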
let object_store = ObjectStoreBuilder::s3()
.with_config("aws_access_key_id".parse().unwrap(), "user")
.with_config("aws_secret_access_key".parse().unwrap(), "password")
.with_config(
"endpoint".parse().unwrap(),
format!("http://{}:{}", localstack_host, localstack_port),
)
.with_config("region".parse().unwrap(), "us-east-1")
.with_config("allow_http".parse().unwrap(), "true");

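// Create the SQL catalog on Postgres, storing table data in the LocalStack warehouse.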
let iceberg_catalog = Arc::new(
SqlCatalog::new(
&format!(
"postgres://postgres:postgres@{}:{}/postgres",
postgres_host, postgres_port
),
"warehouse",
object_store,
)
.await
.unwrap(),
);

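// Wrap the SQL catalog so it can be registered with DataFusion.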
let catalog = Arc::new(
IcebergCatalog::new(iceberg_catalog.clone(), None)
.await
.unwrap(),
);
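// Build a DataFusion session that uses the Iceberg query planner.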
let state = SessionStateBuilder::new()
.with_default_features()
.with_query_planner(Arc::new(IcebergQueryPlanner {}))
.build();

let ctx = SessionContext::new_with_state(state);

ctx.register_catalog("warehouse", catalog);

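// Register the TPC-H lineitem CSV file as a plain external table.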
let sql = "CREATE EXTERNAL TABLE lineitem (
L_ORDERKEY BIGINT NOT NULL,
L_PARTKEY BIGINT NOT NULL,
L_SUPPKEY BIGINT NOT NULL,
L_LINENUMBER INT NOT NULL,
L_QUANTITY DOUBLE NOT NULL,
L_EXTENDED_PRICE DOUBLE NOT NULL,
L_DISCOUNT DOUBLE NOT NULL,
L_TAX DOUBLE NOT NULL,
L_RETURNFLAG CHAR NOT NULL,
L_LINESTATUS CHAR NOT NULL,
L_SHIPDATE DATE NOT NULL,
L_COMMITDATE DATE NOT NULL,
L_RECEIPTDATE DATE NOT NULL,
L_SHIPINSTRUCT VARCHAR NOT NULL,
L_SHIPMODE VARCHAR NOT NULL,
L_COMMENT VARCHAR NOT NULL ) STORED AS CSV LOCATION '../../datafusion_iceberg/testdata/tpch/lineitem.csv' OPTIONS ('has_header' 'false');";

let plan = ctx.state().create_logical_plan(sql).await.unwrap();

let transformed = plan.transform(iceberg_transform).data().unwrap();

ctx.execute_logical_plan(transformed)
.await
.expect("Failed to create table");

let exists = Arc::clone(&catalog)
.tabular_exists(&identifier)
.unwrap()
.collect()
.await
.expect("Failed to execute query plan.");

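// Create the Iceberg table in the warehouse, partitioned by the month of L_SHIPDATE.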
let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem (
L_ORDERKEY BIGINT NOT NULL,
L_PARTKEY BIGINT NOT NULL,
L_SUPPKEY BIGINT NOT NULL,
L_LINENUMBER INT NOT NULL,
L_QUANTITY DOUBLE NOT NULL,
L_EXTENDED_PRICE DOUBLE NOT NULL,
L_DISCOUNT DOUBLE NOT NULL,
L_TAX DOUBLE NOT NULL,
L_RETURNFLAG CHAR NOT NULL,
L_LINESTATUS CHAR NOT NULL,
L_SHIPDATE DATE NOT NULL,
L_COMMITDATE DATE NOT NULL,
L_RECEIPTDATE DATE NOT NULL,
L_SHIPINSTRUCT VARCHAR NOT NULL,
L_SHIPMODE VARCHAR NOT NULL,
L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION 's3://warehouse/tpch/lineitem' PARTITIONED BY ( \"month(L_SHIPDATE)\" );";

let plan = ctx.state().create_logical_plan(sql).await.unwrap();

let transformed = plan.transform(iceberg_transform).data().unwrap();

ctx.execute_logical_plan(transformed)
.await
.expect("Table doesn't exist");
assert!(exists);
.unwrap()
.collect()
.await
.expect("Failed to execute query plan.");

let tables = iceberg_catalog
.clone()
.list_tabulars(
&Namespace::try_new(&["tpch".to_owned()]).expect("Failed to create namespace"),
)
.await
.expect("Failed to list Tables");
assert_eq!(tables[0].to_string(), "tpch.lineitem".to_owned());

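// Copy the CSV data into the Iceberg table.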
let sql = "insert into warehouse.tpch.lineitem select * from lineitem;";

let plan = ctx.state().create_logical_plan(sql).await.unwrap();

let transformed = plan.transform(iceberg_transform).data().unwrap();

ctx.execute_logical_plan(transformed)
.await
.unwrap()
.collect()
.await
.expect("Failed to execute query plan.");

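// Aggregate the inserted data and check it against known TPC-H values.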
let batches = ctx
.sql("select sum(L_QUANTITY), L_PARTKEY from warehouse.tpch.lineitem group by L_PARTKEY;")
.await
.expect("Failed to create plan for select")
.collect()
.await
.expect("Failed to execute select query");

let mut once = false;

for batch in batches {
if batch.num_rows() != 0 {
let (amounts, product_ids) = (
batch
.column(0)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap(),
batch
.column(1)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap(),
);
for (product_id, amount) in product_ids.iter().zip(amounts) {
if product_id.unwrap() == 24027 {
assert_eq!(amount.unwrap(), 24.0)
} else if product_id.unwrap() == 63700 {
assert_eq!(amount.unwrap(), 8.0)
}
}
once = true
}
}

assert!(once);

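// Fetch the version hint from object storage to confirm the metadata was committed.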
let object_store =
iceberg_catalog.object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));

let version_hint = object_store
.get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into())
.await
.unwrap()
.bytes()
.await
.expect("Table exists failed");
assert!(!exists);
.unwrap();

assert!(std::str::from_utf8(&version_hint)
.unwrap()
.ends_with(".metadata.json"));
}
}
