From 68f6d741903e1dea7347bf30faa78b91d993a1bd Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Fri, 30 Aug 2024 20:29:26 +0000
Subject: [PATCH 1/3] fix: double-encode paths during zorder optimize when
 they contain special characters

The real fix should likely be done in DataFusion, allowing [ListingTableUrl]
to be created directly rather than needing to parse the &str passed through.
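To make the failure mode concrete: the file locations read back out of the
Delta log are already percent-encoded, and `read_parquet` parses the strings
we hand it as URLs, so they get percent-decoded once more on their way back
to the object store. Escaping the escape character first keeps that round
trip lossless. A minimal, illustration-only sketch (not part of the patch;
the `delta-rs:///` scheme mirrors the one registered with the DataFusion
runtime in the diff below):

    fn main() {
        // A path as recorded in an Add action: the writer has already
        // percent-encoded the space in the partition value.
        let in_log = "country=Dominican%20Republic/part-00000.parquet";

        // Escape '%' as '%25' so the single decode performed downstream
        // restores the original encoded path instead of a bare space.
        let escaped = str::replace(in_log, "%", "%25");
        assert_eq!(escaped, "country=Dominican%2520Republic/part-00000.parquet");

        // One percent-decode of `escaped` yields `in_log` again, which is
        // the key that actually exists in object storage.
        println!("delta-rs:///{escaped}");
    }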
---
 crates/core/src/delta_datafusion/mod.rs |   4 +-
 crates/core/src/operations/merge/mod.rs |   2 -
 crates/core/src/operations/optimize.rs  | 120 ++++++++++++++++++++++--
 python/tests/test_optimize.py           |  35 +++++++
 4 files changed, 149 insertions(+), 12 deletions(-)

diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index dc742b2f2d..8d64f85fb2 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -76,9 +76,7 @@ use url::Url;
 use crate::delta_datafusion::expr::parse_predicate_expression;
 use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
 use crate::errors::{DeltaResult, DeltaTableError};
-use crate::kernel::{
-    Add, DataCheck, EagerSnapshot, Invariant, LogicalFile, Snapshot, StructTypeExt,
-};
+use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
 use crate::logstore::LogStoreRef;
 use crate::table::builder::ensure_table_uri;
 use crate::table::state::DeltaTableState;
diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index b081b387ca..7f87d30d35 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -3099,7 +3099,6 @@ mod tests {
         use crate::kernel::Protocol;
         use crate::operations::merge::Action;
 
-        let _ = pretty_env_logger::try_init();
         let schema = get_delta_schema();
 
         let actions = vec![Action::Protocol(Protocol::new(1, 4))];
@@ -3194,7 +3193,6 @@ mod tests {
         use crate::kernel::Protocol;
         use crate::operations::merge::Action;
 
-        let _ = pretty_env_logger::try_init();
         let schema = get_delta_schema();
 
         let actions = vec![Action::Protocol(Protocol::new(1, 4))];
diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index e00fd6451e..cf096d56d1 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -39,7 +39,8 @@ use parquet::basic::{Compression, ZstdLevel};
 use parquet::errors::ParquetError;
 use parquet::file::properties::WriterProperties;
 use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
-use tracing::debug;
+use tracing::*;
+use url::Url;
 
 use super::transaction::PROTOCOL;
 use super::writer::{PartitionWriter, PartitionWriterConfig};
@@ -137,6 +138,7 @@ impl fmt::Display for MetricDetails {
     }
 }
 
+#[derive(Debug)]
 /// Metrics for a single partition
 pub struct PartialMetrics {
     /// Number of optimized files added
@@ -345,6 +347,7 @@ impl From<OptimizeInput> for DeltaOperation {
     }
 }
 
+/// Generate an appropriate remove action for the optimization task
 fn create_remove(
     path: &str,
     partitions: &IndexMap,
@@ -606,12 +609,26 @@ impl MergePlan {
         use datafusion_expr::expr::ScalarFunction;
         use datafusion_expr::{Expr, ScalarUDF};
 
-        let locations = files
+        // This code is ... not ideal. Essentially `read_parquet` expects Strings that it will then
+        // parse as URLs and then pass back to the object store (x_x). This can cause problems when
+        // paths in object storage have special characters like spaces, etc.
+        //
+        // This [str::replace] is kind of a hack to address
+        //
+        let locations: Vec<String> = files
             .iter()
-            .map(|file| format!("delta-rs:///{}", file.location))
-            .collect_vec();
+            .map(|om| {
+                format!(
+                    "delta-rs:///{}",
+                    str::replace(om.location.as_ref(), "%", "%25")
+                )
+            })
+            .collect();
+
+        debug!("Reading z-order with locations: {locations:?}");
+
         let df = context
             .ctx
+            // TODO: should the read options include the partition columns?
             .read_parquet(locations, ParquetReadOptions::default())
             .await?;
@@ -712,6 +729,7 @@ impl MergePlan {
             bins.len() <= num_cpus::get(),
         ));
 
+        debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}");
         #[cfg(feature = "datafusion")]
         let exec_context = Arc::new(zorder::ZOrderExecContext::new(
             zorder_columns,
@@ -719,6 +737,7 @@ impl MergePlan {
             max_spill_size,
         )?);
         let task_parameters = self.task_parameters.clone();
+        let log_store = log_store.clone();
 
         futures::stream::iter(bins)
             .map(move |(_, (partition, files))| {
@@ -891,9 +910,7 @@ impl MergeBin {
         self.size_bytes += meta.size as i64;
         self.files.push(meta);
     }
-}
 
-impl MergeBin {
     fn iter(&self) -> impl Iterator<Item = &ObjectMeta> {
         self.files.iter()
     }
@@ -1036,6 +1053,7 @@ fn build_zorder_plan(
             .or_insert_with(|| (partition_values, MergeBin::new()))
             .1
             .add(object_meta);
+        debug!("partition_files inside the zorder plan: {partition_files:?}");
     }
     let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files);
@@ -1229,7 +1247,6 @@ pub(super) mod zorder {
             let runtime = Arc::new(RuntimeEnv::new(config)?);
             runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);
 
-            use url::Url;
             let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
             ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF));
             Ok(Self { columns, ctx })
@@ -1269,6 +1286,7 @@ pub(super) mod zorder {
         fn zorder_key_datafusion(
             columns: &[ColumnarValue],
         ) -> Result<ColumnarValue, DataFusionError> {
+            debug!("zorder_key_datafusion: {columns:#?}");
             let length = columns
                 .iter()
                 .map(|col| match col {
@@ -1423,6 +1441,94 @@ pub(super) mod zorder {
             .await;
             assert!(res.is_ok());
         }
+
+        /// Issue
+        #[tokio::test]
+        async fn test_zorder_space_in_partition_value() {
+            use arrow_schema::Schema as ArrowSchema;
+            let _ = pretty_env_logger::try_init();
+            let schema = Arc::new(ArrowSchema::new(vec![
+                Field::new("modified", DataType::Utf8, true),
+                Field::new("country", DataType::Utf8, true),
+                Field::new("value", DataType::Int32, true),
+            ]));
+
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![
+                    Arc::new(arrow::array::StringArray::from(vec![
+                        "2021-02-01",
+                        "2021-02-01",
+                        "2021-02-02",
+                        "2021-02-02",
+                    ])),
+                    Arc::new(arrow::array::StringArray::from(vec![
+                        "Germany",
+                        "China",
+                        "Canada",
+                        "Dominican Republic",
+                    ])),
+                    Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
+                ],
+            )
+            .unwrap();
+            // write some data
+            let table = crate::DeltaOps::new_in_memory()
+                .write(vec![batch.clone()])
+                .with_partition_columns(vec!["country"])
+                .with_save_mode(crate::protocol::SaveMode::Overwrite)
+                .await
+                .unwrap();
+
+            let res = crate::DeltaOps(table)
+                .optimize()
+                .with_type(OptimizeType::ZOrder(vec!["modified".into()]))
+                .await;
+            assert!(res.is_ok(), "Failed to optimize: {res:#?}");
+        }
+
+        #[tokio::test]
+        async fn test_zorder_space_in_partition_value_garbage() {
+            use arrow_schema::Schema as ArrowSchema;
+            let _ = pretty_env_logger::try_init();
+            let schema = Arc::new(ArrowSchema::new(vec![
+                Field::new("modified", DataType::Utf8, true),
+                Field::new("country", DataType::Utf8, true),
+                Field::new("value", DataType::Int32, true),
+            ]));
+
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![
+                    Arc::new(arrow::array::StringArray::from(vec![
+                        "2021-02-01",
+                        "2021-02-01",
+                        "2021-02-02",
+                        "2021-02-02",
+                    ])),
+                    Arc::new(arrow::array::StringArray::from(vec![
+                        "Germany", "China", "Canada", "USA$$!",
+                    ])),
+                    Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
+                ],
+            )
+            .unwrap();
+            // write some data
+            let table = crate::DeltaOps::new_in_memory()
+                .write(vec![batch.clone()])
+                .with_partition_columns(vec!["country"])
+                .with_save_mode(crate::protocol::SaveMode::Overwrite)
+                .await
+                .unwrap();
+
+            let res = crate::DeltaOps(table)
+                .optimize()
+                .with_type(OptimizeType::ZOrder(vec!["modified".into()]))
+                .await;
+            assert!(res.is_ok(), "Failed to optimize: {res:#?}");
+        }
     }
 }
diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py
index 2c9685e116..3bce0ad35f 100644
--- a/python/tests/test_optimize.py
+++ b/python/tests/test_optimize.py
@@ -4,6 +4,13 @@
 import pyarrow as pa
 import pytest
 
+try:
+    import pandas as pd
+except ModuleNotFoundError:
+    _has_pandas = False
+else:
+    _has_pandas = True
+
 from deltalake import DeltaTable, write_deltalake
 from deltalake.table import CommitProperties
 
@@ -132,3 +139,31 @@ def test_optimize_schema_evolved_table(
     assert dt.to_pyarrow_table().sort_by([("foo", "ascending")]) == data.sort_by(
         [("foo", "ascending")]
     )
+
+
+@pytest.mark.pandas
+def test_zorder_with_space_partition(tmp_path: pathlib.Path):
+    df = pd.DataFrame(
+        {
+            "user": ["James", "Anna", "Sara", "Martin"],
+            "country": ["United States", "Canada", "Costa Rica", "South Africa"],
+            "age": [34, 23, 45, 26],
+        }
+    )
+
+    write_deltalake(
+        table_or_uri=tmp_path,
+        data=df,
+        mode="overwrite",
+        partition_by=["country"],
+    )
+
+    test_table = DeltaTable(tmp_path)
+
+    # retrieving by partition works fine
+    partitioned_df = test_table.to_pandas(
+        partitions=[("country", "=", "United States")],
+    )
+    print(partitioned_df)
+
+    test_table.optimize.z_order(columns=["user"])

From a72756d80526254beef15d5f3bc27f43ce1ef762 Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Sun, 22 Sep 2024 18:39:58 +0200
Subject: [PATCH 2/3] fix: check lowercase config keys with/without aws prefix
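Configuration can reach the log store factory with keys spelled
`AWS_CONDITIONAL_PUT`, `aws_conditional_put`, or plain `conditional_put`, so
an exact `contains_key` lookup on one canonical spelling silently ignores the
others. A condensed, illustration-only sketch of the lowercase-and-compare
rule this patch adopts (the real code below matches against
`AmazonS3ConfigKey` values rather than bare strings):

    use std::collections::HashMap;

    // True when any key, lowercased, matches one of the accepted spellings.
    fn has_option(options: &HashMap<String, String>, accepted: &[&str]) -> bool {
        options
            .keys()
            .any(|key| accepted.contains(&key.to_ascii_lowercase().as_str()))
    }

    fn main() {
        let mut options = HashMap::new();
        options.insert("CONDITIONAL_PUT".to_string(), "etag".to_string());
        assert!(has_option(&options, &["aws_conditional_put", "conditional_put"]));
    }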
---
 crates/aws/src/credentials.rs                | 13 +++++--
 crates/aws/src/lib.rs                        | 24 ++++++++----
 crates/aws/src/logstore/dynamodb_logstore.rs |  2 +-
 crates/aws/src/storage.rs                    | 39 ++++++++++++++------
 4 files changed, 54 insertions(+), 24 deletions(-)

diff --git a/crates/aws/src/credentials.rs b/crates/aws/src/credentials.rs
index 8052026f5f..71441bf05e 100644
--- a/crates/aws/src/credentials.rs
+++ b/crates/aws/src/credentials.rs
@@ -19,7 +19,7 @@ use deltalake_core::storage::StorageOptions;
 use deltalake_core::DeltaResult;
 use tracing::log::*;
 
-use crate::constants;
+use crate::constants::{self, AWS_ENDPOINT_URL};
 
 /// An [object_store::CredentialProvider] which handles converting a populated [SdkConfig]
 /// into a necessary [AwsCredential] type for configuring [object_store::aws::AmazonS3]
@@ -167,12 +167,19 @@ pub async fn resolve_credentials(options: StorageOptions) -> DeltaResult<SdkConfig>
         CredentialsProviderChain::first_try(
             "StorageOptions",
-            OptionsCredentialsProvider { options },
+            OptionsCredentialsProvider {
+                options: options.clone(),
+            },
         )
         .or_else("DefaultChain", default_provider),
     };
diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs
index 9834fc8b54..ffaae15333 100644
--- a/crates/aws/src/lib.rs
+++ b/crates/aws/src/lib.rs
@@ -55,18 +55,26 @@ impl LogStoreFactory for S3LogStoreFactory {
         let store = url_prefix_handler(store, Path::parse(location.path())?);
 
         // With conditional put in S3-like API we can use the deltalake default logstore which uses PutIfAbsent
-        if options
-            .0
-            .contains_key(AmazonS3ConfigKey::ConditionalPut.as_ref())
-        {
+        if options.0.keys().any(|key| {
+            let key = key.to_ascii_lowercase();
+            vec![
+                AmazonS3ConfigKey::ConditionalPut.as_ref(),
+                "conditional_put",
+            ]
+            .contains(&key.as_str())
+        }) {
             debug!("S3LogStoreFactory has been asked to create a default LogStore where the underlying store has Conditional Put enabled - no locking provider required");
             return Ok(default_logstore(store, location, options));
         }
 
-        if options
-            .0
-            .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
-        {
+        if options.0.keys().any(|key| {
+            let key = key.to_ascii_lowercase();
+            vec![
+                AmazonS3ConfigKey::CopyIfNotExists.as_ref(),
+                "copy_if_not_exists",
+            ]
+            .contains(&key.as_str())
+        }) {
             debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required");
             return Ok(logstore::default_s3_logstore(store, location, options));
         }
diff --git a/crates/aws/src/logstore/dynamodb_logstore.rs b/crates/aws/src/logstore/dynamodb_logstore.rs
index 5307040538..202df1709e 100644
--- a/crates/aws/src/logstore/dynamodb_logstore.rs
+++ b/crates/aws/src/logstore/dynamodb_logstore.rs
@@ -45,7 +45,7 @@ impl S3DynamoDbLogStore {
         object_store: ObjectStoreRef,
     ) -> DeltaResult<Self> {
         let lock_client = DynamoDbLockClient::try_new(
-            &s3_options.sdk_config,
+            &s3_options.sdk_config.clone().unwrap(),
             s3_options
                 .extra_opts
                 .get(constants::LOCK_TABLE_KEY_NAME)
diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs
index da8528a360..64c34d5b3a 100644
--- a/crates/aws/src/storage.rs
+++ b/crates/aws/src/storage.rs
@@ -14,6 +14,7 @@ use deltalake_core::storage::{
 use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path};
 use futures::stream::BoxStream;
 use futures::Future;
+use object_store::aws::S3CopyIfNotExists;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::ops::Range;
@@ -72,11 +73,14 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
         storage_options: &StorageOptions,
     ) -> DeltaResult<(ObjectStoreRef, Path)> {
         let options = self.with_env_s3(storage_options);
-        let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials(
-            storage_options.clone(),
-        ))??;
 
-        let os_credentials = Arc::new(crate::credentials::AWSForObjectStore::new(sdk_config));
+        let sdk_config = if options.0.contains_key(constants::AWS_ENDPOINT_URL) {
+            None
+        } else {
+            Some(execute_sdk_future(
+                crate::credentials::resolve_credentials(storage_options.clone()),
+            )??)
+        };
 
         let mut builder = AmazonS3Builder::new().with_url(url.to_string());
 
@@ -91,7 +95,14 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
                 source: Box::new(e),
             })?;
         let prefix = Path::parse(path)?;
-        let inner = builder.with_credentials(os_credentials).build()?;
+        let inner = if let Some(sdk_config) = sdk_config {
+            builder.with_credentials(Arc::new(crate::credentials::AWSForObjectStore::new(
+                sdk_config,
+            )))
+        } else {
+            builder
+        }
+        .build()?;
 
         let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?;
         debug!("Initialized the object store: {store:?}");
@@ -139,7 +150,7 @@ pub struct S3StorageOptions {
     pub s3_get_internal_server_error_retries: usize,
     pub allow_unsafe_rename: bool,
     pub extra_opts: HashMap<String, String>,
-    pub sdk_config: SdkConfig,
+    pub sdk_config: Option<SdkConfig>,
 }
 
 impl Eq for S3StorageOptions {}
@@ -154,8 +165,6 @@ impl PartialEq for S3StorageOptions {
             == other.s3_get_internal_server_error_retries
             && self.allow_unsafe_rename == other.allow_unsafe_rename
             && self.extra_opts == other.extra_opts
-            && self.sdk_config.endpoint_url() == other.sdk_config.endpoint_url()
-            && self.sdk_config.region() == other.sdk_config.region()
     }
 }
 
@@ -198,8 +207,14 @@ impl S3StorageOptions {
             .unwrap_or(false);
 
         let storage_options = StorageOptions(options.clone());
-        let sdk_config =
-            execute_sdk_future(crate::credentials::resolve_credentials(storage_options))??;
+
+        let sdk_config = if storage_options.0.contains_key(constants::AWS_ENDPOINT_URL) {
+            None
+        } else {
+            Some(execute_sdk_future(
+                crate::credentials::resolve_credentials(storage_options.clone()),
+            )??)
+        };
 
         Ok(Self {
             virtual_hosted_style_request,
@@ -216,12 +231,12 @@ impl S3StorageOptions {
 
     /// Return the configured endpoint URL for S3 operations
     pub fn endpoint_url(&self) -> Option<&str> {
-        self.sdk_config.endpoint_url()
+        self.sdk_config.as_ref().map(|v| v.endpoint_url()).flatten()
     }
 
     /// Return the configured region used for S3 operations
     pub fn region(&self) -> Option<&Region> {
-        self.sdk_config.region()
+        self.sdk_config.as_ref().map(|v| v.region()).flatten()
    }
 
     fn u64_or_default(map: &HashMap<String, String>, key: &str, default: u64) -> u64 {

From 66441d10ec54a8afb31f6b46489f15ef3369d8bd Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Sun, 22 Sep 2024 19:28:55 +0000
Subject: [PATCH 3/3] feat: disable AWS configuration resolution for non-AWS
 S3 storage scenarios

This should reduce bugs, reduce warnings, and improve performance when
somebody is using this crate against an S3-alike service rather than AWS S3
directly.

Currently I cannot think of a valid scenario where one would use
AWS_ENDPOINT_URL against actual S3 buckets, so that setting is used as the
heuristic.
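In short, the heuristic works as sketched below (a condensed,
illustration-only version of the `is_aws` helper added in this patch, with
bare string keys standing in for the crate's constants):

    use std::collections::HashMap;

    // true: resolve AWS credentials; false: treat the target as an S3-alike.
    fn is_aws(options: &HashMap<String, String>) -> bool {
        // An explicit opt-in, or a DynamoDB locking provider, always means AWS.
        if options.contains_key("AWS_FORCE_CREDENTIAL_LOAD")
            || options.contains_key("AWS_S3_LOCKING_PROVIDER")
        {
            return true;
        }
        // Otherwise a custom endpoint is taken to mean "not AWS".
        !options.contains_key("AWS_ENDPOINT_URL")
    }

    fn main() {
        let mut minio = HashMap::new();
        minio.insert("AWS_ENDPOINT_URL".to_string(), "http://minio:9000".to_string());
        assert!(!is_aws(&minio));
        assert!(is_aws(&HashMap::new())); // no options at all defaults to AWS S3
    }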
---
 crates/aws/src/constants.rs                 |   3 +
 crates/aws/src/lib.rs                       |   2 +-
 crates/aws/src/storage.rs                   | 108 ++++++++++++++------
 crates/aws/tests/integration_s3_dynamodb.rs |  13 ++-
 4 files changed, 89 insertions(+), 37 deletions(-)

diff --git a/crates/aws/src/constants.rs b/crates/aws/src/constants.rs
index 73d2da1b48..90c23ff572 100644
--- a/crates/aws/src/constants.rs
+++ b/crates/aws/src/constants.rs
@@ -84,6 +84,9 @@ pub const AWS_EC2_METADATA_DISABLED: &str = "AWS_EC2_METADATA_DISABLED";
 /// Defaults to 100
 pub const AWS_EC2_METADATA_TIMEOUT: &str = "AWS_EC2_METADATA_TIMEOUT";
 
+/// Force delta-rs to attempt to load AWS credentials
+pub const AWS_FORCE_CREDENTIAL_LOAD: &str = "AWS_FORCE_CREDENTIAL_LOAD";
+
 /// The list of option keys owned by the S3 module.
 /// Option keys not contained in this list will be added to the `extra_opts`
 /// field of [crate::storage::s3::S3StorageOptions].
diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs
index ffaae15333..ddb768bdd9 100644
--- a/crates/aws/src/lib.rs
+++ b/crates/aws/src/lib.rs
@@ -725,7 +725,7 @@ mod tests {
         let factory = S3LogStoreFactory::default();
         let store = InMemory::new();
         let url = Url::parse("s3://test-bucket").unwrap();
-        std::env::remove_var(storage::s3_constants::AWS_S3_LOCKING_PROVIDER);
+        std::env::remove_var(crate::constants::AWS_S3_LOCKING_PROVIDER);
         let logstore = factory
             .with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
             .unwrap();
diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs
index 64c34d5b3a..a6735b1c0f 100644
--- a/crates/aws/src/storage.rs
+++ b/crates/aws/src/storage.rs
@@ -74,14 +74,7 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
     ) -> DeltaResult<(ObjectStoreRef, Path)> {
         let options = self.with_env_s3(storage_options);
 
-        let sdk_config = if options.0.contains_key(constants::AWS_ENDPOINT_URL) {
-            None
-        } else {
-            Some(execute_sdk_future(
-                crate::credentials::resolve_credentials(storage_options.clone()),
-            )??)
-        };
-
+        // All S3-likes should start their builder the same way
         let mut builder = AmazonS3Builder::new().with_url(url.to_string());
 
         for (key, value) in options.0.iter() {
@@ -95,14 +88,18 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
                 source: Box::new(e),
             })?;
         let prefix = Path::parse(path)?;
-        let inner = if let Some(sdk_config) = sdk_config {
-            builder.with_credentials(Arc::new(crate::credentials::AWSForObjectStore::new(
-                sdk_config,
-            )))
-        } else {
-            builder
+
+        if is_aws(storage_options) {
+            debug!("Detected AWS S3, resolving credentials");
+            let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials(
+                storage_options.clone(),
+            ))??;
+            builder = builder.with_credentials(Arc::new(
+                crate::credentials::AWSForObjectStore::new(sdk_config),
+            ));
         }
-        .build()?;
+
+        let inner = builder.build()?;
 
         let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?;
         debug!("Initialized the object store: {store:?}");
@@ -136,6 +133,26 @@ fn aws_storage_handler(
     }
 }
 
+// Determine whether this crate is being configured for use with native AWS S3 or an S3-alike
+//
+// This function will return true in the default case, since the absence of any
+// options most likely means a default AWS S3 configuration
+fn is_aws(options: &StorageOptions) -> bool {
+    if options
+        .0
+        .contains_key(crate::constants::AWS_FORCE_CREDENTIAL_LOAD)
+    {
+        return true;
+    }
+    if options
+        .0
+        .contains_key(crate::constants::AWS_S3_LOCKING_PROVIDER)
+    {
+        return true;
+    }
+    !options.0.contains_key(crate::constants::AWS_ENDPOINT_URL)
+}
+
 /// Options used to configure the [S3StorageBackend].
 ///
 /// Available options are described in [s3_constants].
@@ -208,12 +225,14 @@ impl S3StorageOptions {
 
         let storage_options = StorageOptions(options.clone());
 
-        let sdk_config = if storage_options.0.contains_key(constants::AWS_ENDPOINT_URL) {
-            None
-        } else {
-            Some(execute_sdk_future(
-                crate::credentials::resolve_credentials(storage_options.clone()),
-            )??)
+        let sdk_config = match is_aws(&storage_options) {
+            false => None,
+            true => {
+                debug!("Detected AWS S3, resolving credentials");
+                Some(execute_sdk_future(
+                    crate::credentials::resolve_credentials(storage_options.clone()),
+                )??)
+            }
         };
 
         Ok(Self {
             virtual_hosted_style_request,
@@ -528,10 +547,12 @@ mod tests {
         let options = S3StorageOptions::try_default().unwrap();
         assert_eq!(
             S3StorageOptions {
-                sdk_config: SdkConfig::builder()
-                    .endpoint_url("http://localhost".to_string())
-                    .region(Region::from_static("us-west-1"))
-                    .build(),
+                sdk_config: Some(
+                    SdkConfig::builder()
+                        .endpoint_url("http://localhost".to_string())
+                        .region(Region::from_static("us-west-1"))
+                        .build()
+                ),
                 virtual_hosted_style_request: false,
                 locking_provider: Some("dynamodb".to_string()),
                 dynamodb_endpoint: None,
@@ -560,9 +581,11 @@ mod tests {
             .unwrap();
 
             let mut expected = S3StorageOptions::try_default().unwrap();
-            expected.sdk_config = SdkConfig::builder()
-                .region(Region::from_static("eu-west-1"))
-                .build();
+            expected.sdk_config = Some(
+                SdkConfig::builder()
+                    .region(Region::from_static("eu-west-1"))
+                    .build(),
+            );
             assert_eq!(expected, options);
         });
     }
@@ -670,10 +693,12 @@ mod tests {
 
         assert_eq!(
             S3StorageOptions {
-                sdk_config: SdkConfig::builder()
-                    .endpoint_url("http://localhost".to_string())
-                    .region(Region::from_static("us-west-2"))
-                    .build(),
+                sdk_config: Some(
+                    SdkConfig::builder()
+                        .endpoint_url("http://localhost".to_string())
+                        .region(Region::from_static("us-west-2"))
+                        .build()
+                ),
                 virtual_hosted_style_request: false,
                 locking_provider: Some("dynamodb".to_string()),
                 dynamodb_endpoint: Some("http://localhost:dynamodb".to_string()),
@@ -767,4 +792,23 @@ mod tests {
         }
         });
     }
+
+    #[test]
+    fn test_is_aws() {
+        let options = StorageOptions::default();
+        assert!(is_aws(&options));
+
+        let minio: HashMap<String, String> = hashmap! {
+            crate::constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(),
+        };
+        let options = StorageOptions::from(minio);
+        assert!(!is_aws(&options));
+
+        let localstack: HashMap<String, String> = hashmap! {
+            crate::constants::AWS_FORCE_CREDENTIAL_LOAD.to_string() => "true".to_string(),
+            crate::constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(),
+        };
+        let options = StorageOptions::from(localstack);
+        assert!(is_aws(&options));
+    }
 }
diff --git a/crates/aws/tests/integration_s3_dynamodb.rs b/crates/aws/tests/integration_s3_dynamodb.rs
index 28c99664f4..da0b0e06c8 100644
--- a/crates/aws/tests/integration_s3_dynamodb.rs
+++ b/crates/aws/tests/integration_s3_dynamodb.rs
@@ -23,6 +23,7 @@ use lazy_static::lazy_static;
 use object_store::path::Path;
 use serde_json::Value;
 use serial_test::serial;
+use tracing::log::*;
 
 use maplit::hashmap;
 use object_store::{PutOptions, PutPayload};
@@ -43,7 +44,7 @@ lazy_static! {
 fn make_client() -> TestResult<DynamoDbLockClient> {
     let options: S3StorageOptions = S3StorageOptions::try_default().unwrap();
     Ok(DynamoDbLockClient::try_new(
-        &options.sdk_config,
+        &options.sdk_config.unwrap(),
         None,
         None,
         None,
@@ -74,7 +75,7 @@ fn client_configs_via_env_variables() -> TestResult<()> {
             billing_mode: BillingMode::PayPerRequest,
             lock_table_name: "some_table".to_owned(),
             max_elapsed_request_time: Duration::from_secs(64),
-            sdk_config: options.sdk_config,
+            sdk_config: options.sdk_config.unwrap(),
         },
         *config,
     );
@@ -87,6 +88,7 @@ fn client_configs_via_env_variables() -> TestResult<()> {
 #[tokio::test]
 #[serial]
 async fn test_create_s3_table() -> TestResult<()> {
+    let _ = pretty_env_logger::try_init();
     let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
     let _client = make_client()?;
     let table_name = format!("{}_{}", "create_test", uuid::Uuid::new_v4());
@@ -98,8 +100,10 @@ async fn test_create_s3_table() -> TestResult<()> {
         true,
     )]);
     let storage_options: HashMap<String, String> = hashmap! {
-        "AWS_ALLOW_HTTP".into() => "true".into(),
-        "AWS_ENDPOINT_URL".into() => "http://localhost:4566".into(),
+        deltalake_aws::constants::AWS_ALLOW_HTTP.into() => "true".into(),
+        // Despite not being in AWS, we should force credential resolution
+        deltalake_aws::constants::AWS_FORCE_CREDENTIAL_LOAD.into() => "true".into(),
+        deltalake_aws::constants::AWS_ENDPOINT_URL.into() => "http://localhost:4566".into(),
     };
 
     let log_store = logstore_for(Url::parse(&table_uri)?, storage_options, None)?;
@@ -113,6 +117,7 @@ async fn test_create_s3_table() -> TestResult<()> {
     )
     .await?;
 
+    debug!("creating a CreateBuilder");
     let _created = CreateBuilder::new()
         .with_log_store(log_store)
         .with_partition_columns(vec!["id"])