Skip to content

Commit

Permalink
fix: check lowercase config keys with/without aws prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco authored and rtyler committed Sep 22, 2024
1 parent 68f6d74 commit ccc35eb
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 24 deletions.
13 changes: 10 additions & 3 deletions crates/aws/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use deltalake_core::storage::StorageOptions;
use deltalake_core::DeltaResult;
use tracing::log::*;

use crate::constants;
use crate::constants::{self, AWS_ENDPOINT_URL};

/// An [object_store::CredentialProvider] which handles converting a populated [SdkConfig]
/// into a necessary [AwsCredential] type for configuring [object_store::aws::AmazonS3]
Expand Down Expand Up @@ -167,12 +167,19 @@ pub async fn resolve_credentials(options: StorageOptions) -> DeltaResult<SdkConf
.build()
.await,
)
.or_else("StorageOptions", OptionsCredentialsProvider { options })
.or_else(
"StorageOptions",
OptionsCredentialsProvider {
options: options.clone(),
},
)
.or_else("DefaultChain", default_provider)
}
None => CredentialsProviderChain::first_try(
"StorageOptions",
OptionsCredentialsProvider { options },
OptionsCredentialsProvider {
options: options.clone(),
},
)
.or_else("DefaultChain", default_provider),
};
Expand Down
24 changes: 16 additions & 8 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,26 @@ impl LogStoreFactory for S3LogStoreFactory {
let store = url_prefix_handler(store, Path::parse(location.path())?);

// With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent
if options
.0
.contains_key(AmazonS3ConfigKey::ConditionalPut.as_ref())
{
if options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
vec![
AmazonS3ConfigKey::ConditionalPut.as_ref(),
"conditional_put",
]
.contains(&key.as_str())
}) {
debug!("S3LogStoreFactory has been asked to create a default LogStore where the underlying store has Conditonal Put enabled - no locking provider required");
return Ok(default_logstore(store, location, options));
}

if options
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
{
if options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
vec![
AmazonS3ConfigKey::CopyIfNotExists.as_ref(),
"copy_if_not_exists",
]
.contains(&key.as_str())
}) {
debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required");
return Ok(logstore::default_s3_logstore(store, location, options));
}
Expand Down
2 changes: 1 addition & 1 deletion crates/aws/src/logstore/dynamodb_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl S3DynamoDbLogStore {
object_store: ObjectStoreRef,
) -> DeltaResult<Self> {
let lock_client = DynamoDbLockClient::try_new(
&s3_options.sdk_config,
&s3_options.sdk_config.clone().unwrap(),
s3_options
.extra_opts
.get(constants::LOCK_TABLE_KEY_NAME)
Expand Down
39 changes: 27 additions & 12 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use deltalake_core::storage::{
use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path};
use futures::stream::BoxStream;
use futures::Future;
use object_store::aws::S3CopyIfNotExists;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Range;
Expand Down Expand Up @@ -72,11 +73,14 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
storage_options: &StorageOptions,
) -> DeltaResult<(ObjectStoreRef, Path)> {
let options = self.with_env_s3(storage_options);
let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials(
storage_options.clone(),
))??;

let os_credentials = Arc::new(crate::credentials::AWSForObjectStore::new(sdk_config));
let sdk_config = if options.0.contains_key(constants::AWS_ENDPOINT_URL) {
None
} else {
Some(execute_sdk_future(
crate::credentials::resolve_credentials(storage_options.clone()),
)??)
};

let mut builder = AmazonS3Builder::new().with_url(url.to_string());

Expand All @@ -91,7 +95,14 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
source: Box::new(e),
})?;
let prefix = Path::parse(path)?;
let inner = builder.with_credentials(os_credentials).build()?;
let inner = if let Some(sdk_config) = sdk_config {
builder.with_credentials(Arc::new(crate::credentials::AWSForObjectStore::new(
sdk_config,
)))
} else {
builder
}
.build()?;

let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?;
debug!("Initialized the object store: {store:?}");
Expand Down Expand Up @@ -139,7 +150,7 @@ pub struct S3StorageOptions {
pub s3_get_internal_server_error_retries: usize,
pub allow_unsafe_rename: bool,
pub extra_opts: HashMap<String, String>,
pub sdk_config: SdkConfig,
pub sdk_config: Option<SdkConfig>,
}

impl Eq for S3StorageOptions {}
Expand All @@ -154,8 +165,6 @@ impl PartialEq for S3StorageOptions {
== other.s3_get_internal_server_error_retries
&& self.allow_unsafe_rename == other.allow_unsafe_rename
&& self.extra_opts == other.extra_opts
&& self.sdk_config.endpoint_url() == other.sdk_config.endpoint_url()
&& self.sdk_config.region() == other.sdk_config.region()
}
}

Expand Down Expand Up @@ -198,8 +207,14 @@ impl S3StorageOptions {
.unwrap_or(false);

let storage_options = StorageOptions(options.clone());
let sdk_config =
execute_sdk_future(crate::credentials::resolve_credentials(storage_options))??;

let sdk_config = if storage_options.0.contains_key(constants::AWS_ENDPOINT_URL) {
None
} else {
Some(execute_sdk_future(
crate::credentials::resolve_credentials(storage_options.clone()),
)??)
};

Ok(Self {
virtual_hosted_style_request,
Expand All @@ -216,12 +231,12 @@ impl S3StorageOptions {

/// Return the configured endpoint URL for S3 operations
pub fn endpoint_url(&self) -> Option<&str> {
self.sdk_config.endpoint_url()
self.sdk_config.as_ref().map(|v| v.endpoint_url()).flatten()
}

/// Return the configured region used for S3 operations
pub fn region(&self) -> Option<&Region> {
self.sdk_config.region()
self.sdk_config.as_ref().map(|v| v.region()).flatten()
}

fn u64_or_default(map: &HashMap<String, String>, key: &str, default: u64) -> u64 {
Expand Down

0 comments on commit ccc35eb

Please sign in to comment.