From 12209c6a49ef4df755759b26cc76000bbef3cbb9 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 26 Nov 2024 22:39:02 +0000 Subject: [PATCH 1/3] fix: correct the schema definition of deletion vector Signed-off-by: R. Tyler Croy --- crates/core/src/kernel/models/fields.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/core/src/kernel/models/fields.rs b/crates/core/src/kernel/models/fields.rs index a5a6585060..eda3f375de 100644 --- a/crates/core/src/kernel/models/fields.rs +++ b/crates/core/src/kernel/models/fields.rs @@ -258,11 +258,11 @@ fn deletion_vector_field() -> StructField { StructField::new( "deletionVector", DataType::Struct(Box::new(StructType::new(vec![ - StructField::new("storageType", DataType::STRING, true), - StructField::new("pathOrInlineDv", DataType::STRING, true), + StructField::new("storageType", DataType::STRING, false), + StructField::new("pathOrInlineDv", DataType::STRING, false), StructField::new("offset", DataType::INTEGER, true), - StructField::new("sizeInBytes", DataType::INTEGER, true), - StructField::new("cardinality", DataType::LONG, true), + StructField::new("sizeInBytes", DataType::INTEGER, false), + StructField::new("cardinality", DataType::LONG, false), ]))), true, ) From e358857ec48203694b33628587d217949c766a09 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 25 Nov 2024 14:23:15 +0000 Subject: [PATCH 2/3] fix: workaround for Add actions being read slightly differently out of parquet files This workaround fixes #3030 but I'm not quite happy about how. I believe there is likely an upstream issue in arrow, which I have not been able to reproduce, where a nullable struct (deletionVector) with non-nullable members (e.g. storageType) has those members read back as empty strings rather than `null`. This issue started to appear with the v0.22 release, which included an upgrade to datafusion 43 and arrow 53.
I believe the latter is causing the issue here since it affects non-datafusion code paths. This workaround at least allows us to move forward and release a v0.22.1 while continuing to hunt down the issue. Signed-off-by: R. Tyler Croy --- Cargo.toml | 2 +- crates/core/src/kernel/snapshot/log_data.rs | 23 +++++++---- crates/core/src/protocol/checkpoints.rs | 43 ++++++++++++++++++++- python/tests/test_checkpoint.py | 30 ++++++++++++++ 4 files changed, 89 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index be3ab70677..a48f8c7894 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ debug = "line-tables-only" [workspace.dependencies] delta_kernel = { version = "0.4.1", features = ["sync-engine"] } -# delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" } +#delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] } # arrow arrow = { version = "53" } diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index a11f102def..562ca3da90 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -245,14 +245,23 @@ impl LogicalFile<'_> { /// Defines a deletion vector pub fn deletion_vector(&self) -> Option> { - self.deletion_vector.as_ref().and_then(|arr| { - arr.storage_type - .is_valid(self.index) - .then_some(DeletionVectorView { + if let Some(arr) = self.deletion_vector.as_ref() { + // With v0.22 and the upgrade to a more recent arrow, reading nullable structs with + // non-nullable entries back out of parquet is resulting in the DeletionVector having + // an empty string rather than a null.
The additional check on the value ensures that a + // [DeletionVectorView] is not created in this scenario + // + // + if arr.storage_type.is_valid(self.index) + && !arr.storage_type.value(self.index).is_empty() + { + return Some(DeletionVectorView { data: arr, index: self.index, - }) - }) + }); + } + } + None } /// The number of records stored in the data file. @@ -509,7 +518,7 @@ mod datafusion { fn collect_count(&self, name: &str) -> Precision { let num_records = extract_and_cast_opt::(self.stats, name); if let Some(num_records) = num_records { - if num_records.len() == 0 { + if num_records.is_empty() { Precision::Exact(0) } else if let Some(null_count_mulls) = num_records.nulls() { if null_count_mulls.null_count() > 0 { diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index 606642a3e5..42ab5355b7 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -567,6 +567,7 @@ mod tests { use crate::operations::DeltaOps; use crate::protocol::Metadata; use crate::writer::test_utils::get_delta_schema; + use crate::DeltaResult; #[tokio::test] async fn test_create_checkpoint_for() { @@ -1102,7 +1103,7 @@ mod tests { #[ignore = "This test is only useful if the batch size has been made small"] #[tokio::test] - async fn test_checkpoint_large_table() -> crate::DeltaResult<()> { + async fn test_checkpoint_large_table() -> DeltaResult<()> { use crate::writer::test_utils::get_arrow_schema; let table_schema = get_delta_schema(); @@ -1160,4 +1161,44 @@ mod tests { ); Ok(()) } + + /// + #[tokio::test] + async fn test_create_checkpoint_overwrite() -> DeltaResult<()> { + use crate::protocol::SaveMode; + use crate::writer::test_utils::get_arrow_schema; + + let batch = RecordBatch::try_new( + Arc::clone(&get_arrow_schema(&None)), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["C"])), + Arc::new(arrow::array::Int32Array::from(vec![30])), +
Arc::new(arrow::array::StringArray::from(vec!["2021-02-03"])), + ], + ) + .unwrap(); + let table = DeltaOps::try_from_uri_with_storage_options("memory://", HashMap::default()) + .await? + .write(vec![batch]) + .await?; + assert_eq!(table.version(), 0); + + create_checkpoint_for(0, table.snapshot().unwrap(), table.log_store.as_ref()).await?; + + let batch = RecordBatch::try_new( + Arc::clone(&get_arrow_schema(&None)), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["A"])), + Arc::new(arrow::array::Int32Array::from(vec![0])), + Arc::new(arrow::array::StringArray::from(vec!["2021-02-02"])), + ], + ) + .unwrap(); + let table = DeltaOps(table) + .write(vec![batch]) + .with_save_mode(SaveMode::Overwrite) + .await?; + assert_eq!(table.version(), 1); + Ok(()) + } } diff --git a/python/tests/test_checkpoint.py b/python/tests/test_checkpoint.py index 5ce6656463..5961a57b09 100644 --- a/python/tests/test_checkpoint.py +++ b/python/tests/test_checkpoint.py @@ -468,3 +468,33 @@ def test_checkpoint_with_nullable_false(tmp_path: pathlib.Path): assert checkpoint_path.exists() assert DeltaTable(str(tmp_table_path)).to_pyarrow_table() == data + + +@pytest.mark.pandas +def test_checkpoint_with_multiple_writes(tmp_path: pathlib.Path): + import pandas as pd + + write_deltalake( + tmp_path, + pd.DataFrame( + { + "a": ["a"], + "b": [3], + } + ), + ) + DeltaTable(tmp_path).create_checkpoint() + + dt = DeltaTable(tmp_path) + print(dt.to_pandas()) + + write_deltalake( + tmp_path, + pd.DataFrame( + { + "a": ["a"], + "b": [100], + } + ), + mode="overwrite", + ) From 53ccd683a316823af4869232db8bf5d477f65b88 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Thu, 28 Nov 2024 15:53:42 +0000 Subject: [PATCH 3/3] chore: bump key versions for the upcoming 0.22.1 release Signed-off-by: R. 
Tyler Croy --- crates/core/Cargo.toml | 2 +- crates/deltalake/Cargo.toml | 2 +- python/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 6b31bed778..948139dcc1 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-core" -version = "0.22.0" +version = "0.22.1" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index 9647de92bb..3c5a13172e 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake" -version = "0.22.0" +version = "0.22.1" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/python/Cargo.toml b/python/Cargo.toml index 53b5476cf5..c89f68b8c0 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.22.0" +version = "0.22.1" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0"