Skip to content

Commit

Permalink
Merge branch 'main' into docs/minio-docs-edits
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler authored Sep 23, 2024
2 parents 2674cda + 66441d1 commit 10424f1
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 53 deletions.
3 changes: 3 additions & 0 deletions crates/aws/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ pub const AWS_EC2_METADATA_DISABLED: &str = "AWS_EC2_METADATA_DISABLED";
/// Defaults to 100
pub const AWS_EC2_METADATA_TIMEOUT: &str = "AWS_EC2_METADATA_TIMEOUT";

/// Force delta-rs to attempt to load AWS credentials, even when a custom endpoint URL is configured
pub const AWS_FORCE_CREDENTIAL_LOAD: &str = "AWS_FORCE_CREDENTIAL_LOAD";

/// The list of option keys owned by the S3 module.
/// Option keys not contained in this list will be added to the `extra_opts`
/// field of [crate::storage::s3::S3StorageOptions].
Expand Down
13 changes: 10 additions & 3 deletions crates/aws/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use deltalake_core::storage::StorageOptions;
use deltalake_core::DeltaResult;
use tracing::log::*;

use crate::constants;
use crate::constants::{self, AWS_ENDPOINT_URL};

/// An [object_store::CredentialProvider] which handles converting a populated [SdkConfig]
/// into a necessary [AwsCredential] type for configuring [object_store::aws::AmazonS3]
Expand Down Expand Up @@ -167,12 +167,19 @@ pub async fn resolve_credentials(options: StorageOptions) -> DeltaResult<SdkConf
.build()
.await,
)
.or_else("StorageOptions", OptionsCredentialsProvider { options })
.or_else(
"StorageOptions",
OptionsCredentialsProvider {
options: options.clone(),
},
)
.or_else("DefaultChain", default_provider)
}
None => CredentialsProviderChain::first_try(
"StorageOptions",
OptionsCredentialsProvider { options },
OptionsCredentialsProvider {
options: options.clone(),
},
)
.or_else("DefaultChain", default_provider),
};
Expand Down
26 changes: 17 additions & 9 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,26 @@ impl LogStoreFactory for S3LogStoreFactory {
let store = url_prefix_handler(store, Path::parse(location.path())?);

// With conditional put in an S3-like API we can use the deltalake default logstore, which uses PutIfAbsent
if options
.0
.contains_key(AmazonS3ConfigKey::ConditionalPut.as_ref())
{
if options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
vec![
AmazonS3ConfigKey::ConditionalPut.as_ref(),
"conditional_put",
]
.contains(&key.as_str())
}) {
debug!("S3LogStoreFactory has been asked to create a default LogStore where the underlying store has Conditonal Put enabled - no locking provider required");
return Ok(default_logstore(store, location, options));
}

if options
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
{
if options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
vec![
AmazonS3ConfigKey::CopyIfNotExists.as_ref(),
"copy_if_not_exists",
]
.contains(&key.as_str())
}) {
debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required");
return Ok(logstore::default_s3_logstore(store, location, options));
}
Expand Down Expand Up @@ -717,7 +725,7 @@ mod tests {
let factory = S3LogStoreFactory::default();
let store = InMemory::new();
let url = Url::parse("s3://test-bucket").unwrap();
std::env::remove_var(storage::s3_constants::AWS_S3_LOCKING_PROVIDER);
std::env::remove_var(crate::constants::AWS_S3_LOCKING_PROVIDER);
let logstore = factory
.with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion crates/aws/src/logstore/dynamodb_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl S3DynamoDbLogStore {
object_store: ObjectStoreRef,
) -> DeltaResult<Self> {
let lock_client = DynamoDbLockClient::try_new(
&s3_options.sdk_config,
&s3_options.sdk_config.clone().unwrap(),
s3_options
.extra_opts
.get(constants::LOCK_TABLE_KEY_NAME)
Expand Down
107 changes: 83 additions & 24 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use deltalake_core::storage::{
use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path};
use futures::stream::BoxStream;
use futures::Future;
use object_store::aws::S3CopyIfNotExists;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Range;
Expand Down Expand Up @@ -72,12 +73,8 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
storage_options: &StorageOptions,
) -> DeltaResult<(ObjectStoreRef, Path)> {
let options = self.with_env_s3(storage_options);
let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials(
storage_options.clone(),
))??;

let os_credentials = Arc::new(crate::credentials::AWSForObjectStore::new(sdk_config));

// All S3-likes should start their builder the same way
let mut builder = AmazonS3Builder::new().with_url(url.to_string());

for (key, value) in options.0.iter() {
Expand All @@ -91,7 +88,18 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
source: Box::new(e),
})?;
let prefix = Path::parse(path)?;
let inner = builder.with_credentials(os_credentials).build()?;

if is_aws(storage_options) {
debug!("Detected AWS S3, resolving credentials");
let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials(
storage_options.clone(),
))??;
builder = builder.with_credentials(Arc::new(
crate::credentials::AWSForObjectStore::new(sdk_config),
));
}

let inner = builder.build()?;

let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?;
debug!("Initialized the object store: {store:?}");
Expand Down Expand Up @@ -125,6 +133,26 @@ fn aws_storage_handler(
}
}

/// Decide whether this crate is being configured for native AWS S3 or for an S3-alike.
///
/// Returns `true` when either `crate::constants::AWS_FORCE_CREDENTIAL_LOAD` or
/// `crate::constants::AWS_S3_LOCKING_PROVIDER` is present as a key in `options`; only the
/// presence of the key is checked, not its value. Otherwise returns `true` only when
/// `crate::constants::AWS_ENDPOINT_URL` is absent, so an empty set of options is treated as
/// the default AWS S3 configuration.
fn is_aws(options: &StorageOptions) -> bool {
    let has_key = |key: &str| options.0.contains_key(key);

    // Either of these keys opts in to AWS handling even when a custom endpoint is set.
    let explicitly_aws = has_key(crate::constants::AWS_FORCE_CREDENTIAL_LOAD)
        || has_key(crate::constants::AWS_S3_LOCKING_PROVIDER);

    // Without an explicit opt-in, a custom endpoint URL is taken to mean an S3-alike.
    explicitly_aws || !has_key(crate::constants::AWS_ENDPOINT_URL)
}

/// Options used to configure the [S3StorageBackend].
///
/// Available options are described in [s3_constants].
Expand All @@ -139,7 +167,7 @@ pub struct S3StorageOptions {
pub s3_get_internal_server_error_retries: usize,
pub allow_unsafe_rename: bool,
pub extra_opts: HashMap<String, String>,
pub sdk_config: SdkConfig,
pub sdk_config: Option<SdkConfig>,
}

impl Eq for S3StorageOptions {}
Expand All @@ -154,8 +182,6 @@ impl PartialEq for S3StorageOptions {
== other.s3_get_internal_server_error_retries
&& self.allow_unsafe_rename == other.allow_unsafe_rename
&& self.extra_opts == other.extra_opts
&& self.sdk_config.endpoint_url() == other.sdk_config.endpoint_url()
&& self.sdk_config.region() == other.sdk_config.region()
}
}

Expand Down Expand Up @@ -198,8 +224,16 @@ impl S3StorageOptions {
.unwrap_or(false);

let storage_options = StorageOptions(options.clone());
let sdk_config =
execute_sdk_future(crate::credentials::resolve_credentials(storage_options))??;

let sdk_config = match is_aws(&storage_options) {
false => None,
true => {
debug!("Detected AWS S3, resolving credentials");
Some(execute_sdk_future(
crate::credentials::resolve_credentials(storage_options.clone()),
)??)
}
};

Ok(Self {
virtual_hosted_style_request,
Expand All @@ -216,12 +250,12 @@ impl S3StorageOptions {

/// Return the configured endpoint URL for S3 operations
pub fn endpoint_url(&self) -> Option<&str> {
self.sdk_config.endpoint_url()
self.sdk_config.as_ref().map(|v| v.endpoint_url()).flatten()
}

/// Return the configured region used for S3 operations
pub fn region(&self) -> Option<&Region> {
self.sdk_config.region()
self.sdk_config.as_ref().map(|v| v.region()).flatten()
}

fn u64_or_default(map: &HashMap<String, String>, key: &str, default: u64) -> u64 {
Expand Down Expand Up @@ -513,10 +547,12 @@ mod tests {
let options = S3StorageOptions::try_default().unwrap();
assert_eq!(
S3StorageOptions {
sdk_config: SdkConfig::builder()
.endpoint_url("http://localhost".to_string())
.region(Region::from_static("us-west-1"))
.build(),
sdk_config: Some(
SdkConfig::builder()
.endpoint_url("http://localhost".to_string())
.region(Region::from_static("us-west-1"))
.build()
),
virtual_hosted_style_request: false,
locking_provider: Some("dynamodb".to_string()),
dynamodb_endpoint: None,
Expand Down Expand Up @@ -545,9 +581,11 @@ mod tests {
.unwrap();

let mut expected = S3StorageOptions::try_default().unwrap();
expected.sdk_config = SdkConfig::builder()
.region(Region::from_static("eu-west-1"))
.build();
expected.sdk_config = Some(
SdkConfig::builder()
.region(Region::from_static("eu-west-1"))
.build(),
);
assert_eq!(expected, options);
});
}
Expand Down Expand Up @@ -655,10 +693,12 @@ mod tests {

assert_eq!(
S3StorageOptions {
sdk_config: SdkConfig::builder()
.endpoint_url("http://localhost".to_string())
.region(Region::from_static("us-west-2"))
.build(),
sdk_config: Some(
SdkConfig::builder()
.endpoint_url("http://localhost".to_string())
.region(Region::from_static("us-west-2"))
.build()
),
virtual_hosted_style_request: false,
locking_provider: Some("dynamodb".to_string()),
dynamodb_endpoint: Some("http://localhost:dynamodb".to_string()),
Expand Down Expand Up @@ -752,4 +792,23 @@ mod tests {
}
});
}

#[test]
fn test_is_aws() {
    // No options at all: treated as the default AWS S3 configuration.
    assert!(is_aws(&StorageOptions::default()));

    // A custom endpoint URL on its own marks the target as an S3-alike.
    let minio_options = StorageOptions::from(HashMap::from([(
        crate::constants::AWS_ENDPOINT_URL.to_string(),
        "http://minio:8080".to_string(),
    )]));
    assert!(!is_aws(&minio_options));

    // Forcing credential loading wins over the presence of a custom endpoint URL.
    let localstack_options = StorageOptions::from(HashMap::from([
        (
            crate::constants::AWS_FORCE_CREDENTIAL_LOAD.to_string(),
            "true".to_string(),
        ),
        (
            crate::constants::AWS_ENDPOINT_URL.to_string(),
            "http://minio:8080".to_string(),
        ),
    ]));
    assert!(is_aws(&localstack_options));
}
}
13 changes: 9 additions & 4 deletions crates/aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use lazy_static::lazy_static;
use object_store::path::Path;
use serde_json::Value;
use serial_test::serial;
use tracing::log::*;

use maplit::hashmap;
use object_store::{PutOptions, PutPayload};
Expand All @@ -43,7 +44,7 @@ lazy_static! {
fn make_client() -> TestResult<DynamoDbLockClient> {
let options: S3StorageOptions = S3StorageOptions::try_default().unwrap();
Ok(DynamoDbLockClient::try_new(
&options.sdk_config,
&options.sdk_config.unwrap(),
None,
None,
None,
Expand Down Expand Up @@ -74,7 +75,7 @@ fn client_configs_via_env_variables() -> TestResult<()> {
billing_mode: BillingMode::PayPerRequest,
lock_table_name: "some_table".to_owned(),
max_elapsed_request_time: Duration::from_secs(64),
sdk_config: options.sdk_config,
sdk_config: options.sdk_config.unwrap(),
},
*config,
);
Expand All @@ -87,6 +88,7 @@ fn client_configs_via_env_variables() -> TestResult<()> {
#[tokio::test]
#[serial]
async fn test_create_s3_table() -> TestResult<()> {
let _ = pretty_env_logger::try_init();
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let _client = make_client()?;
let table_name = format!("{}_{}", "create_test", uuid::Uuid::new_v4());
Expand All @@ -98,8 +100,10 @@ async fn test_create_s3_table() -> TestResult<()> {
true,
)]);
let storage_options: HashMap<String, String> = hashmap! {
"AWS_ALLOW_HTTP".into() => "true".into(),
"AWS_ENDPOINT_URL".into() => "http://localhost:4566".into(),
deltalake_aws::constants::AWS_ALLOW_HTTP.into() => "true".into(),
// Despite not being in AWS, we should force credential resolution
deltalake_aws::constants::AWS_FORCE_CREDENTIAL_LOAD.into() => "true".into(),
deltalake_aws::constants::AWS_ENDPOINT_URL.into() => "http://localhost:4566".into(),
};
let log_store = logstore_for(Url::parse(&table_uri)?, storage_options, None)?;

Expand All @@ -113,6 +117,7 @@ async fn test_create_s3_table() -> TestResult<()> {
)
.await?;

debug!("creating a CreateBuilder");
let _created = CreateBuilder::new()
.with_log_store(log_store)
.with_partition_columns(vec!["id"])
Expand Down
4 changes: 1 addition & 3 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ use url::Url;
use crate::delta_datafusion::expr::parse_predicate_expression;
use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{
Add, DataCheck, EagerSnapshot, Invariant, LogicalFile, Snapshot, StructTypeExt,
};
use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
use crate::logstore::LogStoreRef;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
Expand Down
2 changes: 0 additions & 2 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3099,7 +3099,6 @@ mod tests {
use crate::kernel::Protocol;
use crate::operations::merge::Action;

let _ = pretty_env_logger::try_init();
let schema = get_delta_schema();

let actions = vec![Action::Protocol(Protocol::new(1, 4))];
Expand Down Expand Up @@ -3194,7 +3193,6 @@ mod tests {
use crate::kernel::Protocol;
use crate::operations::merge::Action;

let _ = pretty_env_logger::try_init();
let schema = get_delta_schema();

let actions = vec![Action::Protocol(Protocol::new(1, 4))];
Expand Down
Loading

0 comments on commit 10424f1

Please sign in to comment.