From c34788bbce804ff5f2118ca8a62ab343e79a46dc Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Sun, 8 Dec 2024 08:48:10 +0100 Subject: [PATCH 1/4] add aws credential provider --- Cargo.lock | 24 ++++--- datafusion_iceberg/examples/dataframe.rs | 2 +- datafusion_iceberg/examples/insert_csv.rs | 2 +- datafusion_iceberg/examples/insert_table.rs | 3 +- .../examples/refresh_materialized_view.rs | 2 +- datafusion_iceberg/src/catalog/mirror.rs | 2 +- datafusion_iceberg/src/materialized_view.rs | 16 ++--- datafusion_iceberg/src/planner.rs | 2 +- datafusion_iceberg/src/table.rs | 17 ++--- datafusion_iceberg/tests/integration_trino.rs | 4 +- iceberg-file-catalog/src/lib.rs | 7 +- iceberg-glue-catalog/src/lib.rs | 7 +- iceberg-rest-catalog/src/catalog.rs | 7 +- iceberg-rust/Cargo.toml | 2 + iceberg-rust/src/arrow/write.rs | 2 +- iceberg-rust/src/catalog/mod.rs | 3 +- iceberg-rust/src/lib.rs | 1 + iceberg-rust/src/materialized_view/mod.rs | 4 +- .../{catalog/bucket.rs => object_store.rs} | 71 +++++++++++++++++-- iceberg-rust/src/table/mod.rs | 3 +- iceberg-rust/src/view/mod.rs | 3 +- iceberg-sql-catalog/src/lib.rs | 7 +- 22 files changed, 133 insertions(+), 58 deletions(-) rename iceberg-rust/src/{catalog/bucket.rs => object_store.rs} (68%) diff --git a/Cargo.lock b/Cargo.lock index 3e630602..53c24c17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -555,9 +555,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "aws-config" -version = "1.5.9" +version = "1.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d6448cfb224dd6a9b9ac734f58622dd0d4751f3589f3b777345745f46b2eb14" +checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924" dependencies = [ "aws-credential-types", "aws-runtime", @@ -666,15 +666,15 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.48.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ded855583fa1d22e88fe39fd6062b062376e50a8211989e07cf5e38d52eb3453" +checksum = "05ca43a4ef210894f93096039ef1d6fa4ad3edfabb3be92b80908b9f2e4b4eab" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json 0.60.7", + "aws-smithy-json 0.61.1", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -688,15 +688,15 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.49.0" +version = "1.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9177ea1192e6601ae16c7273385690d88a7ed386a00b74a6bc894d12103cd933" +checksum = "abaf490c2e48eed0bb8e2da2fb08405647bd7f253996e0f93b981958ea0f73b0" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json 0.60.7", + "aws-smithy-json 0.61.1", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -710,15 +710,15 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.48.0" +version = "1.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "823ef553cf36713c97453e2ddff1eb8f62be7f4523544e2a5db64caf80100f0a" +checksum = "b68fde0d69c8bfdc1060ea7da21df3e39f6014da316783336deff0a9ec28f4bf" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json 0.60.7", + "aws-smithy-json 0.61.1", "aws-smithy-query", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -2981,6 +2981,8 @@ dependencies = [ "apache-avro", "arrow", "async-trait", + "aws-config", + "aws-credential-types", "chrono", "derive-getters", "derive_builder", diff --git a/datafusion_iceberg/examples/dataframe.rs b/datafusion_iceberg/examples/dataframe.rs index 9307944b..cfd3c310 100644 --- a/datafusion_iceberg/examples/dataframe.rs +++ b/datafusion_iceberg/examples/dataframe.rs @@ -3,9 +3,9 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::dataframe::DataFrameWriteOptions; use datafusion::prelude::SessionContext; use datafusion_iceberg::catalog::catalog::IcebergCatalog; -use iceberg_rust::catalog::bucket::ObjectStoreBuilder; use iceberg_rust::catalog::identifier::Identifier; use iceberg_rust::catalog::Catalog; +use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_rust::spec::schema::Schema; use iceberg_rust::spec::types::{StructField, StructType}; use iceberg_rust::table::Table; diff --git a/datafusion_iceberg/examples/insert_csv.rs b/datafusion_iceberg/examples/insert_csv.rs index e46596df..a311038e 100644 --- a/datafusion_iceberg/examples/insert_csv.rs +++ b/datafusion_iceberg/examples/insert_csv.rs @@ -6,7 +6,7 @@ use datafusion::{ execution::{context::SessionContext, SessionStateBuilder}, }; use datafusion_expr::ScalarUDF; -use iceberg_rust::catalog::bucket::ObjectStoreBuilder; +use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_sql_catalog::SqlCatalogList; use datafusion_iceberg::{ diff --git a/datafusion_iceberg/examples/insert_table.rs b/datafusion_iceberg/examples/insert_table.rs index 7ef4544f..e0d1933a 100644 --- a/datafusion_iceberg/examples/insert_table.rs +++ b/datafusion_iceberg/examples/insert_table.rs @@ -1,7 +1,8 @@ use datafusion::{arrow::array::Int64Array, prelude::SessionContext}; use datafusion_iceberg::DataFusionTable; use iceberg_rust::{ - catalog::{bucket::ObjectStoreBuilder, Catalog}, + catalog::Catalog, + object_store::ObjectStoreBuilder, spec::{ partition::{PartitionField, PartitionSpec, Transform}, schema::Schema, diff --git a/datafusion_iceberg/examples/refresh_materialized_view.rs b/datafusion_iceberg/examples/refresh_materialized_view.rs index e4b5ecb8..36573647 100644 --- a/datafusion_iceberg/examples/refresh_materialized_view.rs +++ b/datafusion_iceberg/examples/refresh_materialized_view.rs @@ -1,9 +1,9 @@ use datafusion::{arrow::array::Int64Array, prelude::SessionContext}; use datafusion_iceberg::catalog::catalog::IcebergCatalog; use datafusion_iceberg::materialized_view::refresh_materialized_view; -use iceberg_rust::catalog::bucket::ObjectStoreBuilder; use iceberg_rust::catalog::CatalogList; use iceberg_rust::materialized_view::MaterializedView; +use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_rust::spec::partition::PartitionSpec; use iceberg_rust::spec::view_metadata::{Version, ViewRepresentation}; use iceberg_rust::spec::{ diff --git a/datafusion_iceberg/src/catalog/mirror.rs b/datafusion_iceberg/src/catalog/mirror.rs index 6b54cea1..10715d4d 100644 --- a/datafusion_iceberg/src/catalog/mirror.rs +++ b/datafusion_iceberg/src/catalog/mirror.rs @@ -7,7 +7,6 @@ use std::{collections::HashSet, sync::Arc}; use iceberg_rust::spec::{tabular::TabularMetadata, view_metadata::REF_PREFIX}; use iceberg_rust::{ catalog::{ - bucket::Bucket, create::{CreateMaterializedView, CreateView}, identifier::Identifier, namespace::Namespace, @@ -15,6 +14,7 @@ use iceberg_rust::{ Catalog, }, error::Error as IcebergError, + object_store::Bucket, spec::table_metadata::new_metadata_location, }; diff --git a/datafusion_iceberg/src/materialized_view.rs b/datafusion_iceberg/src/materialized_view.rs index f0e78be7..fc007b81 100644 --- a/datafusion_iceberg/src/materialized_view.rs +++ b/datafusion_iceberg/src/materialized_view.rs @@ -217,14 +217,6 @@ pub async fn refresh_materialized_view( mod tests { use datafusion::{arrow::array::Int64Array, prelude::SessionContext}; - use iceberg_rust::{ - catalog::bucket::ObjectStoreBuilder, - spec::{ - partition::{PartitionField, Transform}, - schema::Schema, - types::{PrimitiveType, StructField, StructType, Type}, - }, - }; use iceberg_rust::{ catalog::CatalogList, materialized_view::MaterializedView, @@ -234,6 +226,14 @@ mod tests { }, table::Table, }; + use iceberg_rust::{ + object_store::ObjectStoreBuilder, + spec::{ + partition::{PartitionField, Transform}, + schema::Schema, + types::{PrimitiveType, StructField, StructType, Type}, + }, + }; use iceberg_sql_catalog::SqlCatalogList; use std::sync::Arc; diff --git a/datafusion_iceberg/src/planner.rs b/datafusion_iceberg/src/planner.rs index 2e987132..159d54cd 100644 --- a/datafusion_iceberg/src/planner.rs +++ b/datafusion_iceberg/src/planner.rs @@ -640,7 +640,7 @@ mod tests { execution::{context::SessionContext, SessionStateBuilder}, }; use datafusion_expr::ScalarUDF; - use iceberg_rust::catalog::bucket::ObjectStoreBuilder; + use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_sql_catalog::SqlCatalogList; use tokio::time::sleep; diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs index db07cbc8..533982f0 100644 --- a/datafusion_iceberg/src/table.rs +++ b/datafusion_iceberg/src/table.rs @@ -590,6 +590,15 @@ impl DataSink for IcebergDataSink { mod tests { use datafusion::{arrow::array::Int64Array, prelude::SessionContext}; + use iceberg_rust::{ + catalog::tabular::Tabular, + object_store::ObjectStoreBuilder, + spec::{ + partition::{PartitionField, Transform}, + schema::Schema, + types::{PrimitiveType, StructField, StructType, Type}, + }, + }; use iceberg_rust::{ catalog::Catalog, spec::{ @@ -599,14 +608,6 @@ mod tests { table::Table, view::View, }; - use iceberg_rust::{ - catalog::{bucket::ObjectStoreBuilder, tabular::Tabular}, - spec::{ - partition::{PartitionField, Transform}, - schema::Schema, - types::{PrimitiveType, StructField, StructType, Type}, - }, - }; use iceberg_sql_catalog::SqlCatalog; use std::{ops::Deref, sync::Arc}; diff --git a/datafusion_iceberg/tests/integration_trino.rs b/datafusion_iceberg/tests/integration_trino.rs index 446892ae..8fda3565 100644 --- a/datafusion_iceberg/tests/integration_trino.rs +++ b/datafusion_iceberg/tests/integration_trino.rs @@ -9,9 +9,9 @@ use datafusion::execution::context::SessionContext; use datafusion_iceberg::catalog::catalog::IcebergCatalog; use iceberg_rest_catalog::apis::configuration::Configuration; use iceberg_rest_catalog::catalog::RestCatalog; -use iceberg_rust::catalog::bucket::ObjectStoreBuilder; use iceberg_rust::catalog::namespace::Namespace; use iceberg_rust::catalog::Catalog; +use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_rust::spec::partition::{PartitionField, PartitionSpec, Transform}; use iceberg_rust::spec::schema::Schema; use iceberg_rust::spec::types::{PrimitiveType, StructField, StructType, Type}; @@ -183,7 +183,7 @@ async fn integration_trino_rest() { .await .unwrap(); - let object_store = ObjectStoreBuilder::aws() + let object_store = ObjectStoreBuilder::s3() .with_config("aws_access_key_id".parse().unwrap(), "user") .with_config("aws_secret_access_key".parse().unwrap(), "password") .with_config( diff --git a/iceberg-file-catalog/src/lib.rs b/iceberg-file-catalog/src/lib.rs index 6437c190..8e64456c 100644 --- a/iceberg-file-catalog/src/lib.rs +++ b/iceberg-file-catalog/src/lib.rs @@ -8,7 +8,6 @@ use async_trait::async_trait; use futures::{future, TryStreamExt}; use iceberg_rust::{ catalog::{ - bucket::{Bucket, ObjectStoreBuilder}, commit::{ apply_table_updates, apply_view_updates, check_table_requirements, check_view_requirements, CommitTable, CommitView, TableRequirement, @@ -21,6 +20,7 @@ use iceberg_rust::{ }, error::Error as IcebergError, materialized_view::MaterializedView, + object_store::{Bucket, ObjectStoreBuilder}, spec::{ materialized_view_metadata::MaterializedViewMetadata, table_metadata::{new_metadata_location, TableMetadata}, @@ -628,7 +628,10 @@ pub mod tests { catalog::catalog::IcebergCatalog, planner::{iceberg_transform, IcebergQueryPlanner}, }; - use iceberg_rust::catalog::{bucket::ObjectStoreBuilder, namespace::Namespace, Catalog}; + use iceberg_rust::{ + catalog::{namespace::Namespace, Catalog}, + object_store::ObjectStoreBuilder, + }; use std::sync::Arc; // use testcontainers::{core::ExecCommand, runners::AsyncRunner, ImageExt}; // use testcontainers_modules::localstack::LocalStack; diff --git a/iceberg-glue-catalog/src/lib.rs b/iceberg-glue-catalog/src/lib.rs index b1dd55de..2e5ed791 100644 --- a/iceberg-glue-catalog/src/lib.rs +++ b/iceberg-glue-catalog/src/lib.rs @@ -11,7 +11,6 @@ use aws_sdk_glue::{ }; use iceberg_rust::{ catalog::{ - bucket::{Bucket, ObjectStoreBuilder}, commit::{ apply_table_updates, apply_view_updates, check_table_requirements, check_view_requirements, CommitTable, CommitView, TableRequirement, @@ -24,6 +23,7 @@ use iceberg_rust::{ }, error::Error as IcebergError, materialized_view::MaterializedView, + object_store::{Bucket, ObjectStoreBuilder}, spec::{ self, materialized_view_metadata::MaterializedViewMetadata, @@ -910,7 +910,10 @@ pub mod tests { catalog::catalog::IcebergCatalog, planner::{iceberg_transform, IcebergQueryPlanner}, }; - use iceberg_rust::catalog::{bucket::ObjectStoreBuilder, namespace::Namespace, Catalog}; + use iceberg_rust::{ + catalog::{namespace::Namespace, Catalog}, + object_store::ObjectStoreBuilder, + }; use std::sync::Arc; use testcontainers::{ diff --git a/iceberg-rest-catalog/src/catalog.rs b/iceberg-rest-catalog/src/catalog.rs index ce21af5a..c86b03c5 100644 --- a/iceberg-rest-catalog/src/catalog.rs +++ b/iceberg-rest-catalog/src/catalog.rs @@ -5,7 +5,6 @@ Iceberg rest catalog implementation */ use iceberg_rust::{ catalog::{ - bucket::{Bucket, ObjectStoreBuilder}, commit::CommitView, create::{CreateMaterializedView, CreateTable, CreateView}, identifier::{self, Identifier}, @@ -15,6 +14,7 @@ use iceberg_rust::{ }, error::Error, materialized_view::MaterializedView, + object_store::{Bucket, ObjectStoreBuilder}, spec::{ materialized_view_metadata::MaterializedViewMetadata, table_metadata::TableMetadata, @@ -516,9 +516,8 @@ impl CatalogList for RestCatalogList { #[cfg(test)] pub mod tests { use iceberg_rust::{ - catalog::{ - bucket::ObjectStoreBuilder, identifier::Identifier, namespace::Namespace, Catalog, - }, + catalog::{identifier::Identifier, namespace::Namespace, Catalog}, + object_store::ObjectStoreBuilder, spec::{ schema::Schema, types::{PrimitiveType, StructField, StructType, Type}, diff --git a/iceberg-rust/Cargo.toml b/iceberg-rust/Cargo.toml index 1f371a70..ed3bdc05 100644 --- a/iceberg-rust/Cargo.toml +++ b/iceberg-rust/Cargo.toml @@ -31,6 +31,8 @@ thiserror = { workspace = true } derive-getters = { workspace = true } iceberg-rust-spec = { path = "../iceberg-rust-spec", version = "0.5.8" } smallvec = { version = "1.13.2", features = ["const_generics"] } +aws-config = "1.5.10" +aws-credential-types = "1.2.1" [dev-dependencies] chrono = { workspace = true } diff --git a/iceberg-rust/src/arrow/write.rs b/iceberg-rust/src/arrow/write.rs index bcb35b70..69ce4de5 100644 --- a/iceberg-rust/src/arrow/write.rs +++ b/iceberg-rust/src/arrow/write.rs @@ -29,7 +29,7 @@ use parquet::{ }; use uuid::Uuid; -use crate::{catalog::bucket::Bucket, error::Error, file_format::parquet::parquet_to_datafile}; +use crate::{error::Error, file_format::parquet::parquet_to_datafile, object_store::Bucket}; use super::partition::partition_record_batches; diff --git a/iceberg-rust/src/catalog/mod.rs b/iceberg-rust/src/catalog/mod.rs index c232fed1..2042561a 100644 --- a/iceberg-rust/src/catalog/mod.rs +++ b/iceberg-rust/src/catalog/mod.rs @@ -14,13 +14,12 @@ use crate::materialized_view::MaterializedView; use crate::table::Table; use crate::view::View; -use self::bucket::Bucket; use self::commit::{CommitTable, CommitView}; use self::create::{CreateMaterializedView, CreateTable, CreateView}; use self::namespace::Namespace; use self::tabular::Tabular; +use crate::object_store::Bucket; -pub mod bucket; pub mod commit; pub mod create; pub mod tabular; diff --git a/iceberg-rust/src/lib.rs b/iceberg-rust/src/lib.rs index 8b55632e..42da47a7 100644 --- a/iceberg-rust/src/lib.rs +++ b/iceberg-rust/src/lib.rs @@ -7,6 +7,7 @@ pub mod catalog; pub mod error; pub mod file_format; pub mod materialized_view; +pub mod object_store; pub mod spec; pub mod sql; pub mod store; diff --git a/iceberg-rust/src/materialized_view/mod.rs b/iceberg-rust/src/materialized_view/mod.rs index da452e34..8e0b3af7 100644 --- a/iceberg-rust/src/materialized_view/mod.rs +++ b/iceberg-rust/src/materialized_view/mod.rs @@ -11,10 +11,10 @@ use object_store::ObjectStore; use crate::{ catalog::{ - bucket::Bucket, create::CreateMaterializedViewBuilder, identifier::Identifier, - tabular::Tabular, Catalog, + create::CreateMaterializedViewBuilder, identifier::Identifier, tabular::Tabular, Catalog, }, error::Error, + object_store::Bucket, }; use self::{storage_table::StorageTable, transaction::Transaction as MaterializedViewTransaction}; diff --git a/iceberg-rust/src/catalog/bucket.rs b/iceberg-rust/src/object_store.rs similarity index 68% rename from iceberg-rust/src/catalog/bucket.rs rename to iceberg-rust/src/object_store.rs index e6094808..cb9684d4 100644 --- a/iceberg-rust/src/catalog/bucket.rs +++ b/iceberg-rust/src/object_store.rs @@ -2,14 +2,19 @@ Defining the [Bucket] struct for specifying buckets for the ObjectStore. */ -use std::{fmt::Display, path::Path, str::FromStr, sync::Arc}; +use std::{fmt::Display, ops::Deref, path::Path, str::FromStr, sync::Arc, time::SystemTime}; +use async_trait::async_trait; +use aws_config::SdkConfig; +use aws_credential_types::{provider::ProvideCredentials, Credentials}; +use futures::lock::Mutex; +use object_store::Error as ObjectStoreError; use object_store::{ - aws::{AmazonS3Builder, AmazonS3ConfigKey}, + aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential}, gcp::{GoogleCloudStorageBuilder, GoogleConfigKey}, local::LocalFileSystem, memory::InMemory, - ObjectStore, + CredentialProvider, ObjectStore, }; use crate::error::Error; @@ -94,7 +99,7 @@ impl FromStr for ConfigKey { } impl ObjectStoreBuilder { /// Create new AWS S3 Object Store builder - pub fn aws() -> Self { + pub fn s3() -> Self { ObjectStoreBuilder::S3(AmazonS3Builder::from_env()) } /// Create new AWS S3 Object Store builder @@ -144,3 +149,61 @@ impl ObjectStoreBuilder { } } } + +/// AWS Sdk credential provider for object_store +#[derive(Debug)] +#[allow(clippy::type_complexity)] +pub struct AwsCredentialProvider { + config: SdkConfig, + cache: Arc, Credentials)>>>, +} + +#[async_trait] +impl CredentialProvider for AwsCredentialProvider { + type Credential = AwsCredential; + + async fn get_credential(&self) -> Result, ObjectStoreError> { + let mut guard = self.cache.lock().await; + + let is_valid = if let Some((Some(time), _)) = guard.deref() { + *time >= SystemTime::now() + } else { + false + }; + + if !is_valid { + let provider = self + .config + .credentials_provider() + .ok_or(ObjectStoreError::NotImplemented)?; + + let credentials = + provider + .provide_credentials() + .await + .map_err(|err| ObjectStoreError::Generic { + store: "s3", + source: Box::new(err), + })?; + *guard = Some((credentials.expiry(), credentials)); + }; + + let credentials = &guard.as_ref().unwrap().1; + + Ok(Arc::new(AwsCredential { + key_id: credentials.access_key_id().to_string(), + secret_key: credentials.secret_access_key().to_string(), + token: credentials.session_token().map(ToString::to_string), + })) + } +} + +impl AwsCredentialProvider { + /// Create new credential provider + pub fn new(config: &SdkConfig) -> Self { + Self { + config: config.clone(), + cache: Arc::new(Mutex::new(None)), + } + } +} diff --git a/iceberg-rust/src/table/mod.rs b/iceberg-rust/src/table/mod.rs index 61b66785..ddee3645 100644 --- a/iceberg-rust/src/table/mod.rs +++ b/iceberg-rust/src/table/mod.rs @@ -26,8 +26,9 @@ use iceberg_rust_spec::{ }; use crate::{ - catalog::{bucket::Bucket, create::CreateTableBuilder, identifier::Identifier, Catalog}, + catalog::{create::CreateTableBuilder, identifier::Identifier, Catalog}, error::Error, + object_store::Bucket, table::transaction::TableTransaction, }; diff --git a/iceberg-rust/src/view/mod.rs b/iceberg-rust/src/view/mod.rs index 128bce68..d5a264d5 100644 --- a/iceberg-rust/src/view/mod.rs +++ b/iceberg-rust/src/view/mod.rs @@ -8,8 +8,9 @@ use iceberg_rust_spec::spec::{schema::Schema, view_metadata::ViewMetadata}; use object_store::ObjectStore; use crate::{ - catalog::{bucket::Bucket, create::CreateViewBuilder, identifier::Identifier, Catalog}, + catalog::{create::CreateViewBuilder, identifier::Identifier, Catalog}, error::Error, + object_store::Bucket, }; use self::transaction::Transaction as ViewTransaction; diff --git a/iceberg-sql-catalog/src/lib.rs b/iceberg-sql-catalog/src/lib.rs index b91e0dc5..509755ac 100644 --- a/iceberg-sql-catalog/src/lib.rs +++ b/iceberg-sql-catalog/src/lib.rs @@ -6,7 +6,6 @@ use std::{ use async_trait::async_trait; use iceberg_rust::{ catalog::{ - bucket::{Bucket, ObjectStoreBuilder}, commit::{ apply_table_updates, apply_view_updates, check_table_requirements, check_view_requirements, CommitTable, CommitView, TableRequirement, @@ -19,6 +18,7 @@ use iceberg_rust::{ }, error::Error as IcebergError, materialized_view::MaterializedView, + object_store::{Bucket, ObjectStoreBuilder}, spec::{ materialized_view_metadata::MaterializedViewMetadata, table_metadata::{new_metadata_location, TableMetadata}, @@ -705,9 +705,8 @@ impl CatalogList for SqlCatalogList { #[cfg(test)] pub mod tests { use iceberg_rust::{ - catalog::{ - bucket::ObjectStoreBuilder, identifier::Identifier, namespace::Namespace, Catalog, - }, + catalog::{identifier::Identifier, namespace::Namespace, Catalog}, + object_store::ObjectStoreBuilder, spec::{ schema::Schema, types::{PrimitiveType, StructField, StructType, Type}, From d57c9d9427324d44890bedf67af0e2c1ea7c6745 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Sun, 8 Dec 2024 08:57:29 +0100 Subject: [PATCH 2/4] refactor files --- datafusion_iceberg/src/catalog/mirror.rs | 2 +- iceberg-file-catalog/src/lib.rs | 2 +- iceberg-glue-catalog/src/lib.rs | 2 +- iceberg-rust/src/lib.rs | 1 - iceberg-rust/src/{object_store.rs => object_store/mod.rs} | 2 ++ iceberg-rust/src/{ => object_store}/store.rs | 0 iceberg-sql-catalog/src/lib.rs | 2 +- 7 files changed, 6 insertions(+), 5 deletions(-) rename iceberg-rust/src/{object_store.rs => object_store/mod.rs} (99%) rename iceberg-rust/src/{ => object_store}/store.rs (100%) diff --git a/datafusion_iceberg/src/catalog/mirror.rs b/datafusion_iceberg/src/catalog/mirror.rs index 10715d4d..ce05fd3f 100644 --- a/datafusion_iceberg/src/catalog/mirror.rs +++ b/datafusion_iceberg/src/catalog/mirror.rs @@ -1,7 +1,7 @@ use dashmap::DashMap; use datafusion::{datasource::TableProvider, error::DataFusionError}; use futures::{executor::LocalPool, task::LocalSpawnExt}; -use iceberg_rust::store::IcebergStore; +use iceberg_rust::object_store::store::IcebergStore; use std::{collections::HashSet, sync::Arc}; use iceberg_rust::spec::{tabular::TabularMetadata, view_metadata::REF_PREFIX}; diff --git a/iceberg-file-catalog/src/lib.rs b/iceberg-file-catalog/src/lib.rs index 8e64456c..c7aefb27 100644 --- a/iceberg-file-catalog/src/lib.rs +++ b/iceberg-file-catalog/src/lib.rs @@ -20,6 +20,7 @@ use iceberg_rust::{ }, error::Error as IcebergError, materialized_view::MaterializedView, + object_store::store::IcebergStore, object_store::{Bucket, ObjectStoreBuilder}, spec::{ materialized_view_metadata::MaterializedViewMetadata, @@ -28,7 +29,6 @@ use iceberg_rust::{ util::strip_prefix, view_metadata::ViewMetadata, }, - store::IcebergStore, table::Table, view::View, }; diff --git a/iceberg-glue-catalog/src/lib.rs b/iceberg-glue-catalog/src/lib.rs index 2e5ed791..6ee8a614 100644 --- a/iceberg-glue-catalog/src/lib.rs +++ b/iceberg-glue-catalog/src/lib.rs @@ -23,6 +23,7 @@ use iceberg_rust::{ }, error::Error as IcebergError, materialized_view::MaterializedView, + object_store::store::IcebergStore, object_store::{Bucket, ObjectStoreBuilder}, spec::{ self, @@ -32,7 +33,6 @@ use iceberg_rust::{ util::strip_prefix, view_metadata::ViewMetadata, }, - store::IcebergStore, table::Table, view::View, }; diff --git a/iceberg-rust/src/lib.rs b/iceberg-rust/src/lib.rs index 42da47a7..8c031f30 100644 --- a/iceberg-rust/src/lib.rs +++ b/iceberg-rust/src/lib.rs @@ -10,7 +10,6 @@ pub mod materialized_view; pub mod object_store; pub mod spec; pub mod sql; -pub mod store; pub mod table; pub(crate) mod util; pub mod view; diff --git a/iceberg-rust/src/object_store.rs b/iceberg-rust/src/object_store/mod.rs similarity index 99% rename from iceberg-rust/src/object_store.rs rename to iceberg-rust/src/object_store/mod.rs index cb9684d4..94d9ae49 100644 --- a/iceberg-rust/src/object_store.rs +++ b/iceberg-rust/src/object_store/mod.rs @@ -19,6 +19,8 @@ use object_store::{ use crate::error::Error; +pub mod store; + /// Type for buckets for different cloud providers #[derive(Debug)] pub enum Bucket<'s> { diff --git a/iceberg-rust/src/store.rs b/iceberg-rust/src/object_store/store.rs similarity index 100% rename from iceberg-rust/src/store.rs rename to iceberg-rust/src/object_store/store.rs diff --git a/iceberg-sql-catalog/src/lib.rs b/iceberg-sql-catalog/src/lib.rs index 509755ac..ad9c581a 100644 --- a/iceberg-sql-catalog/src/lib.rs +++ b/iceberg-sql-catalog/src/lib.rs @@ -18,6 +18,7 @@ use iceberg_rust::{ }, error::Error as IcebergError, materialized_view::MaterializedView, + object_store::store::IcebergStore, object_store::{Bucket, ObjectStoreBuilder}, spec::{ materialized_view_metadata::MaterializedViewMetadata, @@ -26,7 +27,6 @@ use iceberg_rust::{ util::strip_prefix, view_metadata::ViewMetadata, }, - store::IcebergStore, table::Table, view::View, }; From b5433c185c86c88ba24e88960d25e668e0bceab2 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Sun, 8 Dec 2024 09:12:31 +0100 Subject: [PATCH 3/4] fix s3tables --- iceberg-s3tables-catalog/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iceberg-s3tables-catalog/src/lib.rs b/iceberg-s3tables-catalog/src/lib.rs index 12818779..e7285ee5 100644 --- a/iceberg-s3tables-catalog/src/lib.rs +++ b/iceberg-s3tables-catalog/src/lib.rs @@ -9,7 +9,6 @@ use aws_config::SdkConfig; use aws_sdk_s3tables::{types::OpenTableFormat, Client}; use iceberg_rust::{ catalog::{ - bucket::{Bucket, ObjectStoreBuilder}, commit::{ apply_table_updates, apply_view_updates, check_table_requirements, check_view_requirements, CommitTable, CommitView, TableRequirement, @@ -22,6 +21,8 @@ use iceberg_rust::{ }, error::Error as IcebergError, materialized_view::MaterializedView, + object_store::store::IcebergStore, + object_store::{Bucket, ObjectStoreBuilder}, spec::{ self, materialized_view_metadata::MaterializedViewMetadata, @@ -30,7 +31,6 @@ use iceberg_rust::{ util::strip_prefix, view_metadata::ViewMetadata, }, - store::IcebergStore, table::Table, view::View, }; From 1150d6618ac559edc7b033223cc230bc9199247a Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Sun, 8 Dec 2024 09:44:00 +0100 Subject: [PATCH 4/4] remove credential provider --- Cargo.lock | 2 - iceberg-rust/Cargo.toml | 2 - iceberg-rust/src/object_store/mod.rs | 69 ++-------------------------- 3 files changed, 3 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53c24c17..b42f47d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2981,8 +2981,6 @@ dependencies = [ "apache-avro", "arrow", "async-trait", - "aws-config", - "aws-credential-types", "chrono", "derive-getters", "derive_builder", diff --git a/iceberg-rust/Cargo.toml b/iceberg-rust/Cargo.toml index ed3bdc05..1f371a70 100644 --- a/iceberg-rust/Cargo.toml +++ b/iceberg-rust/Cargo.toml @@ -31,8 +31,6 @@ thiserror = { workspace = true } derive-getters = { workspace = true } iceberg-rust-spec = { path = "../iceberg-rust-spec", version = "0.5.8" } smallvec = { version = "1.13.2", features = ["const_generics"] } -aws-config = "1.5.10" -aws-credential-types = "1.2.1" [dev-dependencies] chrono = { workspace = true } diff --git a/iceberg-rust/src/object_store/mod.rs b/iceberg-rust/src/object_store/mod.rs index 94d9ae49..a3c38c41 100644 --- a/iceberg-rust/src/object_store/mod.rs +++ b/iceberg-rust/src/object_store/mod.rs @@ -2,19 +2,14 @@ Defining the [Bucket] struct for specifying buckets for the ObjectStore. */ -use std::{fmt::Display, ops::Deref, path::Path, str::FromStr, sync::Arc, time::SystemTime}; +use std::{fmt::Display, path::Path, str::FromStr, sync::Arc}; -use async_trait::async_trait; -use aws_config::SdkConfig; -use aws_credential_types::{provider::ProvideCredentials, Credentials}; -use futures::lock::Mutex; -use object_store::Error as ObjectStoreError; use object_store::{ - aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential}, + aws::{AmazonS3Builder, AmazonS3ConfigKey}, gcp::{GoogleCloudStorageBuilder, GoogleConfigKey}, local::LocalFileSystem, memory::InMemory, - CredentialProvider, ObjectStore, + ObjectStore, }; use crate::error::Error; @@ -151,61 +146,3 @@ impl ObjectStoreBuilder { } } } - -/// AWS Sdk credential provider for object_store -#[derive(Debug)] -#[allow(clippy::type_complexity)] -pub struct AwsCredentialProvider { - config: SdkConfig, - cache: Arc, Credentials)>>>, -} - -#[async_trait] -impl CredentialProvider for AwsCredentialProvider { - type Credential = AwsCredential; - - async fn get_credential(&self) -> Result, ObjectStoreError> { - let mut guard = self.cache.lock().await; - - let is_valid = if let Some((Some(time), _)) = guard.deref() { - *time >= SystemTime::now() - } else { - false - }; - - if !is_valid { - let provider = self - .config - .credentials_provider() - .ok_or(ObjectStoreError::NotImplemented)?; - - let credentials = - provider - .provide_credentials() - .await - .map_err(|err| ObjectStoreError::Generic { - store: "s3", - source: Box::new(err), - })?; - *guard = Some((credentials.expiry(), credentials)); - }; - - let credentials = &guard.as_ref().unwrap().1; - - Ok(Arc::new(AwsCredential { - key_id: credentials.access_key_id().to_string(), - secret_key: credentials.secret_access_key().to_string(), - token: credentials.session_token().map(ToString::to_string), - })) - } -} - -impl AwsCredentialProvider { - /// Create new credential provider - pub fn new(config: &SdkConfig) -> Self { - Self { - config: config.clone(), - cache: Arc::new(Mutex::new(None)), - } - } -}