Commit

Merge branch 'main' into chore/move-to-uv
thomas-chauvet authored Dec 5, 2024
2 parents 6bcc836 + 98f8b0b commit 33ad5cb
Showing 20 changed files with 494 additions and 98 deletions.
Cargo.toml (2 changes: 1 addition & 1 deletion)
@@ -26,7 +26,7 @@ debug = true
debug = "line-tables-only"

[workspace.dependencies]
-delta_kernel = { version = "0.4.1", features = ["sync-engine"] }
+delta_kernel = { version = "0.4.1", features = ["default-engine"] }
#delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] }

# arrow
crates/core/Cargo.toml (2 changes: 1 addition & 1 deletion)
@@ -1,6 +1,6 @@
[package]
name = "deltalake-core"
version = "0.22.1"
version = "0.22.3"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
crates/core/src/delta_datafusion/mod.rs (86 changes: 82 additions & 4 deletions)
@@ -826,9 +826,12 @@ impl TableProvider for DeltaTableProvider {

    fn supports_filters_pushdown(
        &self,
-        _filter: &[&Expr],
+        filter: &[&Expr],
    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
-        Ok(vec![TableProviderFilterPushDown::Inexact])
+        Ok(filter
+            .iter()
+            .map(|_| TableProviderFilterPushDown::Inexact)
+            .collect())
    }

    fn statistics(&self) -> Option<Statistics> {
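Note on the change above: DataFusion requires supports_filters_pushdown to return exactly one pushdown decision per filter expression (the result must be the same length as the input slice), so a hard-coded single-element vector can fail planning once several predicates are pushed down. Below is a minimal, dependency-free sketch of the restored contract; the PushDown enum is a stand-in for DataFusion's TableProviderFilterPushDown, and supports_filters_pushdown here is a free function, not delta-rs's actual method.

// Stand-in for DataFusion's TableProviderFilterPushDown.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum PushDown {
    Unsupported,
    Inexact,
    Exact,
}

// One verdict per filter: Delta can prune files with these predicates but
// cannot guarantee exact row-level filtering, so each one is Inexact.
fn supports_filters_pushdown(filters: &[&str]) -> Vec<PushDown> {
    filters.iter().map(|_| PushDown::Inexact).collect()
}

fn main() {
    let verdicts = supports_filters_pushdown(&["id > 5", "name = 'a'"]);
    assert_eq!(verdicts, vec![PushDown::Inexact, PushDown::Inexact]);
}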
@@ -1150,11 +1153,12 @@ pub(crate) async fn execute_plan_to_batch(
    Ok(concat_batches(&plan.schema(), data.iter())?)
}

-/// Responsible for checking batches of data conform to table's invariants.
-#[derive(Clone)]
+/// Responsible for checking that batches of data conform to the table's invariants, constraints, and nullability.
+#[derive(Clone, Default)]
pub struct DeltaDataChecker {
    constraints: Vec<Constraint>,
    invariants: Vec<Invariant>,
+    non_nullable_columns: Vec<String>,
    ctx: SessionContext,
}

@@ -1164,6 +1168,7 @@ impl DeltaDataChecker {
        Self {
            invariants: vec![],
            constraints: vec![],
+            non_nullable_columns: vec![],
            ctx: DeltaSessionContext::default().into(),
        }
    }
@@ -1173,6 +1178,7 @@ impl DeltaDataChecker {
        Self {
            invariants,
            constraints: vec![],
+            non_nullable_columns: vec![],
            ctx: DeltaSessionContext::default().into(),
        }
    }
@@ -1182,6 +1188,7 @@ impl DeltaDataChecker {
        Self {
            constraints,
            invariants: vec![],
+            non_nullable_columns: vec![],
            ctx: DeltaSessionContext::default().into(),
        }
    }
@@ -1202,9 +1209,21 @@ impl DeltaDataChecker {
    pub fn new(snapshot: &DeltaTableState) -> Self {
        let invariants = snapshot.schema().get_invariants().unwrap_or_default();
        let constraints = snapshot.table_config().get_constraints();
+        let non_nullable_columns = snapshot
+            .schema()
+            .fields()
+            .filter_map(|f| {
+                if !f.is_nullable() {
+                    Some(f.name().clone())
+                } else {
+                    None
+                }
+            })
+            .collect_vec();
        Self {
            invariants,
            constraints,
+            non_nullable_columns,
            ctx: DeltaSessionContext::default().into(),
        }
    }
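The new() constructor above caches the names of all non-nullable fields so each incoming batch can be validated without re-walking the schema. A standalone sketch of the same scan, assuming a hypothetical Field type in place of the kernel's schema field and plain collect() in place of itertools' collect_vec():

// Hypothetical stand-in for the kernel's schema field type.
struct Field {
    name: String,
    nullable: bool,
}

// Keep only the names of fields declared non-nullable.
fn non_nullable_names(fields: &[Field]) -> Vec<String> {
    fields
        .iter()
        .filter_map(|f| (!f.nullable).then(|| f.name.clone()))
        .collect()
}

fn main() {
    let fields = vec![
        Field { name: "id".into(), nullable: false },
        Field { name: "note".into(), nullable: true },
    ];
    assert_eq!(non_nullable_names(&fields), vec!["id".to_string()]);
}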
@@ -1214,10 +1233,35 @@ impl DeltaDataChecker {
    /// If it does not, it will return [DeltaTableError::InvalidData] with a list
    /// of values that violated each invariant.
    pub async fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
+        self.check_nullability(record_batch)?;
        self.enforce_checks(record_batch, &self.invariants).await?;
        self.enforce_checks(record_batch, &self.constraints).await
    }
+
+    /// Returns true if all the nullability checks are valid
+    fn check_nullability(&self, record_batch: &RecordBatch) -> Result<bool, DeltaTableError> {
+        let mut violations = Vec::new();
+        for col in self.non_nullable_columns.iter() {
+            if let Some(arr) = record_batch.column_by_name(col) {
+                if arr.null_count() > 0 {
+                    violations.push(format!(
+                        "Non-nullable column violation for {col}, found {} null values",
+                        arr.null_count()
+                    ));
+                }
+            } else {
+                violations.push(format!(
+                    "Non-nullable column violation for {col}, not found in batch!"
+                ));
+            }
+        }
+        if !violations.is_empty() {
+            Err(DeltaTableError::InvalidData { violations })
+        } else {
+            Ok(true)
+        }
+    }

    async fn enforce_checks<C: DataCheck>(
        &self,
        record_batch: &RecordBatch,
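check_nullability flags two distinct violations: a non-nullable column that is present but contains nulls, and a non-nullable column missing from the batch entirely. A small sketch of the two arrow-rs calls it builds on, column_by_name and null_count, assuming the arrow crate:

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringArray};
use arrow::record_batch::RecordBatch;

fn main() {
    // A column with one value and one null.
    let col: ArrayRef = Arc::new(StringArray::from(vec![Some("s"), None]));
    let batch = RecordBatch::try_from_iter(vec![("zed", col)]).unwrap();

    // Present column: count its nulls, exactly what check_nullability flags.
    let nulls = batch.column_by_name("zed").map(|a| a.null_count());
    assert_eq!(nulls, Some(1));

    // Missing column: column_by_name returns None, which check_nullability
    // reports as a violation as well.
    assert!(batch.column_by_name("yap").is_none());
}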
@@ -2598,4 +2642,38 @@ mod tests {

        assert_eq!(actual.len(), 0);
    }
+
+    #[tokio::test]
+    async fn test_check_nullability() -> DeltaResult<()> {
+        use arrow::array::StringArray;
+
+        let data_checker = DeltaDataChecker {
+            non_nullable_columns: vec!["zed".to_string(), "yap".to_string()],
+            ..Default::default()
+        };
+
+        let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
+        let nulls: Arc<dyn Array> = Arc::new(StringArray::new_null(1));
+        let batch = RecordBatch::try_from_iter(vec![("a", arr), ("zed", nulls)]).unwrap();
+
+        let result = data_checker.check_nullability(&batch);
+        assert!(
+            result.is_err(),
+            "The result should have errored! {result:?}"
+        );
+
+        let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
+        let batch = RecordBatch::try_from_iter(vec![("zed", arr)]).unwrap();
+        let result = data_checker.check_nullability(&batch);
+        assert!(
+            result.is_err(),
+            "The result should have errored! {result:?}"
+        );
+
+        let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
+        let batch = RecordBatch::try_from_iter(vec![("zed", arr.clone()), ("yap", arr)]).unwrap();
+        let _ = data_checker.check_nullability(&batch)?;
+
+        Ok(())
+    }
}
crates/core/src/kernel/snapshot/log_data.rs (50 changes: 23 additions & 27 deletions)
@@ -245,23 +245,14 @@ impl LogicalFile<'_> {

    /// Defines a deletion vector
    pub fn deletion_vector(&self) -> Option<DeletionVectorView<'_>> {
-        if let Some(arr) = self.deletion_vector.as_ref() {
-            // With v0.22 and the upgrade to a more recent arrow. Reading nullable structs with
-            // non-nullable entries back out of parquet is resulting in the DeletionVector having
-            // an empty string rather than a null. The addition check on the value ensures that a
-            // [DeletionVectorView] is not created in this scenario
-            //
-            // <https://github.com/delta-io/delta-rs/issues/3030>
-            if arr.storage_type.is_valid(self.index)
-                && !arr.storage_type.value(self.index).is_empty()
-            {
-                return Some(DeletionVectorView {
-                    data: arr,
-                    index: self.index,
-                });
-            }
-        }
-        None
+        self.deletion_vector.as_ref().and_then(|arr| {
+            arr.storage_type
+                .is_valid(self.index)
+                .then_some(DeletionVectorView {
+                    data: arr,
+                    index: self.index,
+                })
+        })
    }

    /// The number of records stored in the data file.
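The rewrite above trades an imperative if-let chain with an early return for Option::and_then plus bool::then_some; note the empty-string guard referenced in delta-io/delta-rs#3030 is dropped here, presumably because all-null deletion-vector structs are now filtered out when the accessor is built (see the FileStatsAccessor hunk below). A dependency-free sketch of the equivalence, with a plain validity slice standing in for the arrow arrays:

// Imperative form: nested checks with an early return.
fn view_imperative(validity: Option<&[bool]>, idx: usize) -> Option<usize> {
    if let Some(v) = validity {
        if v[idx] {
            return Some(idx);
        }
    }
    None
}

// Combinator form: and_then unwraps the Option, then_some gates on the bool.
fn view_combinators(validity: Option<&[bool]>, idx: usize) -> Option<usize> {
    validity.and_then(|v| v[idx].then_some(idx))
}

fn main() {
    let validity: &[bool] = &[true, false];
    for idx in 0..validity.len() {
        assert_eq!(
            view_imperative(Some(validity), idx),
            view_combinators(Some(validity), idx)
        );
    }
    assert_eq!(view_combinators(None, 0), None);
}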
@@ -380,18 +371,23 @@ impl<'a> FileStatsAccessor<'a> {
        );
        let deletion_vector = extract_and_cast_opt::<StructArray>(data, "add.deletionVector");
        let deletion_vector = deletion_vector.and_then(|dv| {
-            let storage_type = extract_and_cast::<StringArray>(dv, "storageType").ok()?;
-            let path_or_inline_dv = extract_and_cast::<StringArray>(dv, "pathOrInlineDv").ok()?;
-            let size_in_bytes = extract_and_cast::<Int32Array>(dv, "sizeInBytes").ok()?;
-            let cardinality = extract_and_cast::<Int64Array>(dv, "cardinality").ok()?;
-            let offset = extract_and_cast_opt::<Int32Array>(dv, "offset");
-            Some(DeletionVector {
-                storage_type,
-                path_or_inline_dv,
-                size_in_bytes,
-                cardinality,
-                offset,
-            })
+            if dv.null_count() == dv.len() {
+                None
+            } else {
+                let storage_type = extract_and_cast::<StringArray>(dv, "storageType").ok()?;
+                let path_or_inline_dv =
+                    extract_and_cast::<StringArray>(dv, "pathOrInlineDv").ok()?;
+                let size_in_bytes = extract_and_cast::<Int32Array>(dv, "sizeInBytes").ok()?;
+                let cardinality = extract_and_cast::<Int64Array>(dv, "cardinality").ok()?;
+                let offset = extract_and_cast_opt::<Int32Array>(dv, "offset");
+                Some(DeletionVector {
+                    storage_type,
+                    path_or_inline_dv,
+                    size_in_bytes,
+                    cardinality,
+                    offset,
+                })
+            }
        });

        Ok(Self {
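For background on the guard above: a nullable struct column whose rows are all null can come back from parquet with non-null child values (empty strings instead of nulls, see delta-io/delta-rs#3030), so comparing null_count() to len() on the parent struct before extracting children avoids building a DeletionVector out of placeholder values. A small sketch of that shape and the guard, assuming the arrow crate:

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringArray, StructArray};
use arrow::buffer::NullBuffer;
use arrow::datatypes::{DataType, Field};

fn main() {
    // Both rows are null at the struct level, yet the child array carries
    // non-null placeholder values (empty strings), mimicking the parquet
    // round-trip of a nullable struct with non-nullable children.
    let child: ArrayRef = Arc::new(StringArray::from(vec!["", ""]));
    let field = Arc::new(Field::new("storageType", DataType::Utf8, true));
    let all_null = NullBuffer::from(vec![false, false]);
    let dv = StructArray::new(vec![field].into(), vec![child], Some(all_null));

    // The guard: an all-null struct column means "no deletion vector".
    assert_eq!(dv.null_count(), dv.len());
    let parsed = (dv.null_count() != dv.len()).then_some(&dv);
    assert!(parsed.is_none());
}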
crates/core/src/kernel/snapshot/parse.rs (90 changes: 58 additions & 32 deletions)
@@ -78,6 +78,10 @@ pub(super) fn read_adds(array: &dyn ProvidesColumnByName) -> DeltaResult<Vec<Add>>
    let mut result = Vec::new();

    if let Some(arr) = ex::extract_and_cast_opt::<StructArray>(array, "add") {
+        // Stop early if all values are null
+        if arr.null_count() == arr.len() {
+            return Ok(vec![]);
+        }
        let path = ex::extract_and_cast::<StringArray>(arr, "path")?;
        let pvs = ex::extract_and_cast_opt::<MapArray>(arr, "partitionValues");
        let size = ex::extract_and_cast::<Int64Array>(arr, "size")?;
@@ -94,22 +98,33 @@
            let size_in_bytes = ex::extract_and_cast::<Int32Array>(d, "sizeInBytes")?;
            let cardinality = ex::extract_and_cast::<Int64Array>(d, "cardinality")?;

-            Box::new(|idx: usize| {
-                if ex::read_str(storage_type, idx).is_ok() {
-                    Some(DeletionVectorDescriptor {
-                        storage_type: std::str::FromStr::from_str(
-                            ex::read_str(storage_type, idx).ok()?,
-                        )
-                        .ok()?,
-                        path_or_inline_dv: ex::read_str(path_or_inline_dv, idx).ok()?.to_string(),
-                        offset: ex::read_primitive_opt(offset, idx),
-                        size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?,
-                        cardinality: ex::read_primitive(cardinality, idx).ok()?,
-                    })
-                } else {
-                    None
-                }
-            })
+            // Column might exist but have nullability set for the whole array, so we just return Nones
+            if d.null_count() == d.len() {
+                Box::new(|_| None)
+            } else {
+                Box::new(|idx: usize| {
+                    d.is_valid(idx)
+                        .then(|| {
+                            if ex::read_str(storage_type, idx).is_ok() {
+                                Some(DeletionVectorDescriptor {
+                                    storage_type: std::str::FromStr::from_str(
+                                        ex::read_str(storage_type, idx).ok()?,
+                                    )
+                                    .ok()?,
+                                    path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)
+                                        .ok()?
+                                        .to_string(),
+                                    offset: ex::read_primitive_opt(offset, idx),
+                                    size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?,
+                                    cardinality: ex::read_primitive(cardinality, idx).ok()?,
+                                })
+                            } else {
+                                None
+                            }
+                        })
+                        .flatten()
+                })
+            }
        } else {
            Box::new(|_| None)
        };
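The per-row closure above now gates on d.is_valid(idx) via bool::then; since the inner closure already returns an Option (through the .ok()? reads), the resulting Option<Option<_>> is collapsed with .flatten(). read_removes below applies the identical guard. A dependency-free sketch of the then(..).flatten() pattern:

// then(..) runs the closure lazily and only for valid rows; the closure
// itself returns Option<i64>, so the result is Option<Option<i64>> and
// flatten() collapses it.
fn parse_row(row_is_valid: bool, raw: &str) -> Option<i64> {
    row_is_valid.then(|| raw.parse::<i64>().ok()).flatten()
}

fn main() {
    assert_eq!(parse_row(true, "42"), Some(42)); // valid row, parse succeeds
    assert_eq!(parse_row(true, "oops"), None); // valid row, inner None surfaces
    assert_eq!(parse_row(false, "42"), None); // invalid row short-circuits
}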
@@ -210,22 +225,33 @@ pub(super) fn read_removes(array: &dyn ProvidesColumnByName) -> DeltaResult<Vec<Remove>>
            let size_in_bytes = ex::extract_and_cast::<Int32Array>(d, "sizeInBytes")?;
            let cardinality = ex::extract_and_cast::<Int64Array>(d, "cardinality")?;

-            Box::new(|idx: usize| {
-                if ex::read_str(storage_type, idx).is_ok() {
-                    Some(DeletionVectorDescriptor {
-                        storage_type: std::str::FromStr::from_str(
-                            ex::read_str(storage_type, idx).ok()?,
-                        )
-                        .ok()?,
-                        path_or_inline_dv: ex::read_str(path_or_inline_dv, idx).ok()?.to_string(),
-                        offset: ex::read_primitive_opt(offset, idx),
-                        size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?,
-                        cardinality: ex::read_primitive(cardinality, idx).ok()?,
-                    })
-                } else {
-                    None
-                }
-            })
+            // Column might exist but have nullability set for the whole array, so we just return Nones
+            if d.null_count() == d.len() {
+                Box::new(|_| None)
+            } else {
+                Box::new(|idx: usize| {
+                    d.is_valid(idx)
+                        .then(|| {
+                            if ex::read_str(storage_type, idx).is_ok() {
+                                Some(DeletionVectorDescriptor {
+                                    storage_type: std::str::FromStr::from_str(
+                                        ex::read_str(storage_type, idx).ok()?,
+                                    )
+                                    .ok()?,
+                                    path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)
+                                        .ok()?
+                                        .to_string(),
+                                    offset: ex::read_primitive_opt(offset, idx),
+                                    size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?,
+                                    cardinality: ex::read_primitive(cardinality, idx).ok()?,
+                                })
+                            } else {
+                                None
+                            }
+                        })
+                        .flatten()
+                })
+            }
        } else {
            Box::new(|_| None)
        };
(The diffs for the remaining 15 changed files are not shown here.)