fix: add null checks when working with structs
Signed-off-by: R. Tyler Croy <[email protected]>
ion-elgreco authored and rtyler committed Dec 4, 2024
1 parent c5f1ff4 commit 8867512
Showing 4 changed files with 103 additions and 71 deletions.
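
The changes below all follow one pattern: before reading fields out of a nullable struct column such as add.deletionVector, check whether the column (or the individual slot) is actually valid, because a nullable struct with non-nullable children can come back from parquet entirely null, or with placeholder values such as empty strings (see the removed comment referencing delta-io/delta-rs#3030). A minimal sketch of that guard, assuming the arrow-array crate; storage_type_at is a hypothetical helper written for illustration and is not part of delta-rs:

use arrow_array::{Array, StringArray, StructArray};

/// Hypothetical guard: only report a `storageType` for row `idx` when the parent
/// struct slot is valid and the decoded value is non-empty.
fn storage_type_at(dv: &StructArray, idx: usize) -> Option<&str> {
    // Whole-column short-circuit: the struct column may decode as entirely null
    // even though its non-nullable children hold placeholder values.
    if dv.null_count() == dv.len() {
        return None;
    }
    // Per-row validity check (bounds-checked to keep the sketch total).
    if idx >= dv.len() || !dv.is_valid(idx) {
        return None;
    }
    let storage_type = dv
        .column_by_name("storageType")?
        .as_any()
        .downcast_ref::<StringArray>()?;
    let value = storage_type.value(idx);
    // Guard against the empty-string placeholders described in the issue above.
    (!value.is_empty()).then_some(value)
}

The same two checks, null_count() == len() for the whole column and is_valid(idx) per row, are what the hunks below add in each of the Rust files.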
50 changes: 23 additions & 27 deletions crates/core/src/kernel/snapshot/log_data.rs
@@ -245,23 +245,14 @@ impl LogicalFile<'_> {
 
     /// Defines a deletion vector
     pub fn deletion_vector(&self) -> Option<DeletionVectorView<'_>> {
-        if let Some(arr) = self.deletion_vector.as_ref() {
-            // With v0.22 and the upgrade to a more recent arrow. Reading nullable structs with
-            // non-nullable entries back out of parquet is resulting in the DeletionVector having
-            // an empty string rather than a null. The addition check on the value ensures that a
-            // [DeletionVectorView] is not created in this scenario
-            //
-            // <https://github.com/delta-io/delta-rs/issues/3030>
-            if arr.storage_type.is_valid(self.index)
-                && !arr.storage_type.value(self.index).is_empty()
-            {
-                return Some(DeletionVectorView {
+        self.deletion_vector.as_ref().and_then(|arr| {
+            arr.storage_type
+                .is_valid(self.index)
+                .then_some(DeletionVectorView {
                     data: arr,
                     index: self.index,
-                });
-            }
-        }
-        None
+                })
+        })
     }
 
     /// The number of records stored in the data file.
@@ -380,18 +371,23 @@ impl<'a> FileStatsAccessor<'a> {
         );
         let deletion_vector = extract_and_cast_opt::<StructArray>(data, "add.deletionVector");
         let deletion_vector = deletion_vector.and_then(|dv| {
-            let storage_type = extract_and_cast::<StringArray>(dv, "storageType").ok()?;
-            let path_or_inline_dv = extract_and_cast::<StringArray>(dv, "pathOrInlineDv").ok()?;
-            let size_in_bytes = extract_and_cast::<Int32Array>(dv, "sizeInBytes").ok()?;
-            let cardinality = extract_and_cast::<Int64Array>(dv, "cardinality").ok()?;
-            let offset = extract_and_cast_opt::<Int32Array>(dv, "offset");
-            Some(DeletionVector {
-                storage_type,
-                path_or_inline_dv,
-                size_in_bytes,
-                cardinality,
-                offset,
-            })
+            if dv.null_count() == dv.len() {
+                None
+            } else {
+                let storage_type = extract_and_cast::<StringArray>(dv, "storageType").ok()?;
+                let path_or_inline_dv =
+                    extract_and_cast::<StringArray>(dv, "pathOrInlineDv").ok()?;
+                let size_in_bytes = extract_and_cast::<Int32Array>(dv, "sizeInBytes").ok()?;
+                let cardinality = extract_and_cast::<Int64Array>(dv, "cardinality").ok()?;
+                let offset = extract_and_cast_opt::<Int32Array>(dv, "offset");
+                Some(DeletionVector {
+                    storage_type,
+                    path_or_inline_dv,
+                    size_in_bytes,
+                    cardinality,
+                    offset,
+                })
+            }
         });
 
         Ok(Self {
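
The rewritten deletion_vector() accessor at the top of this file swaps the nested if let and early return for Option::and_then plus bool::then_some. A small self-contained sketch of that combinator shape, using plain placeholder types (Record, Slot, and their fields are invented for this example) instead of the arrow-backed accessors:

struct Slot<'a> {
    valid: bool,
    data: &'a str,
}

struct Record<'a> {
    deletion_vector: Option<Slot<'a>>,
}

struct DeletionVectorView<'a> {
    data: &'a str,
}

impl<'a> Record<'a> {
    // Same shape as the rewritten accessor: `and_then` peels the Option off the
    // struct column, `then_some` turns the per-row validity flag into an Option,
    // so callers never receive a view over a null slot.
    fn deletion_vector(&self) -> Option<DeletionVectorView<'a>> {
        self.deletion_vector
            .as_ref()
            .and_then(|slot| slot.valid.then_some(DeletionVectorView { data: slot.data }))
    }
}

fn main() {
    let missing = Record { deletion_vector: None };
    let invalid = Record {
        deletion_vector: Some(Slot { valid: false, data: "" }),
    };
    let present = Record {
        deletion_vector: Some(Slot { valid: true, data: "u123" }),
    };
    assert!(missing.deletion_vector().is_none());
    assert!(invalid.deletion_vector().is_none());
    assert!(present.deletion_vector().is_some());
}

A missing column and an invalid slot both collapse to None without a separate early-return path.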
90 changes: 58 additions & 32 deletions crates/core/src/kernel/snapshot/parse.rs
@@ -78,6 +78,10 @@ pub(super) fn read_adds(array: &dyn ProvidesColumnByName) -> DeltaResult<Vec<Add
     let mut result = Vec::new();
 
     if let Some(arr) = ex::extract_and_cast_opt::<StructArray>(array, "add") {
+        // Stop early if all values are null
+        if arr.null_count() == arr.len() {
+            return Ok(vec![]);
+        }
         let path = ex::extract_and_cast::<StringArray>(arr, "path")?;
         let pvs = ex::extract_and_cast_opt::<MapArray>(arr, "partitionValues");
         let size = ex::extract_and_cast::<Int64Array>(arr, "size")?;
@@ -94,22 +98,33 @@ pub(super) fn read_adds(array: &dyn ProvidesColumnByName) -> DeltaResult<Vec<Add
             let size_in_bytes = ex::extract_and_cast::<Int32Array>(d, "sizeInBytes")?;
             let cardinality = ex::extract_and_cast::<Int64Array>(d, "cardinality")?;
 
-            Box::new(|idx: usize| {
-                if ex::read_str(storage_type, idx).is_ok() {
-                    Some(DeletionVectorDescriptor {
-                        storage_type: std::str::FromStr::from_str(
-                            ex::read_str(storage_type, idx).ok()?,
-                        )
-                        .ok()?,
-                        path_or_inline_dv: ex::read_str(path_or_inline_dv, idx).ok()?.to_string(),
-                        offset: ex::read_primitive_opt(offset, idx),
-                        size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?,
-                        cardinality: ex::read_primitive(cardinality, idx).ok()?,
-                    })
-                } else {
-                    None
-                }
-            })
+            // Column might exist but have nullability set for the whole array, so we just return Nones
+            if d.null_count() == d.len() {
+                Box::new(|_| None)
+            } else {
+                Box::new(|idx: usize| {
+                    d.is_valid(idx)
+                        .then(|| {
+                            if ex::read_str(storage_type, idx).is_ok() {
+                                Some(DeletionVectorDescriptor {
+                                    storage_type: std::str::FromStr::from_str(
+                                        ex::read_str(storage_type, idx).ok()?,
+                                    )
+                                    .ok()?,
+                                    path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)
+                                        .ok()?
+                                        .to_string(),
+                                    offset: ex::read_primitive_opt(offset, idx),
+                                    size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?,
+                                    cardinality: ex::read_primitive(cardinality, idx).ok()?,
+                                })
+                            } else {
+                                None
+                            }
+                        })
+                        .flatten()
+                })
+            }
         } else {
             Box::new(|_| None)
         };
@@ -210,22 +225,33 @@ pub(super) fn read_removes(array: &dyn ProvidesColumnByName) -> DeltaResult<Vec<
             let size_in_bytes = ex::extract_and_cast::<Int32Array>(d, "sizeInBytes")?;
             let cardinality = ex::extract_and_cast::<Int64Array>(d, "cardinality")?;
 
-            Box::new(|idx: usize| {
-                if ex::read_str(storage_type, idx).is_ok() {
-                    Some(DeletionVectorDescriptor {
-                        storage_type: std::str::FromStr::from_str(
-                            ex::read_str(storage_type, idx).ok()?,
-                        )
-                        .ok()?,
-                        path_or_inline_dv: ex::read_str(path_or_inline_dv, idx).ok()?.to_string(),
-                        offset: ex::read_primitive_opt(offset, idx),
-                        size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?,
-                        cardinality: ex::read_primitive(cardinality, idx).ok()?,
-                    })
-                } else {
-                    None
-                }
-            })
+            // Column might exist but have nullability set for the whole array, so we just return Nones
+            if d.null_count() == d.len() {
+                Box::new(|_| None)
+            } else {
+                Box::new(|idx: usize| {
+                    d.is_valid(idx)
+                        .then(|| {
+                            if ex::read_str(storage_type, idx).is_ok() {
+                                Some(DeletionVectorDescriptor {
+                                    storage_type: std::str::FromStr::from_str(
+                                        ex::read_str(storage_type, idx).ok()?,
+                                    )
+                                    .ok()?,
+                                    path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)
+                                        .ok()?
+                                        .to_string(),
+                                    offset: ex::read_primitive_opt(offset, idx),
+                                    size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?,
+                                    cardinality: ex::read_primitive(cardinality, idx).ok()?,
+                                })
+                            } else {
+                                None
+                            }
+                        })
+                        .flatten()
+                })
+            }
         } else {
             Box::new(|_| None)
         };
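
In both read_adds and read_removes above, the per-row closure now starts with d.is_valid(idx). Because bool::then wraps the closure's result in another Option, and the body already returns Option<DeletionVectorDescriptor>, the value comes out as Option<Option<_>>, hence the trailing .flatten(). A compact illustration with plain types (descriptor_at and its arguments are invented for this sketch):

// `then` yields Some(closure result) only when the slot is valid; `flatten`
// collapses the two Option layers so "row is null" and "row failed to parse"
// both read as None.
fn descriptor_at(row_is_valid: bool, storage_type: Option<&str>) -> Option<String> {
    row_is_valid
        .then(|| {
            let st = storage_type?;
            Some(format!("deletion vector stored as {st}"))
        })
        .flatten()
}

fn main() {
    assert_eq!(descriptor_at(false, Some("u")), None); // null struct slot
    assert_eq!(descriptor_at(true, None), None); // valid slot, unreadable field
    assert!(descriptor_at(true, Some("u")).is_some()); // fully populated slot
}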
33 changes: 22 additions & 11 deletions crates/core/src/kernel/snapshot/replay.rs
Expand Up @@ -559,22 +559,32 @@ fn read_file_info<'a>(arr: &'a dyn ProvidesColumnByName) -> DeltaResult<Vec<Opti
let path_or_inline_dv = ex::extract_and_cast::<StringArray>(d, "pathOrInlineDv")?;
let offset = ex::extract_and_cast::<Int32Array>(d, "offset")?;

Box::new(|idx: usize| {
if ex::read_str(storage_type, idx).is_ok() {
Ok(Some(DVInfo {
storage_type: ex::read_str(storage_type, idx)?,
path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)?,
offset: ex::read_primitive_opt(offset, idx),
}))
} else {
Ok(None)
}
})
// Column might exist but have nullability set for the whole array, so we just return Nones
if d.null_count() == d.len() {
Box::new(|_| Ok(None))
} else {
Box::new(|idx: usize| {
if d.is_valid(idx) {
if ex::read_str(storage_type, idx).is_ok() {
Ok(Some(DVInfo {
storage_type: ex::read_str(storage_type, idx)?,
path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)?,
offset: ex::read_primitive_opt(offset, idx),
}))
} else {
Ok(None)
}
} else {
Ok(None)
}
})
}
} else {
Box::new(|_| Ok(None))
};

let mut adds = Vec::with_capacity(path.len());

for idx in 0..path.len() {
let value = path
.is_valid(idx)
@@ -587,6 +597,7 @@ fn read_file_info<'a>(arr: &'a dyn ProvidesColumnByName) -> DeltaResult<Vec<Opti
             .transpose()?;
         adds.push(value);
     }
+
     Ok(adds)
 }

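
The replay-path change above mirrors the others: skip the deletion-vector lookup entirely when the struct column is all null, and otherwise consult d.is_valid(idx) per row. The situation being guarded against can be reproduced directly with arrow: a nullable struct whose children are declared non-nullable may have every slot null while the children still carry placeholder values. A sketch of that shape, assuming the arrow-array, arrow-buffer, and arrow-schema crates (the field names mirror the deletion-vector schema, but the snippet is illustrative only):

use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int32Array, StringArray, StructArray};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field, Fields};

fn main() {
    // Children are non-nullable and hold placeholder values ("" and 0), but the
    // struct column itself is entirely null.
    let fields = Fields::from(vec![
        Field::new("storageType", DataType::Utf8, false),
        Field::new("sizeInBytes", DataType::Int32, false),
    ]);
    let children: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["", ""])),
        Arc::new(Int32Array::from(vec![0, 0])),
    ];
    let dv = StructArray::new(fields, children, Some(NullBuffer::new_null(2)));

    // Whole-column guard used in this commit: every slot is null, so the
    // deletion-vector parsing can be skipped outright.
    assert_eq!(dv.null_count(), dv.len());

    // Per-row guard: no slot is valid, so no descriptor would be built.
    assert!((0..dv.len()).all(|idx| !dv.is_valid(idx)));
}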
1 change: 0 additions & 1 deletion python/tests/test_checkpoint.py
@@ -473,7 +473,6 @@ def test_checkpoint_with_nullable_false(tmp_path: pathlib.Path):
 @pytest.mark.pandas
 def test_checkpoint_with_multiple_writes(tmp_path: pathlib.Path):
     import pandas as pd
-
     write_deltalake(
         tmp_path,
         pd.DataFrame(
