Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: check constraints #1915

Merged
merged 19 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
aec1add
feat: Add CHECK constraint functionality
scarman-db Nov 24, 2023
34a3180
feat: check constraints
scarman-db Nov 26, 2023
0c44f10
Merge remote-tracking branch 'upstream/main' into check-constraints
hntd187 Nov 26, 2023
3192887
feat: CHECK constraints
hntd187 Nov 26, 2023
d6020e8
feat: CHECK constraints, added check on table for initial add of cons…
hntd187 Dec 2, 2023
916e1e7
feat: CHECK constraints, added check on table for initial add of cons…
hntd187 Dec 3, 2023
e139f33
Merge branch 'main' into check-constraints
scarman-db Dec 4, 2023
305bae9
feat: check constraints
scarman-db Dec 4, 2023
8f913e7
feat: CHECK constraints, added check on table for initial add of cons…
hntd187 Dec 6, 2023
c83c7cf
Merge branch 'main' into check-constraints
hntd187 Dec 6, 2023
fba0b50
feat: CHECK constraints, added check on table for initial add of cons…
hntd187 Dec 6, 2023
811a6d4
feat: CHECK constraints addressing a lint
hntd187 Dec 6, 2023
e906f3c
feat: check constraints, removed physical test table, rewrote tests t…
scarman-db Dec 8, 2023
b475f29
Merge branch 'main' into check-constraints
scarman-db Dec 9, 2023
c5e23c1
feat: check constraints, fix a lint
scarman-db Dec 9, 2023
9d88746
Merge branch 'main' into check-constraints
hntd187 Dec 13, 2023
1efd1fe
feat: CHECK constraints addressing a lint
hntd187 Dec 13, 2023
e4f59a3
feat: CHECK constraints, back out limiting features behind feature fl…
hntd187 Dec 14, 2023
62894a0
Merge branch 'main' into check-constraints
ion-elgreco Dec 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 71 additions & 23 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaR
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use arrow_array::types::UInt16Type;
use arrow_array::{DictionaryArray, StringArray};
use arrow_array::{Array, DictionaryArray, StringArray};
use arrow_cast::display::array_value_to_string;

use arrow_schema::Field;
use async_trait::async_trait;
use chrono::{NaiveDateTime, TimeZone, Utc};
Expand Down Expand Up @@ -66,17 +68,20 @@ use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_sql::planner::ParserOptions;

use itertools::Itertools;
use log::error;
use object_store::ObjectMeta;
use serde::{Deserialize, Serialize};
use url::Url;

use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType};
use crate::kernel::{Add, DataCheck, DataType as DeltaDataType, Invariant, PrimitiveType};
use crate::logstore::LogStoreRef;
use crate::protocol::{ColumnCountStat, ColumnValueStat};
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
use crate::table::Constraint;
use crate::{open_table, open_table_with_storage_options, DeltaTable};

const PATH_COLUMN: &str = "__delta_rs_path";
Expand Down Expand Up @@ -1017,15 +1022,43 @@ pub(crate) fn logical_expr_to_physical_expr(
/// Responsible for checking batches of data conform to table's invariants.
#[derive(Clone)]
pub struct DeltaDataChecker {
constraints: Vec<Constraint>,
invariants: Vec<Invariant>,
ctx: SessionContext,
}

impl DeltaDataChecker {
/// Create a new DeltaDataChecker with a specified set of invariants
pub fn new_with_invariants(invariants: Vec<Invariant>) -> Self {
Self {
invariants,
constraints: vec![],
ctx: SessionContext::new(),
}
}

/// Create a new DeltaDataChecker with a specified set of constraints
pub fn new_with_constraints(constraints: Vec<Constraint>) -> Self {
Self {
constraints,
invariants: vec![],
ctx: SessionContext::new(),
}
}

/// Create a new DeltaDataChecker
pub fn new(invariants: Vec<Invariant>) -> Self {
pub fn new(snapshot: &DeltaTableState) -> Self {
let metadata = snapshot.metadata();

let invariants = metadata
.and_then(|meta| meta.schema.get_invariants().ok())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just had a quick look, but i think get_invariants only errors when there are invariants defined, but in an illegal format. This is something we should return as an error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was copied from the place it originally was prior to invariant enforcement in the data checker. Perhaps since this is not related to check constraints and slightly alters what would happen normally can we have a followup PR on it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. but maybe add a TODO?

.unwrap_or_default();
let constraints = metadata
.map(|meta| meta.get_constraints())
.unwrap_or_default();
Self {
invariants,
constraints,
ctx: SessionContext::new(),
}
}
Expand All @@ -1035,45 +1068,54 @@ impl DeltaDataChecker {
/// If it does not, it will return [DeltaTableError::InvalidData] with a list
/// of values that violated each invariant.
pub async fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
self.enforce_invariants(record_batch).await
// TODO: for support for Protocol V3, check constraints
self.enforce_checks(record_batch, &self.invariants).await?;
self.enforce_checks(record_batch, &self.constraints).await
}

async fn enforce_invariants(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
// Invariants are deprecated, so let's not pay the overhead for any of this
// if we can avoid it.
if self.invariants.is_empty() {
async fn enforce_checks<C: DataCheck>(
&self,
record_batch: &RecordBatch,
checks: &[C],
) -> Result<(), DeltaTableError> {
if checks.is_empty() {
return Ok(());
}

let table = MemTable::try_new(record_batch.schema(), vec![vec![record_batch.clone()]])?;
self.ctx.register_table("data", Arc::new(table))?;

let mut violations: Vec<String> = Vec::new();

for invariant in self.invariants.iter() {
if invariant.field_name.contains('.') {
for check in checks {
if check.get_name().contains('.') {
return Err(DeltaTableError::Generic(
"Support for column invariants on nested columns is not supported.".to_string(),
"Support for nested columns is not supported.".to_string(),
));
}

let sql = format!(
"SELECT {} FROM data WHERE not ({}) LIMIT 1",
invariant.field_name, invariant.invariant_sql
"SELECT {} FROM data WHERE NOT ({}) LIMIT 1",
check.get_name(),
check.get_expression()
);

let dfs: Vec<RecordBatch> = self.ctx.sql(&sql).await?.collect().await?;
if !dfs.is_empty() && dfs[0].num_rows() > 0 {
let value = format!("{:?}", dfs[0].column(0));
let value: String = dfs[0]
.columns()
.iter()
.map(|c| array_value_to_string(c, 0).unwrap_or(String::from("null")))
.join(", ");

let msg = format!(
"Invariant ({}) violated by value {}",
invariant.invariant_sql, value
"Check or Invariant ({}) violated by value in row: [{}]",
check.get_expression(),
value
);
violations.push(msg);
}
}

self.ctx.deregister_table("data")?;
if !violations.is_empty() {
Err(DeltaTableError::InvalidData { violations })
} else {
Expand Down Expand Up @@ -1747,7 +1789,7 @@ mod tests {
.unwrap();
// Empty invariants is okay
let invariants: Vec<Invariant> = vec![];
assert!(DeltaDataChecker::new(invariants)
assert!(DeltaDataChecker::new_with_invariants(invariants)
.check_batch(&batch)
.await
.is_ok());
Expand All @@ -1757,7 +1799,7 @@ mod tests {
Invariant::new("a", "a is not null"),
Invariant::new("b", "b < 1000"),
];
assert!(DeltaDataChecker::new(invariants)
assert!(DeltaDataChecker::new_with_invariants(invariants)
.check_batch(&batch)
.await
.is_ok());
Expand All @@ -1767,7 +1809,9 @@ mod tests {
Invariant::new("a", "a is null"),
Invariant::new("b", "b < 100"),
];
let result = DeltaDataChecker::new(invariants).check_batch(&batch).await;
let result = DeltaDataChecker::new_with_invariants(invariants)
.check_batch(&batch)
.await;
assert!(result.is_err());
assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
if let Err(DeltaTableError::InvalidData { violations }) = result {
Expand All @@ -1776,7 +1820,9 @@ mod tests {

// Irrelevant invariants return a different error
let invariants = vec![Invariant::new("c", "c > 2000")];
let result = DeltaDataChecker::new(invariants).check_batch(&batch).await;
let result = DeltaDataChecker::new_with_invariants(invariants)
.check_batch(&batch)
.await;
assert!(result.is_err());

// Nested invariants are unsupported
Expand All @@ -1790,7 +1836,9 @@ mod tests {
let batch = RecordBatch::try_new(schema, vec![inner]).unwrap();

let invariants = vec![Invariant::new("x.b", "x.b < 1000")];
let result = DeltaDataChecker::new(invariants).check_batch(&batch).await;
let result = DeltaDataChecker::new_with_invariants(invariants)
.check_batch(&batch)
.await;
assert!(result.is_err());
assert!(matches!(result, Err(DeltaTableError::Generic { .. })));
}
Expand Down
2 changes: 2 additions & 0 deletions crates/deltalake-core/src/kernel/actions/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,11 @@ pub struct Protocol {
pub min_writer_version: i32,
/// A collection of features that a client must implement in order to correctly
/// read this table (exist only when minReaderVersion is set to 3)
#[serde(skip_serializing_if = "Option::is_none")]
pub reader_features: Option<HashSet<ReaderFeatures>>,
/// A collection of features that a client must implement in order to correctly
/// write this table (exist only when minWriterVersion is set to 7)
#[serde(skip_serializing_if = "Option::is_none")]
pub writer_features: Option<HashSet<WriterFeatures>>,
}

Expand Down
8 changes: 8 additions & 0 deletions crates/deltalake-core/src/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,11 @@ pub use actions::*;
pub use error::*;
pub use expressions::*;
pub use schema::*;

/// A trait for all kernel types that are used as part of data checking
pub trait DataCheck {
/// The name of the specific check
fn get_name(&self) -> &str;
/// The SQL expression to use for the check
fn get_expression(&self) -> &str;
}
11 changes: 11 additions & 0 deletions crates/deltalake-core/src/kernel/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::{collections::HashMap, fmt::Display};

use crate::kernel::DataCheck;
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand Down Expand Up @@ -97,6 +98,16 @@ impl Invariant {
}
}

impl DataCheck for Invariant {
fn get_name(&self) -> &str {
&self.field_name
}

fn get_expression(&self) -> &str {
&self.invariant_sql
}
}

/// Represents a struct field defined in the Delta table schema.
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Schema-Serialization-Format
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
Expand Down
Loading
Loading