Skip to content

Commit

Permalink
Merge branch 'main' into docs-why-use
Browse files Browse the repository at this point in the history
  • Loading branch information
MrPowers authored Jan 17, 2024
2 parents da9bf35 + 8fec38a commit 6d81e0f
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 0 deletions.
12 changes: 12 additions & 0 deletions crates/deltalake-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod logstore;
pub mod storage;

use lazy_static::lazy_static;
use object_store::aws::AmazonS3ConfigKey;
use regex::Regex;
use std::{
collections::HashMap,
Expand Down Expand Up @@ -41,6 +42,17 @@ impl LogStoreFactory for S3LogStoreFactory {
options: &StorageOptions,
) -> DeltaResult<Arc<dyn LogStore>> {
let store = url_prefix_handler(store, Path::parse(location.path())?)?;

if options
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
{
debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required");
return Ok(deltalake_core::logstore::default_logstore(
store, location, options,
));
}

let s3_options = S3StorageOptions::from_map(&options.0);

if s3_options.locking_provider.as_deref() != Some("dynamodb") {
Expand Down
9 changes: 9 additions & 0 deletions crates/deltalake-aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,16 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
}),
)?;

if options
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
{
// If the copy-if-not-exists env var is set, we don't need to instantiate a locking client or check for allow-unsafe-rename.
return Ok((Arc::from(store), prefix));
}

let options = S3StorageOptions::from_map(&options.0);

let store = S3StorageBackend::try_new(
store.into(),
Some("dynamodb") == options.locking_provider.as_deref() || options.allow_unsafe_rename,
Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/kernel/actions/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ pub struct Remove {
/// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the data file path.
///
/// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt
#[serde(with = "serde_path")]
pub path: String,

/// When `false` the logical file must already be present in the table or the records
Expand Down
32 changes: 32 additions & 0 deletions crates/deltalake-core/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,11 @@ impl DeltaTableState {

#[cfg(test)]
mod tests {

use super::*;
use crate::kernel::Txn;
use pretty_assertions::assert_eq;
use serde_json::json;

#[test]
fn state_round_trip() {
Expand Down Expand Up @@ -414,4 +416,34 @@ mod tests {
assert_eq!(2, *state.app_transaction_version().get("abc").unwrap());
assert_eq!(1, *state.app_transaction_version().get("xyz").unwrap());
}

#[test]
fn test_merging_deserialized_special_tombstones_and_files_paths() {
let add = serde_json::from_value(json!({
"path": "x=A%252FA/part-00016-94175338-2acc-40c2-a68a-d08ba677975f.c000.snappy.parquet",
"partitionValues": {"x": "A/A"},
"size": 460,
"modificationTime": 1631873480,
"dataChange": true
}))
.unwrap();

let remove = serde_json::from_value(json!({
"path": "x=A%252FA/part-00016-94175338-2acc-40c2-a68a-d08ba677975f.c000.snappy.parquet",
"deletionTimestamp": 1631873481,
"partitionValues": {"x": "A/A"},
"size": 460,
"modificationTime": 1631873481,
"dataChange": true
}))
.unwrap();

let state = DeltaTableState::from_actions(vec![Action::Add(add)], 0).unwrap();
let state_next = DeltaTableState::from_actions(vec![Action::Remove(remove)], 1).unwrap();

let mut merged_state = state.clone();
merged_state.merge(state_next, true, true);

assert_eq!(merged_state.files().len(), 0);
}
}

0 comments on commit 6d81e0f

Please sign in to comment.