Skip to content

Commit

Permalink
feat: disable AWS configuration resolution for non-AWS S3 storage sce…
Browse files Browse the repository at this point in the history
…narios

This should reduce bugs, reduce warnings, and improve performance when
somebody is using this crate against an S3-alike service rather than AWS
S3 directly.

Currently I cannot think of a valid scenarijo where one would use
AWS_ENDPOINT_URL against actual S3 buckets, so using that as the
heuristic
  • Loading branch information
rtyler committed Sep 22, 2024
1 parent ccc35eb commit 7055b09
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 37 deletions.
3 changes: 3 additions & 0 deletions crates/aws/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ pub const AWS_EC2_METADATA_DISABLED: &str = "AWS_EC2_METADATA_DISABLED";
/// Defaults to 100
pub const AWS_EC2_METADATA_TIMEOUT: &str = "AWS_EC2_METADATA_TIMEOUT";

/// Force the delta-rs to attempt to load AWS credentials
pub const AWS_FORCE_CREDENTIAL_LOAD: &str = "AWS_FORCE_CREDENTIAL_LOAD";

/// The list of option keys owned by the S3 module.
/// Option keys not contained in this list will be added to the `extra_opts`
/// field of [crate::storage::s3::S3StorageOptions].
Expand Down
2 changes: 1 addition & 1 deletion crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ mod tests {
let factory = S3LogStoreFactory::default();
let store = InMemory::new();
let url = Url::parse("s3://test-bucket").unwrap();
std::env::remove_var(storage::s3_constants::AWS_S3_LOCKING_PROVIDER);
std::env::remove_var(crate::constants::AWS_S3_LOCKING_PROVIDER);
let logstore = factory
.with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
.unwrap();
Expand Down
108 changes: 76 additions & 32 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,7 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
) -> DeltaResult<(ObjectStoreRef, Path)> {
let options = self.with_env_s3(storage_options);

let sdk_config = if options.0.contains_key(constants::AWS_ENDPOINT_URL) {
None
} else {
Some(execute_sdk_future(
crate::credentials::resolve_credentials(storage_options.clone()),
)??)
};

// All S3-likes should start their builder the same way
let mut builder = AmazonS3Builder::new().with_url(url.to_string());

for (key, value) in options.0.iter() {
Expand All @@ -95,14 +88,18 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
source: Box::new(e),
})?;
let prefix = Path::parse(path)?;
let inner = if let Some(sdk_config) = sdk_config {
builder.with_credentials(Arc::new(crate::credentials::AWSForObjectStore::new(
sdk_config,
)))
} else {
builder

if is_aws(storage_options) {
debug!("Detected AWS S3, resolving credentials");
let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials(
storage_options.clone(),
))??;
builder = builder.with_credentials(Arc::new(
crate::credentials::AWSForObjectStore::new(sdk_config),
));
}
.build()?;

let inner = builder.build()?;

let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?;
debug!("Initialized the object store: {store:?}");
Expand Down Expand Up @@ -136,6 +133,26 @@ fn aws_storage_handler(
}
}

// Determine whether this crate is being configured for use with native AWS S3 or an S3-alike
//
// This function will rteturn true in the default case since it's most likely that the absence of
// options will mean default/S3 configuration
fn is_aws(options: &StorageOptions) -> bool {
if options
.0
.contains_key(crate::constants::AWS_FORCE_CREDENTIAL_LOAD)
{
return true;
}
if options
.0
.contains_key(crate::constants::AWS_S3_LOCKING_PROVIDER)
{
return true;
}
!options.0.contains_key(crate::constants::AWS_ENDPOINT_URL)
}

/// Options used to configure the [S3StorageBackend].
///
/// Available options are described in [s3_constants].
Expand Down Expand Up @@ -208,12 +225,14 @@ impl S3StorageOptions {

let storage_options = StorageOptions(options.clone());

let sdk_config = if storage_options.0.contains_key(constants::AWS_ENDPOINT_URL) {
None
} else {
Some(execute_sdk_future(
crate::credentials::resolve_credentials(storage_options.clone()),
)??)
let sdk_config = match is_aws(&storage_options) {
false => None,
true => {
debug!("Detected AWS S3, resolving credentials");
Some(execute_sdk_future(
crate::credentials::resolve_credentials(storage_options.clone()),
)??)
}
};

Ok(Self {
Expand Down Expand Up @@ -528,10 +547,12 @@ mod tests {
let options = S3StorageOptions::try_default().unwrap();
assert_eq!(
S3StorageOptions {
sdk_config: SdkConfig::builder()
.endpoint_url("http://localhost".to_string())
.region(Region::from_static("us-west-1"))
.build(),
sdk_config: Some(
SdkConfig::builder()
.endpoint_url("http://localhost".to_string())
.region(Region::from_static("us-west-1"))
.build()
),
virtual_hosted_style_request: false,
locking_provider: Some("dynamodb".to_string()),
dynamodb_endpoint: None,
Expand Down Expand Up @@ -560,9 +581,11 @@ mod tests {
.unwrap();

let mut expected = S3StorageOptions::try_default().unwrap();
expected.sdk_config = SdkConfig::builder()
.region(Region::from_static("eu-west-1"))
.build();
expected.sdk_config = Some(
SdkConfig::builder()
.region(Region::from_static("eu-west-1"))
.build(),
);
assert_eq!(expected, options);
});
}
Expand Down Expand Up @@ -670,10 +693,12 @@ mod tests {

assert_eq!(
S3StorageOptions {
sdk_config: SdkConfig::builder()
.endpoint_url("http://localhost".to_string())
.region(Region::from_static("us-west-2"))
.build(),
sdk_config: Some(
SdkConfig::builder()
.endpoint_url("http://localhost".to_string())
.region(Region::from_static("us-west-2"))
.build()
),
virtual_hosted_style_request: false,
locking_provider: Some("dynamodb".to_string()),
dynamodb_endpoint: Some("http://localhost:dynamodb".to_string()),
Expand Down Expand Up @@ -767,4 +792,23 @@ mod tests {
}
});
}

#[test]
fn test_is_aws() {
let options = StorageOptions::default();
assert!(is_aws(&options));

let minio: HashMap<String, String> = hashmap! {
crate::constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(),
};
let options = StorageOptions::from(minio);
assert!(!is_aws(&options));

let localstack: HashMap<String, String> = hashmap! {
crate::constants::AWS_FORCE_CREDENTIAL_LOAD.to_string() => "true".to_string(),
crate::constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(),
};
let options = StorageOptions::from(localstack);
assert!(is_aws(&options));
}
}
13 changes: 9 additions & 4 deletions crates/aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use lazy_static::lazy_static;
use object_store::path::Path;
use serde_json::Value;
use serial_test::serial;
use tracing::log::*;

use maplit::hashmap;
use object_store::{PutOptions, PutPayload};
Expand All @@ -43,7 +44,7 @@ lazy_static! {
fn make_client() -> TestResult<DynamoDbLockClient> {
let options: S3StorageOptions = S3StorageOptions::try_default().unwrap();
Ok(DynamoDbLockClient::try_new(
&options.sdk_config,
&options.sdk_config.unwrap(),
None,
None,
None,
Expand Down Expand Up @@ -74,7 +75,7 @@ fn client_configs_via_env_variables() -> TestResult<()> {
billing_mode: BillingMode::PayPerRequest,
lock_table_name: "some_table".to_owned(),
max_elapsed_request_time: Duration::from_secs(64),
sdk_config: options.sdk_config,
sdk_config: options.sdk_config.unwrap(),
},
*config,
);
Expand All @@ -87,6 +88,7 @@ fn client_configs_via_env_variables() -> TestResult<()> {
#[tokio::test]
#[serial]
async fn test_create_s3_table() -> TestResult<()> {
let _ = pretty_env_logger::try_init();
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let _client = make_client()?;
let table_name = format!("{}_{}", "create_test", uuid::Uuid::new_v4());
Expand All @@ -98,8 +100,10 @@ async fn test_create_s3_table() -> TestResult<()> {
true,
)]);
let storage_options: HashMap<String, String> = hashmap! {
"AWS_ALLOW_HTTP".into() => "true".into(),
"AWS_ENDPOINT_URL".into() => "http://localhost:4566".into(),
deltalake_aws::constants::AWS_ALLOW_HTTP.into() => "true".into(),
// Despite not being in AWS, we should force credential resolution
deltalake_aws::constants::AWS_FORCE_CREDENTIAL_LOAD.into() => "true".into(),
deltalake_aws::constants::AWS_ENDPOINT_URL.into() => "http://localhost:4566".into(),
};
let log_store = logstore_for(Url::parse(&table_uri)?, storage_options, None)?;

Expand All @@ -113,6 +117,7 @@ async fn test_create_s3_table() -> TestResult<()> {
)
.await?;

debug!("creating a CreateBuilder");
let _created = CreateBuilder::new()
.with_log_store(log_store)
.with_partition_columns(vec!["id"])
Expand Down

0 comments on commit 7055b09

Please sign in to comment.