Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check lowercase config keys with/without aws prefix in s3logstorefactory #2896

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/aws/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ pub const AWS_EC2_METADATA_DISABLED: &str = "AWS_EC2_METADATA_DISABLED";
/// Defaults to 100
pub const AWS_EC2_METADATA_TIMEOUT: &str = "AWS_EC2_METADATA_TIMEOUT";

/// Force delta-rs to attempt to load AWS credentials
pub const AWS_FORCE_CREDENTIAL_LOAD: &str = "AWS_FORCE_CREDENTIAL_LOAD";

/// The list of option keys owned by the S3 module.
/// Option keys not contained in this list will be added to the `extra_opts`
/// field of [crate::storage::s3::S3StorageOptions].
Expand Down
13 changes: 10 additions & 3 deletions crates/aws/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use deltalake_core::storage::StorageOptions;
use deltalake_core::DeltaResult;
use tracing::log::*;

use crate::constants;
use crate::constants::{self, AWS_ENDPOINT_URL};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably want to revert this and use consistent references here


/// An [object_store::CredentialProvider] which handles converting a populated [SdkConfig]
/// into a necessary [AwsCredential] type for configuring [object_store::aws::AmazonS3]
Expand Down Expand Up @@ -167,12 +167,19 @@ pub async fn resolve_credentials(options: StorageOptions) -> DeltaResult<SdkConf
.build()
.await,
)
.or_else("StorageOptions", OptionsCredentialsProvider { options })
.or_else(
"StorageOptions",
OptionsCredentialsProvider {
options: options.clone(),
},
)
.or_else("DefaultChain", default_provider)
}
None => CredentialsProviderChain::first_try(
"StorageOptions",
OptionsCredentialsProvider { options },
OptionsCredentialsProvider {
options: options.clone(),
},
)
.or_else("DefaultChain", default_provider),
};
Expand Down
26 changes: 17 additions & 9 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,26 @@ impl LogStoreFactory for S3LogStoreFactory {
let store = url_prefix_handler(store, Path::parse(location.path())?);

// With conditional put in an S3-like API we can use the deltalake default logstore, which uses PutIfAbsent
if options
.0
.contains_key(AmazonS3ConfigKey::ConditionalPut.as_ref())
{
if options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
vec![
AmazonS3ConfigKey::ConditionalPut.as_ref(),
"conditional_put",
]
.contains(&key.as_str())
}) {
debug!("S3LogStoreFactory has been asked to create a default LogStore where the underlying store has Conditonal Put enabled - no locking provider required");
return Ok(default_logstore(store, location, options));
}

if options
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
{
if options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
vec![
AmazonS3ConfigKey::CopyIfNotExists.as_ref(),
"copy_if_not_exists",
]
.contains(&key.as_str())
}) {
debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required");
return Ok(logstore::default_s3_logstore(store, location, options));
}
Expand Down Expand Up @@ -717,7 +725,7 @@ mod tests {
let factory = S3LogStoreFactory::default();
let store = InMemory::new();
let url = Url::parse("s3://test-bucket").unwrap();
std::env::remove_var(storage::s3_constants::AWS_S3_LOCKING_PROVIDER);
std::env::remove_var(crate::constants::AWS_S3_LOCKING_PROVIDER);
let logstore = factory
.with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion crates/aws/src/logstore/dynamodb_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl S3DynamoDbLogStore {
object_store: ObjectStoreRef,
) -> DeltaResult<Self> {
let lock_client = DynamoDbLockClient::try_new(
&s3_options.sdk_config,
&s3_options.sdk_config.clone().unwrap(),
s3_options
.extra_opts
.get(constants::LOCK_TABLE_KEY_NAME)
Expand Down
107 changes: 83 additions & 24 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use deltalake_core::storage::{
use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path};
use futures::stream::BoxStream;
use futures::Future;
use object_store::aws::S3CopyIfNotExists;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Range;
Expand Down Expand Up @@ -72,12 +73,8 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
storage_options: &StorageOptions,
) -> DeltaResult<(ObjectStoreRef, Path)> {
let options = self.with_env_s3(storage_options);
let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials(
storage_options.clone(),
))??;

let os_credentials = Arc::new(crate::credentials::AWSForObjectStore::new(sdk_config));

// All S3-likes should start their builder the same way
let mut builder = AmazonS3Builder::new().with_url(url.to_string());

for (key, value) in options.0.iter() {
Expand All @@ -91,7 +88,18 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
source: Box::new(e),
})?;
let prefix = Path::parse(path)?;
let inner = builder.with_credentials(os_credentials).build()?;

if is_aws(storage_options) {
debug!("Detected AWS S3, resolving credentials");
let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials(
storage_options.clone(),
))??;
builder = builder.with_credentials(Arc::new(
crate::credentials::AWSForObjectStore::new(sdk_config),
));
}

let inner = builder.build()?;

let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?;
debug!("Initialized the object store: {store:?}");
Expand Down Expand Up @@ -125,6 +133,26 @@ fn aws_storage_handler(
}
}

// Determine whether this crate is being configured for use with native AWS S3 or an S3-alike
//
// This function will return true in the default case since it's most likely that the absence of
// options will mean default/S3 configuration
fn is_aws(options: &StorageOptions) -> bool {
    let opts = &options.0;

    // An explicit request to load credentials always counts as AWS.
    let forced = opts.contains_key(crate::constants::AWS_FORCE_CREDENTIAL_LOAD);
    // So does the presence of the S3 locking provider key.
    let has_locking_provider = opts.contains_key(crate::constants::AWS_S3_LOCKING_PROVIDER);
    // Otherwise the target is treated as AWS only when no custom endpoint URL key is set.
    let has_custom_endpoint = opts.contains_key(crate::constants::AWS_ENDPOINT_URL);

    forced || has_locking_provider || !has_custom_endpoint
}

/// Options used to configure the [S3StorageBackend].
///
/// Available options are described in [s3_constants].
Expand All @@ -139,7 +167,7 @@ pub struct S3StorageOptions {
pub s3_get_internal_server_error_retries: usize,
pub allow_unsafe_rename: bool,
pub extra_opts: HashMap<String, String>,
pub sdk_config: SdkConfig,
pub sdk_config: Option<SdkConfig>,
}

impl Eq for S3StorageOptions {}
Expand All @@ -154,8 +182,6 @@ impl PartialEq for S3StorageOptions {
== other.s3_get_internal_server_error_retries
&& self.allow_unsafe_rename == other.allow_unsafe_rename
&& self.extra_opts == other.extra_opts
&& self.sdk_config.endpoint_url() == other.sdk_config.endpoint_url()
&& self.sdk_config.region() == other.sdk_config.region()
}
}

Expand Down Expand Up @@ -198,8 +224,16 @@ impl S3StorageOptions {
.unwrap_or(false);

let storage_options = StorageOptions(options.clone());
let sdk_config =
execute_sdk_future(crate::credentials::resolve_credentials(storage_options))??;

let sdk_config = match is_aws(&storage_options) {
false => None,
true => {
debug!("Detected AWS S3, resolving credentials");
Some(execute_sdk_future(
crate::credentials::resolve_credentials(storage_options.clone()),
)??)
}
};

Ok(Self {
virtual_hosted_style_request,
Expand All @@ -216,12 +250,12 @@ impl S3StorageOptions {

/// Return the configured endpoint URL for S3 operations
pub fn endpoint_url(&self) -> Option<&str> {
self.sdk_config.endpoint_url()
self.sdk_config.as_ref().map(|v| v.endpoint_url()).flatten()
}

/// Return the configured region used for S3 operations
pub fn region(&self) -> Option<&Region> {
self.sdk_config.region()
self.sdk_config.as_ref().map(|v| v.region()).flatten()
}

fn u64_or_default(map: &HashMap<String, String>, key: &str, default: u64) -> u64 {
Expand Down Expand Up @@ -513,10 +547,12 @@ mod tests {
let options = S3StorageOptions::try_default().unwrap();
assert_eq!(
S3StorageOptions {
sdk_config: SdkConfig::builder()
.endpoint_url("http://localhost".to_string())
.region(Region::from_static("us-west-1"))
.build(),
sdk_config: Some(
SdkConfig::builder()
.endpoint_url("http://localhost".to_string())
.region(Region::from_static("us-west-1"))
.build()
),
virtual_hosted_style_request: false,
locking_provider: Some("dynamodb".to_string()),
dynamodb_endpoint: None,
Expand Down Expand Up @@ -545,9 +581,11 @@ mod tests {
.unwrap();

let mut expected = S3StorageOptions::try_default().unwrap();
expected.sdk_config = SdkConfig::builder()
.region(Region::from_static("eu-west-1"))
.build();
expected.sdk_config = Some(
SdkConfig::builder()
.region(Region::from_static("eu-west-1"))
.build(),
);
assert_eq!(expected, options);
});
}
Expand Down Expand Up @@ -655,10 +693,12 @@ mod tests {

assert_eq!(
S3StorageOptions {
sdk_config: SdkConfig::builder()
.endpoint_url("http://localhost".to_string())
.region(Region::from_static("us-west-2"))
.build(),
sdk_config: Some(
SdkConfig::builder()
.endpoint_url("http://localhost".to_string())
.region(Region::from_static("us-west-2"))
.build()
),
virtual_hosted_style_request: false,
locking_provider: Some("dynamodb".to_string()),
dynamodb_endpoint: Some("http://localhost:dynamodb".to_string()),
Expand Down Expand Up @@ -752,4 +792,23 @@ mod tests {
}
});
}

#[test]
fn test_is_aws() {
    // With no options at all, `is_aws` reports true.
    let empty = StorageOptions::default();
    assert!(is_aws(&empty));

    // A custom endpoint URL on its own makes `is_aws` report false.
    let minio_opts: HashMap<String, String> = hashmap! {
        crate::constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(),
    };
    assert!(!is_aws(&StorageOptions::from(minio_opts)));

    // The force-credential-load key takes precedence over a custom endpoint URL.
    let localstack_opts: HashMap<String, String> = hashmap! {
        crate::constants::AWS_FORCE_CREDENTIAL_LOAD.to_string() => "true".to_string(),
        crate::constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(),
    };
    assert!(is_aws(&StorageOptions::from(localstack_opts)));
}
}
13 changes: 9 additions & 4 deletions crates/aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use lazy_static::lazy_static;
use object_store::path::Path;
use serde_json::Value;
use serial_test::serial;
use tracing::log::*;

use maplit::hashmap;
use object_store::{PutOptions, PutPayload};
Expand All @@ -43,7 +44,7 @@ lazy_static! {
fn make_client() -> TestResult<DynamoDbLockClient> {
let options: S3StorageOptions = S3StorageOptions::try_default().unwrap();
Ok(DynamoDbLockClient::try_new(
&options.sdk_config,
&options.sdk_config.unwrap(),
None,
None,
None,
Expand Down Expand Up @@ -74,7 +75,7 @@ fn client_configs_via_env_variables() -> TestResult<()> {
billing_mode: BillingMode::PayPerRequest,
lock_table_name: "some_table".to_owned(),
max_elapsed_request_time: Duration::from_secs(64),
sdk_config: options.sdk_config,
sdk_config: options.sdk_config.unwrap(),
},
*config,
);
Expand All @@ -87,6 +88,7 @@ fn client_configs_via_env_variables() -> TestResult<()> {
#[tokio::test]
#[serial]
async fn test_create_s3_table() -> TestResult<()> {
let _ = pretty_env_logger::try_init();
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let _client = make_client()?;
let table_name = format!("{}_{}", "create_test", uuid::Uuid::new_v4());
Expand All @@ -98,8 +100,10 @@ async fn test_create_s3_table() -> TestResult<()> {
true,
)]);
let storage_options: HashMap<String, String> = hashmap! {
"AWS_ALLOW_HTTP".into() => "true".into(),
"AWS_ENDPOINT_URL".into() => "http://localhost:4566".into(),
deltalake_aws::constants::AWS_ALLOW_HTTP.into() => "true".into(),
// Despite not being in AWS, we should force credential resolution
deltalake_aws::constants::AWS_FORCE_CREDENTIAL_LOAD.into() => "true".into(),
deltalake_aws::constants::AWS_ENDPOINT_URL.into() => "http://localhost:4566".into(),
};
let log_store = logstore_for(Url::parse(&table_uri)?, storage_options, None)?;

Expand All @@ -113,6 +117,7 @@ async fn test_create_s3_table() -> TestResult<()> {
)
.await?;

debug!("creating a CreateBuilder");
let _created = CreateBuilder::new()
.with_log_store(log_store)
.with_partition_columns(vec!["id"])
Expand Down
Loading