From 9cacdd865af7c28dc01f4cf6ef2c9920e56458ce Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 3 Dec 2024 04:46:42 +0000 Subject: [PATCH 1/4] chore: add more robust regression tests for the checkpoint related failure(s) See #3030 Signed-off-by: R. Tyler Croy --- crates/core/src/protocol/checkpoints.rs | 26 ++++++++++++++++++++++--- python/tests/test_checkpoint.py | 25 ++++++++++++------------ 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index 42ab5355b7..bf9cdf1fea 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -1163,10 +1163,16 @@ mod tests { } /// + #[cfg(feature = "datafusion")] #[tokio::test] async fn test_create_checkpoint_overwrite() -> DeltaResult<()> { use crate::protocol::SaveMode; + use crate::writer::test_utils::datafusion::get_data_sorted; use crate::writer::test_utils::get_arrow_schema; + use datafusion::assert_batches_sorted_eq; + + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap(); let batch = RecordBatch::try_new( Arc::clone(&get_arrow_schema(&None)), @@ -1177,13 +1183,15 @@ mod tests { ], ) .unwrap(); - let table = DeltaOps::try_from_uri_with_storage_options("memory://", HashMap::default()) + + let mut table = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap()) .await? .write(vec![batch]) .await?; + table.load().await?; assert_eq!(table.version(), 0); - create_checkpoint_for(0, table.snapshot().unwrap(), table.log_store.as_ref()).await?; + create_checkpoint(&table).await?; let batch = RecordBatch::try_new( Arc::clone(&get_arrow_schema(&None)), @@ -1194,11 +1202,23 @@ mod tests { ], ) .unwrap(); - let table = DeltaOps(table) + + let table = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap()) + .await? .write(vec![batch]) .with_save_mode(SaveMode::Overwrite) .await?; assert_eq!(table.version(), 1); + + let expected = [ + "+----+-------+------------+", + "| id | value | modified |", + "+----+-------+------------+", + "| A | 0 | 2021-02-02 |", + "+----+-------+------------+", + ]; + let actual = get_data_sorted(&table, "id,value,modified").await; + assert_batches_sorted_eq!(&expected, &actual); Ok(()) } } diff --git a/python/tests/test_checkpoint.py b/python/tests/test_checkpoint.py index 5961a57b09..309a1f3663 100644 --- a/python/tests/test_checkpoint.py +++ b/python/tests/test_checkpoint.py @@ -483,18 +483,19 @@ def test_checkpoint_with_multiple_writes(tmp_path: pathlib.Path): } ), ) - DeltaTable(tmp_path).create_checkpoint() + dt = DeltaTable(tmp_path) + dt.create_checkpoint() + assert dt.version() == 0 + df = pd.DataFrame( + { + "a": ["a"], + "b": [100], + } + ) + write_deltalake(tmp_path, df, mode="overwrite") dt = DeltaTable(tmp_path) + assert dt.version() == 1 + new_df = dt.to_pandas() print(dt.to_pandas()) - - write_deltalake( - tmp_path, - pd.DataFrame( - { - "a": ["a"], - "b": [100], - } - ), - mode="overwrite", - ) + assert len(new_df) == 1, "We overwrote! there should only be one row" From 1ab882a2f16f89c55bf9c6474799b037d1ce06bd Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Wed, 4 Dec 2024 15:25:42 +0000 Subject: [PATCH 2/4] fix: prevent attempting to read empty DVInfo The root cause remains elusive but with the recent arrow upgrade the nullable deletionVector struct, which contains non-nullable fields, is being read out as "empty" in the default case rather than as null. This causes the "seen" code in log replay to fail to correctly identify files which should be marked as removed when computing the state of the snapshot. This change introduces another workaround that I am still not thrilled about :unamused: Fixes #3030 Signed-off-by: R. Tyler Croy --- Cargo.toml | 2 +- crates/core/src/kernel/snapshot/replay.rs | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a48f8c7894..9fe8c44bb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -delta_kernel = { version = "0.4.1", features = ["sync-engine"] } +delta_kernel = { version = "0.4.1", features = ["default-engine"] } #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] } # arrow diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 1b18b61bc7..316ac6dbba 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -20,7 +20,7 @@ use hashbrown::HashSet; use itertools::Itertools; use percent_encoding::percent_decode_str; use pin_project_lite::pin_project; -use tracing::debug; +use tracing::log::*; use super::parse::collect_map; use super::ReplayVisitor; @@ -440,6 +440,14 @@ pub(super) struct DVInfo<'a> { fn seen_key(info: &FileInfo<'_>) -> String { let path = percent_decode_str(info.path).decode_utf8_lossy(); if let Some(dv) = &info.dv { + // If storage_type is empty then delta-rs has somehow gotten an empty rather than a null + // deletion vector, oooof + // + // See #3030 + if dv.storage_type.is_empty() { + warn!("An empty but not nullable deletionVector was seen for {info:?}"); + return path.to_string(); + } if let Some(offset) = &dv.offset { format!( "{}::{}{}@{offset}", From fbc4496cebc6f2ecb66669c430c1d3840fc1cda9 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Wed, 4 Dec 2024 20:51:10 +0100 Subject: [PATCH 3/4] fix: add null checks when working with structs Signed-off-by: R. Tyler Croy --- crates/core/src/kernel/snapshot/log_data.rs | 50 ++++++------ crates/core/src/kernel/snapshot/parse.rs | 90 +++++++++++++-------- crates/core/src/kernel/snapshot/replay.rs | 33 +++++--- 3 files changed, 103 insertions(+), 70 deletions(-) diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 562ca3da90..f22f88ad23 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -245,23 +245,14 @@ impl LogicalFile<'_> { /// Defines a deletion vector pub fn deletion_vector(&self) -> Option> { - if let Some(arr) = self.deletion_vector.as_ref() { - // With v0.22 and the upgrade to a more recent arrow. Reading nullable structs with - // non-nullable entries back out of parquet is resulting in the DeletionVector having - // an empty string rather than a null. The addition check on the value ensures that a - // [DeletionVectorView] is not created in this scenario - // - // - if arr.storage_type.is_valid(self.index) - && !arr.storage_type.value(self.index).is_empty() - { - return Some(DeletionVectorView { + self.deletion_vector.as_ref().and_then(|arr| { + arr.storage_type + .is_valid(self.index) + .then_some(DeletionVectorView { data: arr, index: self.index, - }); - } - } - None + }) + }) } /// The number of records stored in the data file. @@ -380,18 +371,23 @@ impl<'a> FileStatsAccessor<'a> { ); let deletion_vector = extract_and_cast_opt::(data, "add.deletionVector"); let deletion_vector = deletion_vector.and_then(|dv| { - let storage_type = extract_and_cast::(dv, "storageType").ok()?; - let path_or_inline_dv = extract_and_cast::(dv, "pathOrInlineDv").ok()?; - let size_in_bytes = extract_and_cast::(dv, "sizeInBytes").ok()?; - let cardinality = extract_and_cast::(dv, "cardinality").ok()?; - let offset = extract_and_cast_opt::(dv, "offset"); - Some(DeletionVector { - storage_type, - path_or_inline_dv, - size_in_bytes, - cardinality, - offset, - }) + if dv.null_count() == dv.len() { + None + } else { + let storage_type = extract_and_cast::(dv, "storageType").ok()?; + let path_or_inline_dv = + extract_and_cast::(dv, "pathOrInlineDv").ok()?; + let size_in_bytes = extract_and_cast::(dv, "sizeInBytes").ok()?; + let cardinality = extract_and_cast::(dv, "cardinality").ok()?; + let offset = extract_and_cast_opt::(dv, "offset"); + Some(DeletionVector { + storage_type, + path_or_inline_dv, + size_in_bytes, + cardinality, + offset, + }) + } }); Ok(Self { diff --git a/crates/core/src/kernel/snapshot/parse.rs b/crates/core/src/kernel/snapshot/parse.rs index f75744691e..e8630cbe0c 100644 --- a/crates/core/src/kernel/snapshot/parse.rs +++ b/crates/core/src/kernel/snapshot/parse.rs @@ -78,6 +78,10 @@ pub(super) fn read_adds(array: &dyn ProvidesColumnByName) -> DeltaResult(array, "add") { + // Stop early if all values are null + if arr.null_count() == arr.len() { + return Ok(vec![]); + } let path = ex::extract_and_cast::(arr, "path")?; let pvs = ex::extract_and_cast_opt::(arr, "partitionValues"); let size = ex::extract_and_cast::(arr, "size")?; @@ -94,22 +98,33 @@ pub(super) fn read_adds(array: &dyn ProvidesColumnByName) -> DeltaResult(d, "sizeInBytes")?; let cardinality = ex::extract_and_cast::(d, "cardinality")?; - Box::new(|idx: usize| { - if ex::read_str(storage_type, idx).is_ok() { - Some(DeletionVectorDescriptor { - storage_type: std::str::FromStr::from_str( - ex::read_str(storage_type, idx).ok()?, - ) - .ok()?, - path_or_inline_dv: ex::read_str(path_or_inline_dv, idx).ok()?.to_string(), - offset: ex::read_primitive_opt(offset, idx), - size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?, - cardinality: ex::read_primitive(cardinality, idx).ok()?, - }) - } else { - None - } - }) + // Column might exist but have nullability set for the whole array, so we just return Nones + if d.null_count() == d.len() { + Box::new(|_| None) + } else { + Box::new(|idx: usize| { + d.is_valid(idx) + .then(|| { + if ex::read_str(storage_type, idx).is_ok() { + Some(DeletionVectorDescriptor { + storage_type: std::str::FromStr::from_str( + ex::read_str(storage_type, idx).ok()?, + ) + .ok()?, + path_or_inline_dv: ex::read_str(path_or_inline_dv, idx) + .ok()? + .to_string(), + offset: ex::read_primitive_opt(offset, idx), + size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?, + cardinality: ex::read_primitive(cardinality, idx).ok()?, + }) + } else { + None + } + }) + .flatten() + }) + } } else { Box::new(|_| None) }; @@ -210,22 +225,33 @@ pub(super) fn read_removes(array: &dyn ProvidesColumnByName) -> DeltaResult(d, "sizeInBytes")?; let cardinality = ex::extract_and_cast::(d, "cardinality")?; - Box::new(|idx: usize| { - if ex::read_str(storage_type, idx).is_ok() { - Some(DeletionVectorDescriptor { - storage_type: std::str::FromStr::from_str( - ex::read_str(storage_type, idx).ok()?, - ) - .ok()?, - path_or_inline_dv: ex::read_str(path_or_inline_dv, idx).ok()?.to_string(), - offset: ex::read_primitive_opt(offset, idx), - size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?, - cardinality: ex::read_primitive(cardinality, idx).ok()?, - }) - } else { - None - } - }) + // Column might exist but have nullability set for the whole array, so we just return Nones + if d.null_count() == d.len() { + Box::new(|_| None) + } else { + Box::new(|idx: usize| { + d.is_valid(idx) + .then(|| { + if ex::read_str(storage_type, idx).is_ok() { + Some(DeletionVectorDescriptor { + storage_type: std::str::FromStr::from_str( + ex::read_str(storage_type, idx).ok()?, + ) + .ok()?, + path_or_inline_dv: ex::read_str(path_or_inline_dv, idx) + .ok()? + .to_string(), + offset: ex::read_primitive_opt(offset, idx), + size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?, + cardinality: ex::read_primitive(cardinality, idx).ok()?, + }) + } else { + None + } + }) + .flatten() + }) + } } else { Box::new(|_| None) }; diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 316ac6dbba..6267a7f3be 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -559,22 +559,32 @@ fn read_file_info<'a>(arr: &'a dyn ProvidesColumnByName) -> DeltaResult(d, "pathOrInlineDv")?; let offset = ex::extract_and_cast::(d, "offset")?; - Box::new(|idx: usize| { - if ex::read_str(storage_type, idx).is_ok() { - Ok(Some(DVInfo { - storage_type: ex::read_str(storage_type, idx)?, - path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)?, - offset: ex::read_primitive_opt(offset, idx), - })) - } else { - Ok(None) - } - }) + // Column might exist but have nullability set for the whole array, so we just return Nones + if d.null_count() == d.len() { + Box::new(|_| Ok(None)) + } else { + Box::new(|idx: usize| { + if d.is_valid(idx) { + if ex::read_str(storage_type, idx).is_ok() { + Ok(Some(DVInfo { + storage_type: ex::read_str(storage_type, idx)?, + path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)?, + offset: ex::read_primitive_opt(offset, idx), + })) + } else { + Ok(None) + } + } else { + Ok(None) + } + }) + } } else { Box::new(|_| Ok(None)) }; let mut adds = Vec::with_capacity(path.len()); + for idx in 0..path.len() { let value = path .is_valid(idx) @@ -587,6 +597,7 @@ fn read_file_info<'a>(arr: &'a dyn ProvidesColumnByName) -> DeltaResult Date: Wed, 4 Dec 2024 19:56:22 +0000 Subject: [PATCH 4/4] chore: bump to 0.22.3 for another release Signed-off-by: R. Tyler Croy --- crates/core/Cargo.toml | 2 +- crates/deltalake/Cargo.toml | 2 +- python/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 24f4ac8777..57a9496070 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-core" -version = "0.22.2" +version = "0.22.3" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index d7fdb50184..476f0b5d60 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake" -version = "0.22.2" +version = "0.22.3" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/python/Cargo.toml b/python/Cargo.toml index fba55dcd31..bb6fbba621 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.22.2" +version = "0.22.3" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0"