From a6fd9299565a54589d34604af51cd7e4db0c4a46 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Sat, 7 Dec 2024 21:58:31 +0100 Subject: [PATCH] implement aws sdk credential provider --- Cargo.lock | 24 +-- datafusion_iceberg/examples/dataframe.rs | 2 +- datafusion_iceberg/examples/insert_csv.rs | 2 +- datafusion_iceberg/examples/insert_table.rs | 3 +- .../examples/refresh_materialized_view.rs | 2 +- datafusion_iceberg/src/catalog/mirror.rs | 2 +- datafusion_iceberg/src/materialized_view.rs | 16 +- datafusion_iceberg/src/planner.rs | 2 +- datafusion_iceberg/src/table.rs | 17 +- datafusion_iceberg/tests/integration_trino.rs | 4 +- iceberg-file-catalog/src/lib.rs | 7 +- iceberg-glue-catalog/src/lib.rs | 7 +- iceberg-rest-catalog/src/catalog.rs | 7 +- iceberg-rust/Cargo.toml | 2 + iceberg-rust/src/arrow/write.rs | 2 +- iceberg-rust/src/catalog/mod.rs | 3 +- iceberg-rust/src/lib.rs | 1 + iceberg-rust/src/materialized_view/mod.rs | 4 +- .../{catalog/bucket.rs => object_store.rs} | 70 ++++++- iceberg-rust/src/table/mod.rs | 3 +- iceberg-rust/src/view/mod.rs | 3 +- iceberg-s3tables-catalog/src/lib.rs | 185 +++++++++++++++++- iceberg-sql-catalog/src/lib.rs | 7 +- 23 files changed, 316 insertions(+), 59 deletions(-) rename iceberg-rust/src/{catalog/bucket.rs => object_store.rs} (68%) diff --git a/Cargo.lock b/Cargo.lock index 3e630602..53c24c17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -555,9 +555,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "aws-config" -version = "1.5.9" +version = "1.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d6448cfb224dd6a9b9ac734f58622dd0d4751f3589f3b777345745f46b2eb14" +checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924" dependencies = [ "aws-credential-types", "aws-runtime", @@ -666,15 +666,15 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.48.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ded855583fa1d22e88fe39fd6062b062376e50a8211989e07cf5e38d52eb3453" +checksum = "05ca43a4ef210894f93096039ef1d6fa4ad3edfabb3be92b80908b9f2e4b4eab" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json 0.60.7", + "aws-smithy-json 0.61.1", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -688,15 +688,15 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.49.0" +version = "1.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9177ea1192e6601ae16c7273385690d88a7ed386a00b74a6bc894d12103cd933" +checksum = "abaf490c2e48eed0bb8e2da2fb08405647bd7f253996e0f93b981958ea0f73b0" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json 0.60.7", + "aws-smithy-json 0.61.1", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -710,15 +710,15 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.48.0" +version = "1.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "823ef553cf36713c97453e2ddff1eb8f62be7f4523544e2a5db64caf80100f0a" +checksum = "b68fde0d69c8bfdc1060ea7da21df3e39f6014da316783336deff0a9ec28f4bf" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json 0.60.7", + "aws-smithy-json 0.61.1", "aws-smithy-query", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -2981,6 +2981,8 @@ dependencies = [ "apache-avro", "arrow", "async-trait", + "aws-config", + "aws-credential-types", "chrono", "derive-getters", "derive_builder", diff --git a/datafusion_iceberg/examples/dataframe.rs b/datafusion_iceberg/examples/dataframe.rs index 9307944b..cfd3c310 100644 --- a/datafusion_iceberg/examples/dataframe.rs +++ b/datafusion_iceberg/examples/dataframe.rs @@ -3,9 +3,9 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::dataframe::DataFrameWriteOptions; use datafusion::prelude::SessionContext; use datafusion_iceberg::catalog::catalog::IcebergCatalog; -use iceberg_rust::catalog::bucket::ObjectStoreBuilder; use iceberg_rust::catalog::identifier::Identifier; use iceberg_rust::catalog::Catalog; +use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_rust::spec::schema::Schema; use iceberg_rust::spec::types::{StructField, StructType}; use iceberg_rust::table::Table; diff --git a/datafusion_iceberg/examples/insert_csv.rs b/datafusion_iceberg/examples/insert_csv.rs index e46596df..a311038e 100644 --- a/datafusion_iceberg/examples/insert_csv.rs +++ b/datafusion_iceberg/examples/insert_csv.rs @@ -6,7 +6,7 @@ use datafusion::{ execution::{context::SessionContext, SessionStateBuilder}, }; use datafusion_expr::ScalarUDF; -use iceberg_rust::catalog::bucket::ObjectStoreBuilder; +use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_sql_catalog::SqlCatalogList; use datafusion_iceberg::{ diff --git a/datafusion_iceberg/examples/insert_table.rs b/datafusion_iceberg/examples/insert_table.rs index 7ef4544f..e0d1933a 100644 --- a/datafusion_iceberg/examples/insert_table.rs +++ b/datafusion_iceberg/examples/insert_table.rs @@ -1,7 +1,8 @@ use datafusion::{arrow::array::Int64Array, prelude::SessionContext}; use datafusion_iceberg::DataFusionTable; use iceberg_rust::{ - catalog::{bucket::ObjectStoreBuilder, Catalog}, + catalog::Catalog, + object_store::ObjectStoreBuilder, spec::{ partition::{PartitionField, PartitionSpec, Transform}, schema::Schema, diff --git a/datafusion_iceberg/examples/refresh_materialized_view.rs b/datafusion_iceberg/examples/refresh_materialized_view.rs index e4b5ecb8..36573647 100644 --- a/datafusion_iceberg/examples/refresh_materialized_view.rs +++ b/datafusion_iceberg/examples/refresh_materialized_view.rs @@ -1,9 +1,9 @@ use datafusion::{arrow::array::Int64Array, prelude::SessionContext}; use datafusion_iceberg::catalog::catalog::IcebergCatalog; use datafusion_iceberg::materialized_view::refresh_materialized_view; -use iceberg_rust::catalog::bucket::ObjectStoreBuilder; use iceberg_rust::catalog::CatalogList; use iceberg_rust::materialized_view::MaterializedView; +use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_rust::spec::partition::PartitionSpec; use iceberg_rust::spec::view_metadata::{Version, ViewRepresentation}; use iceberg_rust::spec::{ diff --git a/datafusion_iceberg/src/catalog/mirror.rs b/datafusion_iceberg/src/catalog/mirror.rs index 6b54cea1..10715d4d 100644 --- a/datafusion_iceberg/src/catalog/mirror.rs +++ b/datafusion_iceberg/src/catalog/mirror.rs @@ -7,7 +7,6 @@ use std::{collections::HashSet, sync::Arc}; use iceberg_rust::spec::{tabular::TabularMetadata, view_metadata::REF_PREFIX}; use iceberg_rust::{ catalog::{ - bucket::Bucket, create::{CreateMaterializedView, CreateView}, identifier::Identifier, namespace::Namespace, @@ -15,6 +14,7 @@ use iceberg_rust::{ Catalog, }, error::Error as IcebergError, + object_store::Bucket, spec::table_metadata::new_metadata_location, }; diff --git a/datafusion_iceberg/src/materialized_view.rs b/datafusion_iceberg/src/materialized_view.rs index f0e78be7..fc007b81 100644 --- a/datafusion_iceberg/src/materialized_view.rs +++ b/datafusion_iceberg/src/materialized_view.rs @@ -217,14 +217,6 @@ pub async fn refresh_materialized_view( mod tests { use datafusion::{arrow::array::Int64Array, prelude::SessionContext}; - use iceberg_rust::{ - catalog::bucket::ObjectStoreBuilder, - spec::{ - partition::{PartitionField, Transform}, - schema::Schema, - types::{PrimitiveType, StructField, StructType, Type}, - }, - }; use iceberg_rust::{ catalog::CatalogList, materialized_view::MaterializedView, @@ -234,6 +226,14 @@ mod tests { }, table::Table, }; + use iceberg_rust::{ + object_store::ObjectStoreBuilder, + spec::{ + partition::{PartitionField, Transform}, + schema::Schema, + types::{PrimitiveType, StructField, StructType, Type}, + }, + }; use iceberg_sql_catalog::SqlCatalogList; use std::sync::Arc; diff --git a/datafusion_iceberg/src/planner.rs b/datafusion_iceberg/src/planner.rs index 2e987132..159d54cd 100644 --- a/datafusion_iceberg/src/planner.rs +++ b/datafusion_iceberg/src/planner.rs @@ -640,7 +640,7 @@ mod tests { execution::{context::SessionContext, SessionStateBuilder}, }; use datafusion_expr::ScalarUDF; - use iceberg_rust::catalog::bucket::ObjectStoreBuilder; + use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_sql_catalog::SqlCatalogList; use tokio::time::sleep; diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs index db07cbc8..533982f0 100644 --- a/datafusion_iceberg/src/table.rs +++ b/datafusion_iceberg/src/table.rs @@ -590,6 +590,15 @@ impl DataSink for IcebergDataSink { mod tests { use datafusion::{arrow::array::Int64Array, prelude::SessionContext}; + use iceberg_rust::{ + catalog::tabular::Tabular, + object_store::ObjectStoreBuilder, + spec::{ + partition::{PartitionField, Transform}, + schema::Schema, + types::{PrimitiveType, StructField, StructType, Type}, + }, + }; use iceberg_rust::{ catalog::Catalog, spec::{ @@ -599,14 +608,6 @@ mod tests { table::Table, view::View, }; - use iceberg_rust::{ - catalog::{bucket::ObjectStoreBuilder, tabular::Tabular}, - spec::{ - partition::{PartitionField, Transform}, - schema::Schema, - types::{PrimitiveType, StructField, StructType, Type}, - }, - }; use iceberg_sql_catalog::SqlCatalog; use std::{ops::Deref, sync::Arc}; diff --git a/datafusion_iceberg/tests/integration_trino.rs b/datafusion_iceberg/tests/integration_trino.rs index 446892ae..8fda3565 100644 --- a/datafusion_iceberg/tests/integration_trino.rs +++ b/datafusion_iceberg/tests/integration_trino.rs @@ -9,9 +9,9 @@ use datafusion::execution::context::SessionContext; use datafusion_iceberg::catalog::catalog::IcebergCatalog; use iceberg_rest_catalog::apis::configuration::Configuration; use iceberg_rest_catalog::catalog::RestCatalog; -use iceberg_rust::catalog::bucket::ObjectStoreBuilder; use iceberg_rust::catalog::namespace::Namespace; use iceberg_rust::catalog::Catalog; +use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_rust::spec::partition::{PartitionField, PartitionSpec, Transform}; use iceberg_rust::spec::schema::Schema; use iceberg_rust::spec::types::{PrimitiveType, StructField, StructType, Type}; @@ -183,7 +183,7 @@ async fn integration_trino_rest() { .await .unwrap(); - let object_store = ObjectStoreBuilder::aws() + let object_store = ObjectStoreBuilder::s3() .with_config("aws_access_key_id".parse().unwrap(), "user") .with_config("aws_secret_access_key".parse().unwrap(), "password") .with_config( diff --git a/iceberg-file-catalog/src/lib.rs b/iceberg-file-catalog/src/lib.rs index 6437c190..8e64456c 100644 --- a/iceberg-file-catalog/src/lib.rs +++ b/iceberg-file-catalog/src/lib.rs @@ -8,7 +8,6 @@ use async_trait::async_trait; use futures::{future, TryStreamExt}; use iceberg_rust::{ catalog::{ - bucket::{Bucket, ObjectStoreBuilder}, commit::{ apply_table_updates, apply_view_updates, check_table_requirements, check_view_requirements, CommitTable, CommitView, TableRequirement, @@ -21,6 +20,7 @@ use iceberg_rust::{ }, error::Error as IcebergError, materialized_view::MaterializedView, + object_store::{Bucket, ObjectStoreBuilder}, spec::{ materialized_view_metadata::MaterializedViewMetadata, table_metadata::{new_metadata_location, TableMetadata}, @@ -628,7 +628,10 @@ pub mod tests { catalog::catalog::IcebergCatalog, planner::{iceberg_transform, IcebergQueryPlanner}, }; - use iceberg_rust::catalog::{bucket::ObjectStoreBuilder, namespace::Namespace, Catalog}; + use iceberg_rust::{ + catalog::{namespace::Namespace, Catalog}, + object_store::ObjectStoreBuilder, + }; use std::sync::Arc; // use testcontainers::{core::ExecCommand, runners::AsyncRunner, ImageExt}; // use testcontainers_modules::localstack::LocalStack; diff --git a/iceberg-glue-catalog/src/lib.rs b/iceberg-glue-catalog/src/lib.rs index b1dd55de..2e5ed791 100644 --- a/iceberg-glue-catalog/src/lib.rs +++ b/iceberg-glue-catalog/src/lib.rs @@ -11,7 +11,6 @@ use aws_sdk_glue::{ }; use iceberg_rust::{ catalog::{ - bucket::{Bucket, ObjectStoreBuilder}, commit::{ apply_table_updates, apply_view_updates, check_table_requirements, check_view_requirements, CommitTable, CommitView, TableRequirement, @@ -24,6 +23,7 @@ use iceberg_rust::{ }, error::Error as IcebergError, materialized_view::MaterializedView, + object_store::{Bucket, ObjectStoreBuilder}, spec::{ self, materialized_view_metadata::MaterializedViewMetadata, @@ -910,7 +910,10 @@ pub mod tests { catalog::catalog::IcebergCatalog, planner::{iceberg_transform, IcebergQueryPlanner}, }; - use iceberg_rust::catalog::{bucket::ObjectStoreBuilder, namespace::Namespace, Catalog}; + use iceberg_rust::{ + catalog::{namespace::Namespace, Catalog}, + object_store::ObjectStoreBuilder, + }; use std::sync::Arc; use testcontainers::{ diff --git a/iceberg-rest-catalog/src/catalog.rs b/iceberg-rest-catalog/src/catalog.rs index ce21af5a..c86b03c5 100644 --- a/iceberg-rest-catalog/src/catalog.rs +++ b/iceberg-rest-catalog/src/catalog.rs @@ -5,7 +5,6 @@ Iceberg rest catalog implementation */ use iceberg_rust::{ catalog::{ - bucket::{Bucket, ObjectStoreBuilder}, commit::CommitView, create::{CreateMaterializedView, CreateTable, CreateView}, identifier::{self, Identifier}, @@ -15,6 +14,7 @@ use iceberg_rust::{ }, error::Error, materialized_view::MaterializedView, + object_store::{Bucket, ObjectStoreBuilder}, spec::{ materialized_view_metadata::MaterializedViewMetadata, table_metadata::TableMetadata, @@ -516,9 +516,8 @@ impl CatalogList for RestCatalogList { #[cfg(test)] pub mod tests { use iceberg_rust::{ - catalog::{ - bucket::ObjectStoreBuilder, identifier::Identifier, namespace::Namespace, Catalog, - }, + catalog::{identifier::Identifier, namespace::Namespace, Catalog}, + object_store::ObjectStoreBuilder, spec::{ schema::Schema, types::{PrimitiveType, StructField, StructType, Type}, diff --git a/iceberg-rust/Cargo.toml b/iceberg-rust/Cargo.toml index 1f371a70..ed3bdc05 100644 --- a/iceberg-rust/Cargo.toml +++ b/iceberg-rust/Cargo.toml @@ -31,6 +31,8 @@ thiserror = { workspace = true } derive-getters = { workspace = true } iceberg-rust-spec = { path = "../iceberg-rust-spec", version = "0.5.8" } smallvec = { version = "1.13.2", features = ["const_generics"] } +aws-config = "1.5.10" +aws-credential-types = "1.2.1" [dev-dependencies] chrono = { workspace = true } diff --git a/iceberg-rust/src/arrow/write.rs b/iceberg-rust/src/arrow/write.rs index bcb35b70..69ce4de5 100644 --- a/iceberg-rust/src/arrow/write.rs +++ b/iceberg-rust/src/arrow/write.rs @@ -29,7 +29,7 @@ use parquet::{ }; use uuid::Uuid; -use crate::{catalog::bucket::Bucket, error::Error, file_format::parquet::parquet_to_datafile}; +use crate::{error::Error, file_format::parquet::parquet_to_datafile, object_store::Bucket}; use super::partition::partition_record_batches; diff --git a/iceberg-rust/src/catalog/mod.rs b/iceberg-rust/src/catalog/mod.rs index c232fed1..2042561a 100644 --- a/iceberg-rust/src/catalog/mod.rs +++ b/iceberg-rust/src/catalog/mod.rs @@ -14,13 +14,12 @@ use crate::materialized_view::MaterializedView; use crate::table::Table; use crate::view::View; -use self::bucket::Bucket; use self::commit::{CommitTable, CommitView}; use self::create::{CreateMaterializedView, CreateTable, CreateView}; use self::namespace::Namespace; use self::tabular::Tabular; +use crate::object_store::Bucket; -pub mod bucket; pub mod commit; pub mod create; pub mod tabular; diff --git a/iceberg-rust/src/lib.rs b/iceberg-rust/src/lib.rs index 8b55632e..42da47a7 100644 --- a/iceberg-rust/src/lib.rs +++ b/iceberg-rust/src/lib.rs @@ -7,6 +7,7 @@ pub mod catalog; pub mod error; pub mod file_format; pub mod materialized_view; +pub mod object_store; pub mod spec; pub mod sql; pub mod store; diff --git a/iceberg-rust/src/materialized_view/mod.rs b/iceberg-rust/src/materialized_view/mod.rs index da452e34..8e0b3af7 100644 --- a/iceberg-rust/src/materialized_view/mod.rs +++ b/iceberg-rust/src/materialized_view/mod.rs @@ -11,10 +11,10 @@ use object_store::ObjectStore; use crate::{ catalog::{ - bucket::Bucket, create::CreateMaterializedViewBuilder, identifier::Identifier, - tabular::Tabular, Catalog, + create::CreateMaterializedViewBuilder, identifier::Identifier, tabular::Tabular, Catalog, }, error::Error, + object_store::Bucket, }; use self::{storage_table::StorageTable, transaction::Transaction as MaterializedViewTransaction}; diff --git a/iceberg-rust/src/catalog/bucket.rs b/iceberg-rust/src/object_store.rs similarity index 68% rename from iceberg-rust/src/catalog/bucket.rs rename to iceberg-rust/src/object_store.rs index e6094808..8b8dabee 100644 --- a/iceberg-rust/src/catalog/bucket.rs +++ b/iceberg-rust/src/object_store.rs @@ -2,14 +2,19 @@ Defining the [Bucket] struct for specifying buckets for the ObjectStore. */ -use std::{fmt::Display, path::Path, str::FromStr, sync::Arc}; +use std::{fmt::Display, ops::Deref, path::Path, str::FromStr, sync::Arc, time::SystemTime}; +use async_trait::async_trait; +use aws_config::SdkConfig; +use aws_credential_types::{provider::ProvideCredentials, Credentials}; +use futures::lock::Mutex; +use object_store::Error as ObjectStoreError; use object_store::{ - aws::{AmazonS3Builder, AmazonS3ConfigKey}, + aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential}, gcp::{GoogleCloudStorageBuilder, GoogleConfigKey}, local::LocalFileSystem, memory::InMemory, - ObjectStore, + CredentialProvider, ObjectStore, }; use crate::error::Error; @@ -94,7 +99,7 @@ impl FromStr for ConfigKey { } impl ObjectStoreBuilder { /// Create new AWS S3 Object Store builder - pub fn aws() -> Self { + pub fn s3() -> Self { ObjectStoreBuilder::S3(AmazonS3Builder::from_env()) } /// Create new AWS S3 Object Store builder @@ -144,3 +149,60 @@ impl ObjectStoreBuilder { } } } + +/// AWS Sdk credential provider for object_store +#[derive(Debug)] +pub struct AwsCredentialProvider { + config: SdkConfig, + cache: Arc, Credentials)>>>, +} + +#[async_trait] +impl CredentialProvider for AwsCredentialProvider { + type Credential = AwsCredential; + + async fn get_credential(&self) -> Result, ObjectStoreError> { + let mut guard = self.cache.lock().await; + + let is_valid = if let Some((Some(time), _)) = guard.deref() { + *time >= SystemTime::now() + } else { + false + }; + + if !is_valid { + let provider = self + .config + .credentials_provider() + .ok_or(ObjectStoreError::NotImplemented)?; + + let credentials = + provider + .provide_credentials() + .await + .map_err(|err| ObjectStoreError::Generic { + store: "s3", + source: Box::new(err), + })?; + *guard = Some((credentials.expiry(), credentials)); + }; + + let credentials = &guard.as_ref().unwrap().1; + + Ok(Arc::new(AwsCredential { + key_id: credentials.access_key_id().to_string(), + secret_key: credentials.secret_access_key().to_string(), + token: credentials.session_token().map(ToString::to_string), + })) + } +} + +impl AwsCredentialProvider { + /// Create new credential provider + pub fn new(config: &SdkConfig) -> Self { + Self { + config: config.clone(), + cache: Arc::new(Mutex::new(None)), + } + } +} diff --git a/iceberg-rust/src/table/mod.rs b/iceberg-rust/src/table/mod.rs index 61b66785..ddee3645 100644 --- a/iceberg-rust/src/table/mod.rs +++ b/iceberg-rust/src/table/mod.rs @@ -26,8 +26,9 @@ use iceberg_rust_spec::{ }; use crate::{ - catalog::{bucket::Bucket, create::CreateTableBuilder, identifier::Identifier, Catalog}, + catalog::{create::CreateTableBuilder, identifier::Identifier, Catalog}, error::Error, + object_store::Bucket, table::transaction::TableTransaction, }; diff --git a/iceberg-rust/src/view/mod.rs b/iceberg-rust/src/view/mod.rs index 128bce68..d5a264d5 100644 --- a/iceberg-rust/src/view/mod.rs +++ b/iceberg-rust/src/view/mod.rs @@ -8,8 +8,9 @@ use iceberg_rust_spec::spec::{schema::Schema, view_metadata::ViewMetadata}; use object_store::ObjectStore; use crate::{ - catalog::{bucket::Bucket, create::CreateViewBuilder, identifier::Identifier, Catalog}, + catalog::{create::CreateViewBuilder, identifier::Identifier, Catalog}, error::Error, + object_store::Bucket, }; use self::transaction::Transaction as ViewTransaction; diff --git a/iceberg-s3tables-catalog/src/lib.rs b/iceberg-s3tables-catalog/src/lib.rs index 12818779..fc6b5266 100644 --- a/iceberg-s3tables-catalog/src/lib.rs +++ b/iceberg-s3tables-catalog/src/lib.rs @@ -9,7 +9,6 @@ use aws_config::SdkConfig; use aws_sdk_s3tables::{types::OpenTableFormat, Client}; use iceberg_rust::{ catalog::{ - bucket::{Bucket, ObjectStoreBuilder}, commit::{ apply_table_updates, apply_view_updates, check_table_requirements, check_view_requirements, CommitTable, CommitView, TableRequirement, @@ -22,6 +21,7 @@ use iceberg_rust::{ }, error::Error as IcebergError, materialized_view::MaterializedView, + object_store::{Bucket, ObjectStoreBuilder}, spec::{ self, materialized_view_metadata::MaterializedViewMetadata, @@ -809,3 +809,186 @@ impl CatalogList for S3TablesCatalogList { Vec::new() } } +#[cfg(test)] +pub mod tests { + use aws_config::BehaviorVersion; + use datafusion::{ + arrow::array::{Float64Array, Int64Array}, + common::tree_node::{TransformedResult, TreeNode}, + execution::SessionStateBuilder, + prelude::SessionContext, + }; + use datafusion_iceberg::{ + catalog::catalog::IcebergCatalog, + planner::{iceberg_transform, IcebergQueryPlanner}, + }; + use iceberg_rust::{ + catalog::{namespace::Namespace, Catalog}, + object_store::ObjectStoreBuilder, + }; + + use std::sync::Arc; + + use crate::S3TablesCatalog; + + #[tokio::test] + async fn test_create_update_drop_table() { + let config = aws_config::defaults(BehaviorVersion::v2024_03_28()) + .load() + .await; + + let object_store = ObjectStoreBuilder::s3(); + let iceberg_catalog: Arc = Arc::new( + S3TablesCatalog::new( + &config, + "arn:aws:s3tables:us-east-1:976193222250:bucket/dashbook-s3tables", + object_store, + ) + .unwrap(), + ); + + // iceberg_catalog + // .create_namespace(&Namespace::try_new(&["tpch".to_owned()]).unwrap(), None) + // .await + // .expect("Failed to create namespace"); + + // let namespaces = iceberg_catalog + // .clone() + // .list_namespaces(None) + // .await + // .expect("Failed to list namespaces"); + // assert_eq!(namespaces[0].to_string(), "tpch"); + + let catalog = Arc::new( + IcebergCatalog::new(iceberg_catalog.clone(), None) + .await + .unwrap(), + ); + + let state = SessionStateBuilder::new() + .with_default_features() + .with_query_planner(Arc::new(IcebergQueryPlanner {})) + .build(); + + let ctx = SessionContext::new_with_state(state); + + ctx.register_catalog("warehouse", catalog); + + let sql = "CREATE EXTERNAL TABLE lineitem ( + L_ORDERKEY BIGINT NOT NULL, + L_PARTKEY BIGINT NOT NULL, + L_SUPPKEY BIGINT NOT NULL, + L_LINENUMBER INT NOT NULL, + L_QUANTITY DOUBLE NOT NULL, + L_EXTENDED_PRICE DOUBLE NOT NULL, + L_DISCOUNT DOUBLE NOT NULL, + L_TAX DOUBLE NOT NULL, + L_RETURNFLAG CHAR NOT NULL, + L_LINESTATUS CHAR NOT NULL, + L_SHIPDATE DATE NOT NULL, + L_COMMITDATE DATE NOT NULL, + L_RECEIPTDATE DATE NOT NULL, + L_SHIPINSTRUCT VARCHAR NOT NULL, + L_SHIPMODE VARCHAR NOT NULL, + L_COMMENT VARCHAR NOT NULL ) STORED AS CSV LOCATION '../datafusion_iceberg/testdata/tpch/lineitem.csv' OPTIONS ('has_header' 'false');"; + + let plan = ctx.state().create_logical_plan(sql).await.unwrap(); + + let transformed = plan.transform(iceberg_transform).data().unwrap(); + + ctx.execute_logical_plan(transformed) + .await + .unwrap() + .collect() + .await + .expect("Failed to execute query plan."); + + let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem ( + L_ORDERKEY BIGINT NOT NULL, + L_PARTKEY BIGINT NOT NULL, + L_SUPPKEY BIGINT NOT NULL, + L_LINENUMBER INT NOT NULL, + L_QUANTITY DOUBLE NOT NULL, + L_EXTENDED_PRICE DOUBLE NOT NULL, + L_DISCOUNT DOUBLE NOT NULL, + L_TAX DOUBLE NOT NULL, + L_RETURNFLAG CHAR NOT NULL, + L_LINESTATUS CHAR NOT NULL, + L_SHIPDATE DATE NOT NULL, + L_COMMITDATE DATE NOT NULL, + L_RECEIPTDATE DATE NOT NULL, + L_SHIPINSTRUCT VARCHAR NOT NULL, + L_SHIPMODE VARCHAR NOT NULL, + L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION '' PARTITIONED BY ( \"month(L_SHIPDATE)\" );"; + + let plan = ctx.state().create_logical_plan(sql).await.unwrap(); + + let transformed = plan.transform(iceberg_transform).data().unwrap(); + + ctx.execute_logical_plan(transformed) + .await + .unwrap() + .collect() + .await + .expect("Failed to execute query plan."); + + let tables = iceberg_catalog + .clone() + .list_tabulars( + &Namespace::try_new(&["tpch".to_owned()]).expect("Failed to create namespace"), + ) + .await + .expect("Failed to list Tables"); + assert_eq!(tables[0].to_string(), "tpch.lineitem".to_owned()); + + let sql = "insert into warehouse.tpch.lineitem select * from lineitem;"; + + let plan = ctx.state().create_logical_plan(sql).await.unwrap(); + + let transformed = plan.transform(iceberg_transform).data().unwrap(); + + ctx.execute_logical_plan(transformed) + .await + .unwrap() + .collect() + .await + .expect("Failed to execute query plan."); + + let batches = ctx + .sql("select sum(L_QUANTITY), L_PARTKEY from warehouse.tpch.lineitem group by L_PARTKEY;") + .await + .expect("Failed to create plan for select") + .collect() + .await + .expect("Failed to execute select query"); + + let mut once = false; + + for batch in batches { + if batch.num_rows() != 0 { + let (amounts, product_ids) = ( + batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(), + batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(), + ); + for (product_id, amount) in product_ids.iter().zip(amounts) { + if product_id.unwrap() == 24027 { + assert_eq!(amount.unwrap(), 24.0) + } else if product_id.unwrap() == 63700 { + assert_eq!(amount.unwrap(), 8.0) + } + } + once = true + } + } + + assert!(once); + } +} diff --git a/iceberg-sql-catalog/src/lib.rs b/iceberg-sql-catalog/src/lib.rs index b91e0dc5..509755ac 100644 --- a/iceberg-sql-catalog/src/lib.rs +++ b/iceberg-sql-catalog/src/lib.rs @@ -6,7 +6,6 @@ use std::{ use async_trait::async_trait; use iceberg_rust::{ catalog::{ - bucket::{Bucket, ObjectStoreBuilder}, commit::{ apply_table_updates, apply_view_updates, check_table_requirements, check_view_requirements, CommitTable, CommitView, TableRequirement, @@ -19,6 +18,7 @@ use iceberg_rust::{ }, error::Error as IcebergError, materialized_view::MaterializedView, + object_store::{Bucket, ObjectStoreBuilder}, spec::{ materialized_view_metadata::MaterializedViewMetadata, table_metadata::{new_metadata_location, TableMetadata}, @@ -705,9 +705,8 @@ impl CatalogList for SqlCatalogList { #[cfg(test)] pub mod tests { use iceberg_rust::{ - catalog::{ - bucket::ObjectStoreBuilder, identifier::Identifier, namespace::Namespace, Catalog, - }, + catalog::{identifier::Identifier, namespace::Namespace, Catalog}, + object_store::ObjectStoreBuilder, spec::{ schema::Schema, types::{PrimitiveType, StructField, StructType, Type},