Skip to content

Commit

Permalink
feat: check constraints (delta-io#1915)
Browse files Browse the repository at this point in the history
# Description
This PR adds CHECK constraints on delta tables. I still have some
outstanding work to do on this, but this is my working draft PR for this
feature.

```rust
let constraint = table.add_constraint().with_constraint("id", "id < 100");
constraint.await?;
```

# Related Issue(s)
delta-io#1881 

# Documentation

<!---
Share links to useful documentation
--->

---------

Co-authored-by: Stephen Carman <[email protected]>
Co-authored-by: scarman-db <[email protected]>
  • Loading branch information
3 people authored Dec 14, 2023
1 parent 6d3caf9 commit 2d65e72
Show file tree
Hide file tree
Showing 12 changed files with 499 additions and 38 deletions.
94 changes: 71 additions & 23 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaR
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use arrow_array::types::UInt16Type;
use arrow_array::{DictionaryArray, StringArray};
use arrow_array::{Array, DictionaryArray, StringArray};
use arrow_cast::display::array_value_to_string;

use arrow_schema::Field;
use async_trait::async_trait;
use chrono::{NaiveDateTime, TimeZone, Utc};
Expand Down Expand Up @@ -66,17 +68,20 @@ use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_sql::planner::ParserOptions;

use itertools::Itertools;
use log::error;
use object_store::ObjectMeta;
use serde::{Deserialize, Serialize};
use url::Url;

use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType};
use crate::kernel::{Add, DataCheck, DataType as DeltaDataType, Invariant, PrimitiveType};
use crate::logstore::LogStoreRef;
use crate::protocol::{ColumnCountStat, ColumnValueStat};
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
use crate::table::Constraint;
use crate::{open_table, open_table_with_storage_options, DeltaTable};

const PATH_COLUMN: &str = "__delta_rs_path";
Expand Down Expand Up @@ -1017,15 +1022,43 @@ pub(crate) fn logical_expr_to_physical_expr(
/// Responsible for checking batches of data conform to table's invariants.
///
/// Both legacy column invariants and CHECK constraints are evaluated by
/// running their SQL predicates against each record batch via DataFusion.
#[derive(Clone)]
pub struct DeltaDataChecker {
    /// CHECK constraints to enforce on every batch
    constraints: Vec<Constraint>,
    /// Column invariants to enforce on every batch
    invariants: Vec<Invariant>,
    /// DataFusion session used to evaluate the check predicates as SQL
    ctx: SessionContext,
}

impl DeltaDataChecker {
/// Build a checker that enforces only the supplied column invariants;
/// no CHECK constraints are applied.
pub fn new_with_invariants(invariants: Vec<Invariant>) -> Self {
    Self {
        constraints: Vec::new(),
        invariants,
        ctx: SessionContext::new(),
    }
}

/// Build a checker that enforces only the supplied CHECK constraints;
/// no column invariants are applied.
pub fn new_with_constraints(constraints: Vec<Constraint>) -> Self {
    Self {
        invariants: Vec::new(),
        constraints,
        ctx: SessionContext::new(),
    }
}

/// Create a new DeltaDataChecker
pub fn new(invariants: Vec<Invariant>) -> Self {
pub fn new(snapshot: &DeltaTableState) -> Self {
let metadata = snapshot.metadata();

let invariants = metadata
.and_then(|meta| meta.schema.get_invariants().ok())
.unwrap_or_default();
let constraints = metadata
.map(|meta| meta.get_constraints())
.unwrap_or_default();
Self {
invariants,
constraints,
ctx: SessionContext::new(),
}
}
Expand All @@ -1035,45 +1068,54 @@ impl DeltaDataChecker {
/// If it does not, it will return [DeltaTableError::InvalidData] with a list
/// of values that violated each invariant.
pub async fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
self.enforce_invariants(record_batch).await
// TODO: for support for Protocol V3, check constraints
self.enforce_checks(record_batch, &self.invariants).await?;
self.enforce_checks(record_batch, &self.constraints).await
}

async fn enforce_invariants(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
// Invariants are deprecated, so let's not pay the overhead for any of this
// if we can avoid it.
if self.invariants.is_empty() {
async fn enforce_checks<C: DataCheck>(
&self,
record_batch: &RecordBatch,
checks: &[C],
) -> Result<(), DeltaTableError> {
if checks.is_empty() {
return Ok(());
}

let table = MemTable::try_new(record_batch.schema(), vec![vec![record_batch.clone()]])?;
self.ctx.register_table("data", Arc::new(table))?;

let mut violations: Vec<String> = Vec::new();

for invariant in self.invariants.iter() {
if invariant.field_name.contains('.') {
for check in checks {
if check.get_name().contains('.') {
return Err(DeltaTableError::Generic(
"Support for column invariants on nested columns is not supported.".to_string(),
"Support for nested columns is not supported.".to_string(),
));
}

let sql = format!(
"SELECT {} FROM data WHERE not ({}) LIMIT 1",
invariant.field_name, invariant.invariant_sql
"SELECT {} FROM data WHERE NOT ({}) LIMIT 1",
check.get_name(),
check.get_expression()
);

let dfs: Vec<RecordBatch> = self.ctx.sql(&sql).await?.collect().await?;
if !dfs.is_empty() && dfs[0].num_rows() > 0 {
let value = format!("{:?}", dfs[0].column(0));
let value: String = dfs[0]
.columns()
.iter()
.map(|c| array_value_to_string(c, 0).unwrap_or(String::from("null")))
.join(", ");

let msg = format!(
"Invariant ({}) violated by value {}",
invariant.invariant_sql, value
"Check or Invariant ({}) violated by value in row: [{}]",
check.get_expression(),
value
);
violations.push(msg);
}
}

self.ctx.deregister_table("data")?;
if !violations.is_empty() {
Err(DeltaTableError::InvalidData { violations })
} else {
Expand Down Expand Up @@ -1747,7 +1789,7 @@ mod tests {
.unwrap();
// Empty invariants is okay
let invariants: Vec<Invariant> = vec![];
assert!(DeltaDataChecker::new(invariants)
assert!(DeltaDataChecker::new_with_invariants(invariants)
.check_batch(&batch)
.await
.is_ok());
Expand All @@ -1757,7 +1799,7 @@ mod tests {
Invariant::new("a", "a is not null"),
Invariant::new("b", "b < 1000"),
];
assert!(DeltaDataChecker::new(invariants)
assert!(DeltaDataChecker::new_with_invariants(invariants)
.check_batch(&batch)
.await
.is_ok());
Expand All @@ -1767,7 +1809,9 @@ mod tests {
Invariant::new("a", "a is null"),
Invariant::new("b", "b < 100"),
];
let result = DeltaDataChecker::new(invariants).check_batch(&batch).await;
let result = DeltaDataChecker::new_with_invariants(invariants)
.check_batch(&batch)
.await;
assert!(result.is_err());
assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
if let Err(DeltaTableError::InvalidData { violations }) = result {
Expand All @@ -1776,7 +1820,9 @@ mod tests {

// Irrelevant invariants return a different error
let invariants = vec![Invariant::new("c", "c > 2000")];
let result = DeltaDataChecker::new(invariants).check_batch(&batch).await;
let result = DeltaDataChecker::new_with_invariants(invariants)
.check_batch(&batch)
.await;
assert!(result.is_err());

// Nested invariants are unsupported
Expand All @@ -1790,7 +1836,9 @@ mod tests {
let batch = RecordBatch::try_new(schema, vec![inner]).unwrap();

let invariants = vec![Invariant::new("x.b", "x.b < 1000")];
let result = DeltaDataChecker::new(invariants).check_batch(&batch).await;
let result = DeltaDataChecker::new_with_invariants(invariants)
.check_batch(&batch)
.await;
assert!(result.is_err());
assert!(matches!(result, Err(DeltaTableError::Generic { .. })));
}
Expand Down
2 changes: 2 additions & 0 deletions crates/deltalake-core/src/kernel/actions/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,11 @@ pub struct Protocol {
pub min_writer_version: i32,
/// A collection of features that a client must implement in order to correctly
/// read this table (exist only when minReaderVersion is set to 3)
#[serde(skip_serializing_if = "Option::is_none")]
pub reader_features: Option<HashSet<ReaderFeatures>>,
/// A collection of features that a client must implement in order to correctly
/// write this table (exist only when minWriterVersion is set to 7)
#[serde(skip_serializing_if = "Option::is_none")]
pub writer_features: Option<HashSet<WriterFeatures>>,
}

Expand Down
8 changes: 8 additions & 0 deletions crates/deltalake-core/src/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,11 @@ pub use actions::*;
pub use error::*;
pub use expressions::*;
pub use schema::*;

/// A trait for all kernel types that are used as part of data checking
pub trait DataCheck {
/// The name of the specific check
fn get_name(&self) -> &str;
/// The SQL expression to use for the check
fn get_expression(&self) -> &str;
}
11 changes: 11 additions & 0 deletions crates/deltalake-core/src/kernel/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::{collections::HashMap, fmt::Display};

use crate::kernel::DataCheck;
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand Down Expand Up @@ -97,6 +98,16 @@ impl Invariant {
}
}

/// Invariants participate in data checking: the invariant's target column
/// is the check name and its SQL predicate is the check expression.
impl DataCheck for Invariant {
    fn get_name(&self) -> &str {
        self.field_name.as_str()
    }

    fn get_expression(&self) -> &str {
        self.invariant_sql.as_str()
    }
}

/// Represents a struct field defined in the Delta table schema.
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Schema-Serialization-Format
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
Expand Down
Loading

0 comments on commit 2d65e72

Please sign in to comment.