diff --git a/crates/aws/src/credentials.rs b/crates/aws/src/credentials.rs index 8052026f5f..71441bf05e 100644 --- a/crates/aws/src/credentials.rs +++ b/crates/aws/src/credentials.rs @@ -19,7 +19,7 @@ use deltalake_core::storage::StorageOptions; use deltalake_core::DeltaResult; use tracing::log::*; -use crate::constants; +use crate::constants::{self, AWS_ENDPOINT_URL}; /// An [object_store::CredentialProvider] which handles converting a populated [SdkConfig] /// into a necessary [AwsCredential] type for configuring [object_store::aws::AmazonS3] @@ -167,12 +167,19 @@ pub async fn resolve_credentials(options: StorageOptions) -> DeltaResult CredentialsProviderChain::first_try( "StorageOptions", - OptionsCredentialsProvider { options }, + OptionsCredentialsProvider { + options: options.clone(), + }, ) .or_else("DefaultChain", default_provider), }; diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs index 9834fc8b54..ffaae15333 100644 --- a/crates/aws/src/lib.rs +++ b/crates/aws/src/lib.rs @@ -55,18 +55,26 @@ impl LogStoreFactory for S3LogStoreFactory { let store = url_prefix_handler(store, Path::parse(location.path())?); // With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent - if options - .0 - .contains_key(AmazonS3ConfigKey::ConditionalPut.as_ref()) - { + if options.0.keys().any(|key| { + let key = key.to_ascii_lowercase(); + vec![ + AmazonS3ConfigKey::ConditionalPut.as_ref(), + "conditional_put", + ] + .contains(&key.as_str()) + }) { debug!("S3LogStoreFactory has been asked to create a default LogStore where the underlying store has Conditonal Put enabled - no locking provider required"); return Ok(default_logstore(store, location, options)); } - if options - .0 - .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref()) - { + if options.0.keys().any(|key| { + let key = key.to_ascii_lowercase(); + vec![ + AmazonS3ConfigKey::CopyIfNotExists.as_ref(), + "copy_if_not_exists", + ] + .contains(&key.as_str()) + }) { debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required"); return Ok(logstore::default_s3_logstore(store, location, options)); } diff --git a/crates/aws/src/logstore/dynamodb_logstore.rs b/crates/aws/src/logstore/dynamodb_logstore.rs index 5307040538..202df1709e 100644 --- a/crates/aws/src/logstore/dynamodb_logstore.rs +++ b/crates/aws/src/logstore/dynamodb_logstore.rs @@ -45,7 +45,7 @@ impl S3DynamoDbLogStore { object_store: ObjectStoreRef, ) -> DeltaResult { let lock_client = DynamoDbLockClient::try_new( - &s3_options.sdk_config, + &s3_options.sdk_config.clone().unwrap(), s3_options .extra_opts .get(constants::LOCK_TABLE_KEY_NAME) diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index da8528a360..64c34d5b3a 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -14,6 +14,7 @@ use deltalake_core::storage::{ use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path}; use futures::stream::BoxStream; use futures::Future; +use object_store::aws::S3CopyIfNotExists; use std::collections::HashMap; use std::fmt::Debug; use std::ops::Range; @@ -72,11 +73,14 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { storage_options: &StorageOptions, ) -> DeltaResult<(ObjectStoreRef, Path)> { let options = self.with_env_s3(storage_options); - let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials( - storage_options.clone(), - ))??; - let os_credentials = Arc::new(crate::credentials::AWSForObjectStore::new(sdk_config)); + let sdk_config = if options.0.contains_key(constants::AWS_ENDPOINT_URL) { + None + } else { + Some(execute_sdk_future( + crate::credentials::resolve_credentials(storage_options.clone()), + )??) + }; let mut builder = AmazonS3Builder::new().with_url(url.to_string()); @@ -91,7 +95,14 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { source: Box::new(e), })?; let prefix = Path::parse(path)?; - let inner = builder.with_credentials(os_credentials).build()?; + let inner = if let Some(sdk_config) = sdk_config { + builder.with_credentials(Arc::new(crate::credentials::AWSForObjectStore::new( + sdk_config, + ))) + } else { + builder + } + .build()?; let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?; debug!("Initialized the object store: {store:?}"); @@ -139,7 +150,7 @@ pub struct S3StorageOptions { pub s3_get_internal_server_error_retries: usize, pub allow_unsafe_rename: bool, pub extra_opts: HashMap, - pub sdk_config: SdkConfig, + pub sdk_config: Option, } impl Eq for S3StorageOptions {} @@ -154,8 +165,6 @@ impl PartialEq for S3StorageOptions { == other.s3_get_internal_server_error_retries && self.allow_unsafe_rename == other.allow_unsafe_rename && self.extra_opts == other.extra_opts - && self.sdk_config.endpoint_url() == other.sdk_config.endpoint_url() - && self.sdk_config.region() == other.sdk_config.region() } } @@ -198,8 +207,14 @@ impl S3StorageOptions { .unwrap_or(false); let storage_options = StorageOptions(options.clone()); - let sdk_config = - execute_sdk_future(crate::credentials::resolve_credentials(storage_options))??; + + let sdk_config = if storage_options.0.contains_key(constants::AWS_ENDPOINT_URL) { + None + } else { + Some(execute_sdk_future( + crate::credentials::resolve_credentials(storage_options.clone()), + )??) + }; Ok(Self { virtual_hosted_style_request, @@ -216,12 +231,12 @@ impl S3StorageOptions { /// Return the configured endpoint URL for S3 operations pub fn endpoint_url(&self) -> Option<&str> { - self.sdk_config.endpoint_url() + self.sdk_config.as_ref().map(|v| v.endpoint_url()).flatten() } /// Return the configured region used for S3 operations pub fn region(&self) -> Option<&Region> { - self.sdk_config.region() + self.sdk_config.as_ref().map(|v| v.region()).flatten() } fn u64_or_default(map: &HashMap, key: &str, default: u64) -> u64 {