Merge branch 'main' into fix_urlencoded_tombstones
ion-elgreco authored Dec 20, 2023
2 parents c94c86b + bc9253c commit aff4057
Showing 39 changed files with 1,956 additions and 321 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/docs_release.yml
@@ -0,0 +1,24 @@
name: docs_release

on:
  pull_request:
    types:
      - closed
    branches: [main]
    paths:
      - docs/**
      - mkdocs.yml

jobs:
  release-docs:
    if: github.event.pull_request.merged == true
    runs-on: ubuntu-latest
    steps:
      - name: Trigger the docs release event
        uses: peter-evans/repository-dispatch@v2
        with:
          event-type: release-docs
          client-payload: >
            {
              "tag": "${{ github.ref_name }}"
            }
3 changes: 2 additions & 1 deletion README.md
@@ -163,7 +163,7 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
| Version 2 | Column Invariants | ![done] |
| Version 3 | Enforce `delta.checkpoint.writeStatsAsJson` | [![open]][writer-rs] |
| Version 3 | Enforce `delta.checkpoint.writeStatsAsStruct` | [![open]][writer-rs] |
| Version 3 | CHECK constraints | [![open]][writer-rs] |
| Version 3 | CHECK constraints | [![semi-done]][check-constraints] |
| Version 4 | Change Data Feed | |
| Version 4 | Generated Columns | |
| Version 5 | Column Mapping | |
@@ -185,5 +185,6 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
[merge-py]: https://github.com/delta-io/delta-rs/issues/1357
[merge-rs]: https://github.com/delta-io/delta-rs/issues/850
[writer-rs]: https://github.com/delta-io/delta-rs/issues/851
[check-constraints]: https://github.com/delta-io/delta-rs/issues/1881
[onelake-rs]: https://github.com/delta-io/delta-rs/issues/1418
[protocol]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md
26 changes: 26 additions & 0 deletions crates/deltalake-core/src/data_catalog/unity/models.rs
@@ -66,6 +66,7 @@ pub enum ListTableSummariesResponse {
    /// Successful response
    Success {
        /// Basic table infos
        #[serde(default)]
        tables: Vec<TableSummary>,
        /// Continuation token
        next_page_token: Option<String>,
@@ -441,6 +442,16 @@ pub(crate) mod tests {
    }
    "#;

    pub(crate) const LIST_TABLES: &str = r#"
    {
        "tables": [{
            "full_name": "catalog.schema.table_name",
            "table_type": "MANAGED"
        }]
    }
    "#;
    pub(crate) const LIST_TABLES_EMPTY: &str = "{}";

    #[test]
    fn test_responses() {
        let list_schemas: Result<ListSchemasResponse, _> =
@@ -458,6 +469,21 @@
            GetTableResponse::Success { .. }
        ));

        let list_tables: Result<ListTableSummariesResponse, _> = serde_json::from_str(LIST_TABLES);
        assert!(list_tables.is_ok());
        assert!(matches!(
            list_tables.unwrap(),
            ListTableSummariesResponse::Success { .. }
        ));

        let list_tables: Result<ListTableSummariesResponse, _> =
            serde_json::from_str(LIST_TABLES_EMPTY);
        assert!(list_tables.is_ok());
        assert!(matches!(
            list_tables.unwrap(),
            ListTableSummariesResponse::Success { .. }
        ));

        let get_schema: Result<GetSchemaResponse, _> = serde_json::from_str(GET_SCHEMA_RESPONSE);
        assert!(get_schema.is_ok());
        assert!(matches!(get_schema.unwrap(), GetSchemaResponse::Success(_)))
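The `#[serde(default)]` added above is what makes the new `LIST_TABLES_EMPTY` case pass: a missing `tables` key now deserializes to an empty `Vec` instead of erroring. A minimal standalone sketch of the same serde behavior, using plain structs as a stand-in for the crate's untagged `ListTableSummariesResponse` enum:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct TableSummary {
    full_name: String,
    table_type: String,
}

#[derive(Debug, Deserialize)]
struct ListTablesResponse {
    // With #[serde(default)], a missing "tables" key becomes Vec::default()
    // (an empty Vec) instead of failing with `missing field "tables"`.
    #[serde(default)]
    tables: Vec<TableSummary>,
    // Option fields already default to None when the key is absent.
    next_page_token: Option<String>,
}

fn main() {
    let empty: ListTablesResponse = serde_json::from_str("{}").unwrap();
    assert!(empty.tables.is_empty());
    assert!(empty.next_page_token.is_none());
}
```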
15 changes: 12 additions & 3 deletions crates/deltalake-core/src/delta_datafusion/expr.rs
@@ -347,9 +347,10 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
mod test {
    use arrow_schema::DataType as ArrowDataType;
    use datafusion::prelude::SessionContext;
    use datafusion_common::{DFSchema, ScalarValue};
    use datafusion_common::{Column, DFSchema, ScalarValue};
    use datafusion_expr::{col, decode, lit, substring, Cast, Expr, ExprSchemable};

    use crate::delta_datafusion::DeltaSessionContext;
    use crate::kernel::{DataType, PrimitiveType, StructField, StructType};
    use crate::{DeltaOps, DeltaTable};

@@ -388,6 +389,11 @@ mod test {
                DataType::Primitive(PrimitiveType::Integer),
                true,
            ),
            StructField::new(
                "Value3".to_string(),
                DataType::Primitive(PrimitiveType::Integer),
                true,
            ),
            StructField::new(
                "modified".to_string(),
                DataType::Primitive(PrimitiveType::String),
@@ -442,7 +448,10 @@ mod test {
                }),
                "arrow_cast(1, 'Int32')".to_string()
            ),
            simple!(col("value").eq(lit(3_i64)), "value = 3".to_string()),
            simple!(
                Expr::Column(Column::from_qualified_name_ignore_case("Value3")).eq(lit(3_i64)),
                "Value3 = 3".to_string()
            ),
            simple!(col("active").is_true(), "active IS TRUE".to_string()),
            simple!(col("active"), "active".to_string()),
            simple!(col("active").eq(lit(true)), "active = true".to_string()),
@@ -536,7 +545,7 @@ mod test {
            ),
        ];

        let session = SessionContext::new();
        let session: SessionContext = DeltaSessionContext::default().into();

        for test in tests {
            let actual = fmt_expr_to_sql(&test.expr).unwrap();
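The switch from `col("value")` to `Column::from_qualified_name_ignore_case` matters because `col` parses its argument as a SQL identifier and normalizes unquoted names to lowercase, which would mangle a case-sensitive column such as `Value3`. A small sketch of the difference, assuming `datafusion_common` and `datafusion_expr` as direct dependencies:

```rust
use datafusion_common::Column;
use datafusion_expr::{col, lit, Expr};

fn main() {
    // col() goes through SQL identifier parsing, so an unquoted
    // mixed-case name is normalized to lowercase ("value3").
    let normalized = col("Value3");

    // from_qualified_name_ignore_case skips that normalization and
    // keeps the column name exactly as written.
    let preserved = Expr::Column(Column::from_qualified_name_ignore_case("Value3"));

    println!("{normalized} vs {preserved}");
    // The predicate the test above round-trips as "Value3 = 3":
    let _predicate = preserved.eq(lit(3_i64));
}
```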
128 changes: 104 additions & 24 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -32,7 +32,9 @@ use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaR
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use arrow_array::types::UInt16Type;
use arrow_array::{DictionaryArray, StringArray};
use arrow_array::{Array, DictionaryArray, StringArray};
use arrow_cast::display::array_value_to_string;

use arrow_schema::Field;
use async_trait::async_trait;
use chrono::{NaiveDateTime, TimeZone, Utc};
@@ -66,17 +68,21 @@ use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_sql::planner::ParserOptions;
use futures::TryStreamExt;

use itertools::Itertools;
use log::error;
use object_store::ObjectMeta;
use serde::{Deserialize, Serialize};
use url::Url;

use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType};
use crate::kernel::{Add, DataCheck, DataType as DeltaDataType, Invariant, PrimitiveType};
use crate::logstore::LogStoreRef;
use crate::protocol::{ColumnCountStat, ColumnValueStat};
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
use crate::table::Constraint;
use crate::{open_table, open_table_with_storage_options, DeltaTable};

const PATH_COLUMN: &str = "__delta_rs_path";
@@ -1014,19 +1020,78 @@ pub(crate) fn logical_expr_to_physical_expr(
    create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
}

pub(crate) async fn execute_plan_to_batch(
    state: &SessionState,
    plan: Arc<dyn ExecutionPlan>,
) -> DeltaResult<arrow::record_batch::RecordBatch> {
    let data =
        futures::future::try_join_all((0..plan.output_partitioning().partition_count()).map(|p| {
            let plan_copy = plan.clone();
            let task_context = state.task_ctx().clone();
            async move {
                let batch_stream = plan_copy.execute(p, task_context)?;

                let schema = batch_stream.schema();

                let batches = batch_stream.try_collect::<Vec<_>>().await?;

                DataFusionResult::<_>::Ok(arrow::compute::concat_batches(&schema, batches.iter())?)
            }
        }))
        .await?;

    let batch = arrow::compute::concat_batches(&plan.schema(), data.iter())?;

    Ok(batch)
}

/// Responsible for checking batches of data conform to table's invariants.
#[derive(Clone)]
pub struct DeltaDataChecker {
    constraints: Vec<Constraint>,
    invariants: Vec<Invariant>,
    ctx: SessionContext,
}

impl DeltaDataChecker {
    /// Create a new DeltaDataChecker with a specified set of invariants
    pub fn new_with_invariants(invariants: Vec<Invariant>) -> Self {
        Self {
            invariants,
            constraints: vec![],
            ctx: DeltaSessionContext::default().into(),
        }
    }

    /// Create a new DeltaDataChecker with a specified set of constraints
    pub fn new_with_constraints(constraints: Vec<Constraint>) -> Self {
        Self {
            constraints,
            invariants: vec![],
            ctx: DeltaSessionContext::default().into(),
        }
    }

    /// Specify the Datafusion context
    pub fn with_session_context(mut self, context: SessionContext) -> Self {
        self.ctx = context;
        self
    }

    /// Create a new DeltaDataChecker
    pub fn new(invariants: Vec<Invariant>) -> Self {
    pub fn new(snapshot: &DeltaTableState) -> Self {
        let metadata = snapshot.metadata();

        let invariants = metadata
            .and_then(|meta| meta.schema.get_invariants().ok())
            .unwrap_or_default();
        let constraints = metadata
            .map(|meta| meta.get_constraints())
            .unwrap_or_default();
        Self {
            invariants,
            ctx: SessionContext::new(),
            constraints,
            ctx: DeltaSessionContext::default().into(),
        }
    }

@@ -1035,45 +1100,54 @@ impl DeltaDataChecker {
    /// If it does not, it will return [DeltaTableError::InvalidData] with a list
    /// of values that violated each invariant.
    pub async fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
        self.enforce_invariants(record_batch).await
        // TODO: for support for Protocol V3, check constraints
        self.enforce_checks(record_batch, &self.invariants).await?;
        self.enforce_checks(record_batch, &self.constraints).await
    }

    async fn enforce_invariants(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
        // Invariants are deprecated, so let's not pay the overhead for any of this
        // if we can avoid it.
        if self.invariants.is_empty() {
    async fn enforce_checks<C: DataCheck>(
        &self,
        record_batch: &RecordBatch,
        checks: &[C],
    ) -> Result<(), DeltaTableError> {
        if checks.is_empty() {
            return Ok(());
        }

        let table = MemTable::try_new(record_batch.schema(), vec![vec![record_batch.clone()]])?;
        self.ctx.register_table("data", Arc::new(table))?;

        let mut violations: Vec<String> = Vec::new();

        for invariant in self.invariants.iter() {
            if invariant.field_name.contains('.') {
        for check in checks {
            if check.get_name().contains('.') {
                return Err(DeltaTableError::Generic(
                    "Support for column invariants on nested columns is not supported.".to_string(),
                    "Support for nested columns is not supported.".to_string(),
                ));
            }

            let sql = format!(
                "SELECT {} FROM data WHERE not ({}) LIMIT 1",
                invariant.field_name, invariant.invariant_sql
                "SELECT {} FROM data WHERE NOT ({}) LIMIT 1",
                check.get_name(),
                check.get_expression()
            );

            let dfs: Vec<RecordBatch> = self.ctx.sql(&sql).await?.collect().await?;
            if !dfs.is_empty() && dfs[0].num_rows() > 0 {
                let value = format!("{:?}", dfs[0].column(0));
                let value: String = dfs[0]
                    .columns()
                    .iter()
                    .map(|c| array_value_to_string(c, 0).unwrap_or(String::from("null")))
                    .join(", ");

                let msg = format!(
                    "Invariant ({}) violated by value {}",
                    invariant.invariant_sql, value
                    "Check or Invariant ({}) violated by value in row: [{}]",
                    check.get_expression(),
                    value
                );
                violations.push(msg);
            }
        }

        self.ctx.deregister_table("data")?;
        if !violations.is_empty() {
            Err(DeltaTableError::InvalidData { violations })
        } else {
@@ -1747,7 +1821,7 @@ mod tests {
            .unwrap();
        // Empty invariants is okay
        let invariants: Vec<Invariant> = vec![];
        assert!(DeltaDataChecker::new(invariants)
        assert!(DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await
            .is_ok());
@@ -1757,7 +1831,7 @@
            Invariant::new("a", "a is not null"),
            Invariant::new("b", "b < 1000"),
        ];
        assert!(DeltaDataChecker::new(invariants)
        assert!(DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await
            .is_ok());
@@ -1767,7 +1841,9 @@
            Invariant::new("a", "a is null"),
            Invariant::new("b", "b < 100"),
        ];
        let result = DeltaDataChecker::new(invariants).check_batch(&batch).await;
        let result = DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
        if let Err(DeltaTableError::InvalidData { violations }) = result {
@@ -1776,7 +1852,9 @@

        // Irrelevant invariants return a different error
        let invariants = vec![Invariant::new("c", "c > 2000")];
        let result = DeltaDataChecker::new(invariants).check_batch(&batch).await;
        let result = DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());

        // Nested invariants are unsupported
@@ -1790,7 +1868,9 @@
        let batch = RecordBatch::try_new(schema, vec![inner]).unwrap();

        let invariants = vec![Invariant::new("x.b", "x.b < 1000")];
        let result = DeltaDataChecker::new(invariants).check_batch(&batch).await;
        let result = DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::Generic { .. })));
    }
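`DeltaDataChecker::new` now derives both invariants and constraints from a table snapshot, while `new_with_invariants`/`new_with_constraints` cover callers that already hold the checks. A minimal sketch of checking a batch against a CHECK constraint, assuming the crate's `datafusion` feature is enabled and that `Constraint::new(name, sql)` exists as used here; the constraint is named after the column it guards so the generated `SELECT <name> FROM data WHERE NOT (<expr>)` is valid SQL:

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use deltalake_core::delta_datafusion::DeltaDataChecker;
use deltalake_core::table::Constraint;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A one-column batch: value = [1, 2, 3].
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;

    // Every row satisfies value > 0, so this check passes.
    let ok = DeltaDataChecker::new_with_constraints(vec![Constraint::new("value", "value > 0")]);
    ok.check_batch(&batch).await?;

    // Rows 1 and 2 violate value > 2; the violation surfaces as
    // DeltaTableError::InvalidData, with the offending row rendered
    // through array_value_to_string as in enforce_checks above.
    let failing =
        DeltaDataChecker::new_with_constraints(vec![Constraint::new("value", "value > 2")]);
    assert!(failing.check_batch(&batch).await.is_err());
    Ok(())
}
```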
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/kernel/actions/mod.rs
@@ -8,7 +8,7 @@ use std::collections::HashMap;
use serde::{Deserialize, Serialize};

pub(crate) mod schemas;
mod serde_path;
pub(crate) mod serde_path;
pub(crate) mod types;

pub use types::*;
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/kernel/actions/serde_path.rs
@@ -54,7 +54,7 @@ fn encode_path(path: &str) -> String {
    percent_encode(path.as_bytes(), INVALID).to_string()
}

fn decode_path(path: &str) -> Result<String, Utf8Error> {
pub fn decode_path(path: &str) -> Result<String, Utf8Error> {
    Ok(percent_decode_str(path).decode_utf8()?.to_string())
}

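Making `decode_path` `pub` lets the tombstone handling referenced in this PR's branch name percent-decode file paths read back from the log. A sketch of the encode/decode round trip with the `percent-encoding` crate, using an illustrative encode set rather than the module's actual `INVALID` set:

```rust
use percent_encoding::{percent_decode_str, percent_encode, AsciiSet, CONTROLS};

// Illustrative set: control characters plus space and '%'. The real
// module's INVALID set covers more characters.
const ENCODE_SET: &AsciiSet = &CONTROLS.add(b' ').add(b'%');

fn main() {
    let original = "date=2023-12-20/part-00000 copy.parquet";
    let encoded = percent_encode(original.as_bytes(), ENCODE_SET).to_string();
    assert_eq!(encoded, "date=2023-12-20/part-00000%20copy.parquet");

    // Decoding restores the original path, mirroring decode_path.
    let decoded = percent_decode_str(&encoded).decode_utf8().unwrap().to_string();
    assert_eq!(decoded, original);
}
```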
2 changes: 2 additions & 0 deletions crates/deltalake-core/src/kernel/actions/types.rs
@@ -130,9 +130,11 @@ pub struct Protocol {
    pub min_writer_version: i32,
    /// A collection of features that a client must implement in order to correctly
    /// read this table (exist only when minReaderVersion is set to 3)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reader_features: Option<HashSet<ReaderFeatures>>,
    /// A collection of features that a client must implement in order to correctly
    /// write this table (exist only when minWriterVersion is set to 7)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub writer_features: Option<HashSet<WriterFeatures>>,
}

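With `skip_serializing_if = "Option::is_none"`, a `Protocol` that predates reader/writer features serializes without the `readerFeatures`/`writerFeatures` keys at all rather than emitting explicit nulls. A simplified stand-in (string features instead of the crate's `ReaderFeatures`/`WriterFeatures` enums):

```rust
use std::collections::HashSet;

use serde::Serialize;

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct Protocol {
    min_reader_version: i32,
    min_writer_version: i32,
    #[serde(skip_serializing_if = "Option::is_none")]
    reader_features: Option<HashSet<String>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    writer_features: Option<HashSet<String>>,
}

fn main() {
    let protocol = Protocol {
        min_reader_version: 1,
        min_writer_version: 2,
        reader_features: None,
        writer_features: None,
    };
    // The None fields are omitted entirely, not serialized as null.
    assert_eq!(
        serde_json::to_string(&protocol).unwrap(),
        r#"{"minReaderVersion":1,"minWriterVersion":2}"#
    );
}
```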