diff --git a/crates/deltalake-aws/src/lib.rs b/crates/deltalake-aws/src/lib.rs index dd33854456..2630f80512 100644 --- a/crates/deltalake-aws/src/lib.rs +++ b/crates/deltalake-aws/src/lib.rs @@ -5,6 +5,7 @@ pub mod logstore; pub mod storage; use lazy_static::lazy_static; +use object_store::aws::AmazonS3ConfigKey; use regex::Regex; use std::{ collections::HashMap, @@ -41,6 +42,17 @@ impl LogStoreFactory for S3LogStoreFactory { options: &StorageOptions, ) -> DeltaResult> { let store = url_prefix_handler(store, Path::parse(location.path())?)?; + + if options + .0 + .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref()) + { + debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required"); + return Ok(deltalake_core::logstore::default_logstore( + store, location, options, + )); + } + let s3_options = S3StorageOptions::from_map(&options.0); if s3_options.locking_provider.as_deref() != Some("dynamodb") { diff --git a/crates/deltalake-aws/src/storage.rs b/crates/deltalake-aws/src/storage.rs index b71d17bb64..87d488b54f 100644 --- a/crates/deltalake-aws/src/storage.rs +++ b/crates/deltalake-aws/src/storage.rs @@ -56,7 +56,16 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { }), )?; + if options + .0 + .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref()) + { + // If the copy-if-not-exists env var is set, we don't need to instantiate a locking client or check for allow-unsafe-rename. + return Ok((Arc::from(store), prefix)); + } + let options = S3StorageOptions::from_map(&options.0); + let store = S3StorageBackend::try_new( store.into(), Some("dynamodb") == options.locking_provider.as_deref() || options.allow_unsafe_rename, diff --git a/crates/deltalake-core/src/kernel/actions/types.rs b/crates/deltalake-core/src/kernel/actions/types.rs index 3dc177fb5b..f9a2eb9909 100644 --- a/crates/deltalake-core/src/kernel/actions/types.rs +++ b/crates/deltalake-core/src/kernel/actions/types.rs @@ -632,6 +632,7 @@ pub struct Remove { /// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the data file path. /// /// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt + #[serde(with = "serde_path")] pub path: String, /// When `false` the logical file must already be present in the table or the records diff --git a/crates/deltalake-core/src/table/state.rs b/crates/deltalake-core/src/table/state.rs index 03483ef8ff..061abac6af 100644 --- a/crates/deltalake-core/src/table/state.rs +++ b/crates/deltalake-core/src/table/state.rs @@ -363,9 +363,11 @@ impl DeltaTableState { #[cfg(test)] mod tests { + use super::*; use crate::kernel::Txn; use pretty_assertions::assert_eq; + use serde_json::json; #[test] fn state_round_trip() { @@ -414,4 +416,34 @@ mod tests { assert_eq!(2, *state.app_transaction_version().get("abc").unwrap()); assert_eq!(1, *state.app_transaction_version().get("xyz").unwrap()); } + + #[test] + fn test_merging_deserialized_special_tombstones_and_files_paths() { + let add = serde_json::from_value(json!({ + "path": "x=A%252FA/part-00016-94175338-2acc-40c2-a68a-d08ba677975f.c000.snappy.parquet", + "partitionValues": {"x": "A/A"}, + "size": 460, + "modificationTime": 1631873480, + "dataChange": true + })) + .unwrap(); + + let remove = serde_json::from_value(json!({ + "path": "x=A%252FA/part-00016-94175338-2acc-40c2-a68a-d08ba677975f.c000.snappy.parquet", + "deletionTimestamp": 1631873481, + "partitionValues": {"x": "A/A"}, + "size": 460, + "modificationTime": 1631873481, + "dataChange": true + })) + .unwrap(); + + let state = DeltaTableState::from_actions(vec![Action::Add(add)], 0).unwrap(); + let state_next = DeltaTableState::from_actions(vec![Action::Remove(remove)], 1).unwrap(); + + let mut merged_state = state.clone(); + merged_state.merge(state_next, true, true); + + assert_eq!(merged_state.files().len(), 0); + } }