From 340681312dfc80c71b641fcbb81bc9b50a1a058c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philippe=20No=C3=ABl?= <21990816+philippemnoel@users.noreply.github.com> Date: Tue, 30 Jan 2024 12:12:43 -0500 Subject: [PATCH 1/5] chore: upgrade to DataFusion 35.0 (#2121) # Description This PR upgrades `delta-rs` to using DataFusion 35.0, which was recently released. In order to do this, I had to fix a few breaking changes, and also upgrade Arrow to 50 and `sqlparser` to 0.41. # Related Issue(s) N/A # Documentation See here for the list of PRs which required code change: - https://github.com/apache/arrow-datafusion/pull/8703 - https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/dev/changelog/35.0.0.md?plain=1#L227 --------- Co-authored-by: Ming Ying --- Cargo.toml | 39 ++++---- crates/azure/src/config.rs | 1 + crates/azure/src/error.rs | 1 + crates/core/Cargo.toml | 2 +- crates/core/src/delta_datafusion/mod.rs | 8 +- crates/core/src/kernel/snapshot/log_data.rs | 2 +- crates/core/src/operations/delete.rs | 1 - crates/core/src/operations/merge/barrier.rs | 4 +- crates/core/src/operations/merge/mod.rs | 6 +- crates/core/src/operations/optimize.rs | 46 +++++++--- .../core/src/operations/transaction/state.rs | 15 +++- crates/core/src/operations/update.rs | 10 +-- crates/gcp/src/error.rs | 1 + crates/sql/src/parser.rs | 88 +------------------ 14 files changed, 80 insertions(+), 144 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5d03cd562d..cfcb4eaf3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,28 +19,27 @@ debug = "line-tables-only" [workspace.dependencies] # arrow -arrow = { version = "49" } -arrow-arith = { version = "49" } -arrow-array = { version = "49" } -arrow-buffer = { version = "49" } -arrow-cast = { version = "49" } -arrow-ipc = { version = "49" } -arrow-json = { version = "49" } -arrow-ord = { version = "49" } -arrow-row = { version = "49" } -arrow-schema = { version = "49" } -arrow-select = { version = "49" } -object_store = { version = "0.8" } -parquet = { version = "49" } +arrow = { version = "50" } +arrow-arith = { version = "50" } +arrow-array = { version = "50" } +arrow-buffer = { version = "50" } +arrow-cast = { version = "50" } +arrow-ipc = { version = "50" } +arrow-json = { version = "50" } +arrow-ord = { version = "50" } +arrow-row = { version = "50" } +arrow-schema = { version = "50" } +arrow-select = { version = "50" } +object_store = { version = "0.9" } +parquet = { version = "50" } # datafusion -datafusion = { version = "34" } -datafusion-expr = { version = "34" } -datafusion-common = { version = "34" } -datafusion-proto = { version = "34" } -datafusion-sql = { version = "34" } -datafusion-physical-expr = { version = "34" } - +datafusion = { version = "35" } +datafusion-expr = { version = "35" } +datafusion-common = { version = "35" } +datafusion-proto = { version = "35" } +datafusion-sql = { version = "35" } +datafusion-physical-expr = { version = "35" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/azure/src/config.rs b/crates/azure/src/config.rs index ccb06f171a..d30272768e 100644 --- a/crates/azure/src/config.rs +++ b/crates/azure/src/config.rs @@ -35,6 +35,7 @@ enum AzureCredential { /// Authorizing with secret ClientSecret, /// Using a shared access signature + #[allow(dead_code)] ManagedIdentity, /// Using a shared access signature SasKey, diff --git a/crates/azure/src/error.rs b/crates/azure/src/error.rs index aca1321c3d..acc18f67f9 100644 --- a/crates/azure/src/error.rs +++ 
b/crates/azure/src/error.rs @@ -4,6 +4,7 @@ pub(crate) type Result = std::result::Result; #[derive(thiserror::Error, Debug)] pub(crate) enum Error { + #[allow(dead_code)] #[error("failed to parse config: {0}")] Parse(String), diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index f8d3778eca..9773f82c46 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -97,7 +97,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [ "rustls-tls", "json", ], optional = true } -sqlparser = { version = "0.40", optional = true } +sqlparser = { version = "0.41", optional = true } [dev-dependencies] criterion = "0.5" diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index ca64c9ef63..d7a10edc1f 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -90,7 +90,7 @@ pub mod physical; impl From for DataFusionError { fn from(err: DeltaTableError) -> Self { match err { - DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source), + DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source, None), DeltaTableError::Io { source } => DataFusionError::IoError(source), DeltaTableError::ObjectStore { source } => DataFusionError::ObjectStore(source), DeltaTableError::Parquet { source } => DataFusionError::ParquetError(source), @@ -102,7 +102,7 @@ impl From for DataFusionError { impl From for DeltaTableError { fn from(err: DataFusionError) -> Self { match err { - DataFusionError::ArrowError(source) => DeltaTableError::Arrow { source }, + DataFusionError::ArrowError(source, _) => DeltaTableError::Arrow { source }, DataFusionError::IoError(source) => DeltaTableError::Io { source }, DataFusionError::ObjectStore(source) => DeltaTableError::ObjectStore { source }, DataFusionError::ParquetError(source) => DeltaTableError::Parquet { source }, @@ -430,7 +430,6 @@ impl<'a> DeltaScanBuilder<'a> { limit: self.limit, table_partition_cols, output_ordering: vec![], - infinite_source: false, }, logical_filter.as_ref(), ) @@ -808,7 +807,7 @@ pub(crate) fn logical_expr_to_physical_expr( ) -> Arc { let df_schema = schema.clone().to_dfschema().unwrap(); let execution_props = ExecutionProps::new(); - create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap() + create_physical_expr(expr, &df_schema, &execution_props).unwrap() } pub(crate) async fn execute_plan_to_batch( @@ -1238,7 +1237,6 @@ pub(crate) async fn find_files_scan<'a>( let predicate_expr = create_physical_expr( &Expr::IsTrue(Box::new(expression.clone())), &input_dfschema, - &input_schema, state.execution_props(), )?; diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 525f3db64b..b874b53421 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -245,7 +245,7 @@ impl LogicalFile<'_> { self.deletion_vector.as_ref().and_then(|arr| { arr.storage_type .is_valid(self.index) - .then(|| DeletionVectorView { + .then_some(DeletionVectorView { data: arr, index: self.index, }) diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 1e0f196aa3..2e3e99bde2 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -152,7 +152,6 @@ async fn excute_non_empty_expr( let predicate_expr = create_physical_expr( &negated_expression, &input_dfschema, - &input_schema, state.execution_props(), )?; let filter: Arc = diff --git 
a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs index 6883f61253..f1df28c4a4 100644 --- a/crates/core/src/operations/merge/barrier.rs +++ b/crates/core/src/operations/merge/barrier.rs @@ -293,7 +293,9 @@ impl Stream for MergeBarrierStream { .iter() .map(|c| { arrow::compute::take(c.as_ref(), &indices, None) - .map_err(DataFusionError::ArrowError) + .map_err(|err| { + DataFusionError::ArrowError(err, None) + }) }) .collect::>>()?; diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index ffe2e78e38..07f65f4cf8 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -32,7 +32,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; -use arrow_schema::Schema as ArrowSchema; use async_trait::async_trait; use datafusion::datasource::provider_as_source; use datafusion::error::Result as DataFusionResult; @@ -657,11 +656,10 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner { if let Some(barrier) = node.as_any().downcast_ref::() { let schema = barrier.input.schema(); - let exec_schema: ArrowSchema = schema.as_ref().to_owned().into(); return Ok(Some(Arc::new(MergeBarrierExec::new( physical_inputs.first().unwrap().clone(), barrier.file_column.clone(), - planner.create_physical_expr(&barrier.expr, schema, &exec_schema, session_state)?, + planner.create_physical_expr(&barrier.expr, schema, session_state)?, )))); } @@ -1418,9 +1416,7 @@ impl std::future::IntoFuture for MergeBuilder { PROTOCOL.can_write_to(&this.snapshot)?; let state = this.state.unwrap_or_else(|| { - //TODO: Datafusion's Hashjoin has some memory issues. Running with all cores results in a OoM. Can be removed when upstream improvemetns are made. 
let config: SessionConfig = DeltaSessionConfig::default().into(); - let config = config.with_target_partitions(1); let session = SessionContext::new_with_config(config); // If a user provides their own their DF state then they must register the store themselves diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 1fc0286754..c67b31a71b 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -552,7 +552,7 @@ impl MergePlan { use datafusion::prelude::{col, ParquetReadOptions}; use datafusion_common::Column; use datafusion_expr::expr::ScalarFunction; - use datafusion_expr::Expr; + use datafusion_expr::{Expr, ScalarUDF}; let locations = files .iter() @@ -578,7 +578,7 @@ impl MergePlan { .map(|col| Expr::Column(Column::from_qualified_name_ignore_case(col))) .collect_vec(); let expr = Expr::ScalarFunction(ScalarFunction::new_udf( - Arc::new(zorder::datafusion::zorder_key_udf()), + Arc::new(ScalarUDF::from(zorder::datafusion::ZOrderUDF)), cols, )); let df = df.with_column(ZORDER_KEY_COLUMN, expr)?; @@ -1139,10 +1139,10 @@ pub(super) mod zorder { use arrow_schema::DataType; use datafusion_common::DataFusionError; use datafusion_expr::{ - ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature, - TypeSignature, Volatility, + ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, Volatility, }; use itertools::Itertools; + use std::any::Any; pub const ZORDER_UDF_NAME: &str = "zorder_key"; @@ -1166,20 +1166,38 @@ pub(super) mod zorder { use url::Url; let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime); - ctx.register_udf(datafusion::zorder_key_udf()); + ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF)); Ok(Self { columns, ctx }) } } - /// Get the DataFusion UDF struct for zorder_key - pub fn zorder_key_udf() -> ScalarUDF { - let signature = Signature { - type_signature: TypeSignature::VariadicAny, - volatility: Volatility::Immutable, - }; - let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Binary))); - let fun: ScalarFunctionImplementation = Arc::new(zorder_key_datafusion); - ScalarUDF::new(ZORDER_UDF_NAME, &signature, &return_type, &fun) + // DataFusion UDF impl for zorder_key + #[derive(Debug)] + pub struct ZOrderUDF; + + impl ScalarUDFImpl for ZOrderUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + ZORDER_UDF_NAME + } + + fn signature(&self) -> &Signature { + &Signature { + type_signature: TypeSignature::VariadicAny, + volatility: Volatility::Immutable, + } + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Binary) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + zorder_key_datafusion(args) + } } /// Datafusion zorder UDF body diff --git a/crates/core/src/operations/transaction/state.rs b/crates/core/src/operations/transaction/state.rs index d3f680fcea..ab778f2cb6 100644 --- a/crates/core/src/operations/transaction/state.rs +++ b/crates/core/src/operations/transaction/state.rs @@ -1,6 +1,7 @@ +use std::collections::HashSet; use std::sync::Arc; -use arrow::array::ArrayRef; +use arrow::array::{ArrayRef, BooleanArray}; use arrow::datatypes::{ DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; @@ -296,6 +297,12 @@ impl<'a> PruningStatistics for AddContainer<'a> { }); ScalarValue::iter_to_array(values).ok() } + + // This function is required since DataFusion 35.0, but is implemented as a no-op + // 
https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550 + fn contained(&self, _column: &Column, _value: &HashSet) -> Option { + None + } } impl PruningStatistics for DeltaTableState { @@ -333,6 +340,12 @@ impl PruningStatistics for DeltaTableState { let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?); container.null_counts(column) } + + // This function is required since DataFusion 35.0, but is implemented as a no-op + // https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550 + fn contained(&self, _column: &Column, _value: &HashSet) -> Option { + None + } } #[cfg(test)] diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 582a37da28..d07f3f9fc0 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -263,12 +263,7 @@ async fn execute( let predicate_null = when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?; - let predicate_expr = create_physical_expr( - &predicate_null, - &input_dfschema, - &input_schema, - execution_props, - )?; + let predicate_expr = create_physical_expr(&predicate_null, &input_dfschema, execution_props)?; expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string())); let projection_predicate: Arc = @@ -315,8 +310,7 @@ async fn execute( let expr = case(col("__delta_rs_update_predicate")) .when(lit(true), expr.to_owned()) .otherwise(col(column.to_owned()))?; - let predicate_expr = - create_physical_expr(&expr, &input_dfschema, &input_schema, execution_props)?; + let predicate_expr = create_physical_expr(&expr, &input_dfschema, execution_props)?; map.insert(column.name.clone(), expressions.len()); let c = "__delta_rs_".to_string() + &column.name; expressions.push((predicate_expr, c.clone())); diff --git a/crates/gcp/src/error.rs b/crates/gcp/src/error.rs index aca1321c3d..acc18f67f9 100644 --- a/crates/gcp/src/error.rs +++ b/crates/gcp/src/error.rs @@ -4,6 +4,7 @@ pub(crate) type Result = std::result::Result; #[derive(thiserror::Error, Debug)] pub(crate) enum Error { + #[allow(dead_code)] #[error("failed to parse config: {0}")] Parse(String), diff --git a/crates/sql/src/parser.rs b/crates/sql/src/parser.rs index 3287c87215..10e7252730 100644 --- a/crates/sql/src/parser.rs +++ b/crates/sql/src/parser.rs @@ -1,7 +1,7 @@ use std::collections::VecDeque; use std::fmt; -use datafusion_sql::parser::{DFParser, DescribeTableStmt, Statement as DFStatement}; +use datafusion_sql::parser::{DFParser, Statement as DFStatement}; use datafusion_sql::sqlparser::ast::{ObjectName, Value}; use datafusion_sql::sqlparser::dialect::{keywords::Keyword, Dialect, GenericDialect}; use datafusion_sql::sqlparser::parser::{Parser, ParserError}; @@ -138,10 +138,6 @@ impl<'a> DeltaParser<'a> { match self.parser.peek_token().token { Token::Word(w) => { match w.keyword { - Keyword::DESCRIBE => { - self.parser.next_token(); - self.parse_describe() - } Keyword::VACUUM => { self.parser.next_token(); self.parse_vacuum() @@ -167,50 +163,6 @@ impl<'a> DeltaParser<'a> { } } - /// Parse a SQL `DESCRIBE` statement - pub fn parse_describe(&mut self) -> Result { - match self.parser.peek_token().token { - Token::Word(w) => match w.keyword { - Keyword::DETAIL => { - self.parser.next_token(); - let table = self.parser.parse_object_name()?; - 
Ok(Statement::Describe(DescribeStatement { - table, - operation: DescribeOperation::Detail, - })) - } - Keyword::HISTORY => { - self.parser.next_token(); - let table = self.parser.parse_object_name()?; - Ok(Statement::Describe(DescribeStatement { - table, - operation: DescribeOperation::History, - })) - } - Keyword::FILES => { - self.parser.next_token(); - let table = self.parser.parse_object_name()?; - Ok(Statement::Describe(DescribeStatement { - table, - operation: DescribeOperation::Files, - })) - } - _ => { - let table = self.parser.parse_object_name()?; - Ok(Statement::Datafusion(DFStatement::DescribeTableStmt( - DescribeTableStmt { table_name: table }, - ))) - } - }, - _ => { - let table_name = self.parser.parse_object_name()?; - Ok(Statement::Datafusion(DFStatement::DescribeTableStmt( - DescribeTableStmt { table_name }, - ))) - } - } - } - pub fn parse_vacuum(&mut self) -> Result { let table_name = self.parser.parse_object_name()?; match self.parser.peek_token().token { @@ -287,44 +239,6 @@ mod tests { Ok(()) } - #[test] - fn test_parse_describe() { - let stmt = Statement::Describe(DescribeStatement { - table: ObjectName(vec![Ident { - value: "data_table".to_string(), - quote_style: None, - }]), - operation: DescribeOperation::History, - }); - assert!(expect_parse_ok("DESCRIBE HISTORY data_table", stmt).is_ok()); - - let stmt = Statement::Describe(DescribeStatement { - table: ObjectName(vec![Ident { - value: "data_table".to_string(), - quote_style: None, - }]), - operation: DescribeOperation::Detail, - }); - assert!(expect_parse_ok("DESCRIBE DETAIL data_table", stmt).is_ok()); - - let stmt = Statement::Describe(DescribeStatement { - table: ObjectName(vec![Ident { - value: "data_table".to_string(), - quote_style: None, - }]), - operation: DescribeOperation::Files, - }); - assert!(expect_parse_ok("DESCRIBE FILES data_table", stmt).is_ok()); - - let stmt = Statement::Datafusion(DFStatement::DescribeTableStmt(DescribeTableStmt { - table_name: ObjectName(vec![Ident { - value: "data_table".to_string(), - quote_style: None, - }]), - })); - assert!(expect_parse_ok("DESCRIBE data_table", stmt).is_ok()) - } - #[test] fn test_parse_vacuum() { let stmt = Statement::Vacuum(VacuumStatement { From 467afc57a2fa95ac37ee4241666040cf7c87b31d Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Tue, 30 Jan 2024 21:43:59 +0100 Subject: [PATCH 2/5] fix(s3): restore working test for DynamoDb log store repair log on read (#2120) # Description Make sure the read path for delta table commit entries passes through the log store, enabling it to ensure the invariants and potentially repair a broken commit in the context of S3 / DynamoDb log store implementation. This also adds another test in the context of S3 log store: repairing a log store on load was not implemented previously. Note that this a stopgap and not a complete solution: it comes with a performance penalty as we're triggering a redundant object store list operation just for the purpose of "triggering" the log store functionality. fixes #2109 --------- Co-authored-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Co-authored-by: R. 
Tyler Croy --- crates/aws/src/logstore.rs | 14 +++++++++++ crates/aws/tests/integration_s3_dynamodb.rs | 14 ++++++++++- .../core/src/kernel/snapshot/log_segment.rs | 13 ++++++++--- crates/core/src/kernel/snapshot/mod.rs | 23 +++++++++---------- crates/core/src/logstore/mod.rs | 5 ++++ crates/core/src/table/mod.rs | 6 +---- crates/core/src/table/state.rs | 5 ++-- 7 files changed, 57 insertions(+), 23 deletions(-) diff --git a/crates/aws/src/logstore.rs b/crates/aws/src/logstore.rs index 8e02659d87..123aadd2d1 100644 --- a/crates/aws/src/logstore.rs +++ b/crates/aws/src/logstore.rs @@ -166,6 +166,20 @@ impl LogStore for S3DynamoDbLogStore { self.table_path.clone() } + async fn refresh(&self) -> DeltaResult<()> { + let entry = self + .lock_client + .get_latest_entry(&self.table_path) + .await + .map_err(|err| DeltaTableError::GenericError { + source: Box::new(err), + })?; + if let Some(entry) = entry { + self.repair_entry(&entry).await?; + } + Ok(()) + } + async fn read_commit_entry(&self, version: i64) -> DeltaResult> { let entry = self .lock_client diff --git a/crates/aws/tests/integration_s3_dynamodb.rs b/crates/aws/tests/integration_s3_dynamodb.rs index ff8f0ae7e9..179c46fc5a 100644 --- a/crates/aws/tests/integration_s3_dynamodb.rs +++ b/crates/aws/tests/integration_s3_dynamodb.rs @@ -156,7 +156,6 @@ async fn test_repair_commit_entry() -> TestResult<()> { #[tokio::test] #[serial] -#[ignore = "https://github.com/delta-io/delta-rs/issues/2109"] async fn test_repair_on_update() -> TestResult<()> { let context = IntegrationContext::new(Box::new(S3Integration::default()))?; let mut table = prepare_table(&context, "repair_on_update").await?; @@ -168,6 +167,19 @@ async fn test_repair_on_update() -> TestResult<()> { Ok(()) } +#[tokio::test] +#[serial] +async fn test_repair_on_load() -> TestResult<()> { + let context = IntegrationContext::new(Box::new(S3Integration::default()))?; + let mut table = prepare_table(&context, "repair_on_update").await?; + let _entry = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?; + table.load_version(1).await?; + // table should fix the broken entry while loading a specific version + assert_eq!(table.version(), 1); + validate_lock_table_state(&table, 1).await?; + Ok(()) +} + const WORKERS: i64 = 3; const COMMITS: i64 = 15; diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs index 66cc428c3f..6ad1690db1 100644 --- a/crates/core/src/kernel/snapshot/log_segment.rs +++ b/crates/core/src/kernel/snapshot/log_segment.rs @@ -18,6 +18,7 @@ use tracing::debug; use super::parse; use crate::kernel::{arrow::json, Action, ActionType, Metadata, Protocol, Schema, StructType}; +use crate::logstore::LogStore; use crate::operations::transaction::get_commit_bytes; use crate::protocol::DeltaOperation; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; @@ -148,15 +149,21 @@ impl LogSegment { table_root: &Path, start_version: i64, end_version: Option, - store: &dyn ObjectStore, + log_store: &dyn LogStore, ) -> DeltaResult { debug!( "try_new_slice: start_version: {}, end_version: {:?}", start_version, end_version ); + log_store.refresh().await?; let log_url = table_root.child("_delta_log"); - let (mut commit_files, checkpoint_files) = - list_log_files(store, &log_url, end_version, Some(start_version)).await?; + let (mut commit_files, checkpoint_files) = list_log_files( + log_store.object_store().as_ref(), + &log_url, + end_version, + Some(start_version), + ) + .await?; // remove all files above 
requested version if let Some(version) = end_version { commit_files.retain(|meta| meta.location.commit_version() <= Some(version)); diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 715fb2feec..d12018c245 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -30,6 +30,7 @@ use self::parse::{read_adds, read_removes}; use self::replay::{LogMapper, LogReplayScanner, ReplayStream}; use super::{Action, Add, CommitInfo, DataType, Metadata, Protocol, Remove, StructField}; use crate::kernel::StructType; +use crate::logstore::LogStore; use crate::table::config::TableConfig; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; @@ -108,16 +109,16 @@ impl Snapshot { /// Update the snapshot to the given version pub async fn update( &mut self, - store: Arc, + log_store: Arc, target_version: Option, ) -> DeltaResult<()> { - self.update_inner(store, target_version).await?; + self.update_inner(log_store, target_version).await?; Ok(()) } async fn update_inner( &mut self, - store: Arc, + log_store: Arc, target_version: Option, ) -> DeltaResult> { if let Some(version) = target_version { @@ -125,16 +126,14 @@ impl Snapshot { return Ok(None); } if version < self.version() { - return Err(DeltaTableError::Generic( - "Cannoit downgrade snapshot".into(), - )); + return Err(DeltaTableError::Generic("Cannot downgrade snapshot".into())); } } let log_segment = LogSegment::try_new_slice( &Path::default(), self.version() + 1, target_version, - store.as_ref(), + log_store.as_ref(), ) .await?; if log_segment.commit_files.is_empty() && log_segment.checkpoint_files.is_empty() { @@ -142,7 +141,7 @@ impl Snapshot { } let (protocol, metadata) = log_segment - .read_metadata(store.clone(), &self.config) + .read_metadata(log_store.object_store().clone(), &self.config) .await?; if let Some(protocol) = protocol { self.protocol = protocol; @@ -376,7 +375,7 @@ impl EagerSnapshot { /// Update the snapshot to the given version pub async fn update( &mut self, - store: Arc, + log_store: Arc, target_version: Option, ) -> DeltaResult<()> { if Some(self.version()) == target_version { @@ -384,12 +383,12 @@ impl EagerSnapshot { } let new_slice = self .snapshot - .update_inner(store.clone(), target_version) + .update_inner(log_store.clone(), target_version) .await?; if let Some(new_slice) = new_slice { let files = std::mem::take(&mut self.files); let log_stream = new_slice.commit_stream( - store.clone(), + log_store.object_store().clone(), &log_segment::COMMIT_SCHEMA, &self.snapshot.config, )?; @@ -398,7 +397,7 @@ impl EagerSnapshot { } else { new_slice .checkpoint_stream( - store, + log_store.object_store(), &log_segment::CHECKPOINT_SCHEMA, &self.snapshot.config, ) diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 5deaa9cd36..e6b4c6e2d4 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -165,6 +165,11 @@ pub trait LogStore: Sync + Send { /// Return the name of this LogStore implementation fn name(&self) -> String; + /// Trigger sync operation on log store to. + async fn refresh(&self) -> DeltaResult<()> { + Ok(()) + } + /// Read data for commit entry with the given version. 
async fn read_commit_entry(&self, version: i64) -> DeltaResult>; diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index ad260295ba..7615c72dc3 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -345,11 +345,7 @@ impl DeltaTable { self.version(), ); match self.state.as_mut() { - Some(state) => { - state - .update(self.log_store.object_store(), max_version) - .await - } + Some(state) => state.update(self.log_store.clone(), max_version).await, _ => { let state = DeltaTableState::try_new( &Path::default(), diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index 104ba2bd32..ab5c229c49 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -14,6 +14,7 @@ use crate::kernel::{ Action, Add, DataType, EagerSnapshot, LogDataHandler, LogicalFile, Metadata, Protocol, Remove, StructType, }; +use crate::logstore::LogStore; use crate::partitions::{DeltaTablePartition, PartitionFilter}; use crate::protocol::DeltaOperation; use crate::{DeltaResult, DeltaTableError}; @@ -196,10 +197,10 @@ impl DeltaTableState { /// Update the state of the table to the given version. pub async fn update( &mut self, - store: Arc, + log_store: Arc, version: Option, ) -> Result<(), DeltaTableError> { - self.snapshot.update(store, version).await?; + self.snapshot.update(log_store, version).await?; Ok(()) } From b1074077bbaa3b7b618b8540c21dd7593a7c0111 Mon Sep 17 00:00:00 2001 From: Niko Date: Wed, 31 Jan 2024 08:09:07 +0000 Subject: [PATCH 3/5] feat: implementation for replaceWhere (#1996) # Description First/naive implementation of `replaceWhere` for `write`. Code compiles and there is a test to verify the outcome. I would appreciate any feedback on improving the structure/implementation. For example, I copied the part of code from `delete` operation because there is no way to call that code in `delete` directly from `write` - should I look into extracting that code from `delete` to somewhere central? Seems to also works with partitions columns. 
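As a quick orientation for reviewers, here is a minimal sketch of the call pattern the new API enables, condensed from the `test_replace_where` test and the Rust doc example added in this PR. The wrapper function and the exact import paths are illustrative only (they assume the crate's `datafusion` feature and the re-exports used elsewhere in the docs); a SQL string such as `"id = 'C'"` is also accepted in place of the DataFusion expression.

```rust
use arrow_array::RecordBatch;
use datafusion::prelude::{col, lit};
use deltalake::protocol::SaveMode;
use deltalake::{DeltaOps, DeltaResult, DeltaTable};

/// Rewrite only the files that can contain rows matching `id = 'C'`;
/// all other data is left untouched. The incoming batch must itself
/// satisfy the predicate, otherwise the commit fails.
async fn replace_c_rows(table: DeltaTable, batch: RecordBatch) -> DeltaResult<DeltaTable> {
    DeltaOps(table)
        .write(vec![batch])
        .with_save_mode(SaveMode::Overwrite)
        .with_replace_where(col("id").eq(lit("C")))
        .await
}
```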
# Related Issue(s) https://github.com/delta-io/delta-rs/issues/1957 # Documentation Added a section in docs --------- Signed-off-by: Nikolay Ulmasov Co-authored-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> --- crates/core/src/delta_datafusion/mod.rs | 6 + crates/core/src/operations/write.rs | 380 ++++++++++++++++++++++-- crates/core/src/writer/test_utils.rs | 18 ++ docs/_build/links.yml | 4 +- docs/index.md | 2 +- docs/src/python/operations.py | 21 ++ docs/src/rust/operations.rs | 32 ++ docs/usage/writing/index.md | 19 +- python/deltalake/writer.py | 30 +- python/docs/source/index.rst | 2 +- python/src/lib.rs | 2 +- python/tests/test_writer.py | 107 +++++-- 12 files changed, 578 insertions(+), 45 deletions(-) create mode 100644 docs/src/python/operations.py create mode 100644 docs/src/rust/operations.rs diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index d7a10edc1f..6ea60a0bda 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -877,6 +877,12 @@ impl DeltaDataChecker { self } + /// Add the specified set of constraints to the current DeltaDataChecker's constraints + pub fn with_extra_constraints(mut self, constraints: Vec) -> Self { + self.constraints.extend(constraints); + self + } + /// Create a new DeltaDataChecker pub fn new(snapshot: &DeltaTableState) -> Self { let invariants = snapshot.schema().get_invariants().unwrap_or_default(); diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index bf36b32459..bb976b5fb9 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -27,26 +27,36 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use arrow_array::RecordBatch; use arrow_cast::can_cast_types; use arrow_schema::{DataType, Fields, SchemaRef as ArrowSchemaRef}; use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; +use datafusion::physical_expr::create_physical_expr; +use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan}; +use datafusion_common::DFSchema; +use datafusion_expr::Expr; use futures::future::BoxFuture; use futures::StreamExt; use parquet::file::properties::WriterProperties; +use super::datafusion_utils::Expression; use super::transaction::PROTOCOL; use super::writer::{DeltaWriter, WriterConfig}; use super::{transaction::commit, CreateBuilder}; +use crate::delta_datafusion::expr::fmt_expr_to_sql; +use crate::delta_datafusion::expr::parse_predicate_expression; use crate::delta_datafusion::DeltaDataChecker; +use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Action, Add, PartitionsExt, StructType}; +use crate::kernel::{Action, Add, PartitionsExt, Remove, StructType}; use crate::logstore::LogStoreRef; use crate::protocol::{DeltaOperation, SaveMode}; use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; +use crate::table::Constraint as DeltaConstraint; use crate::writer::record_batch::divide_by_partition_values; use crate::DeltaTable; @@ -79,7 +89,6 @@ impl From for DeltaTableError { } /// Write data into a DeltaTable -#[derive(Debug, Clone)] pub struct WriteBuilder { /// A snapshot of the to-be-loaded table's state snapshot: Option, @@ -94,7 +103,7 @@ pub struct WriteBuilder { /// Column names for table partitioning partition_columns: 
Option>, /// When using `Overwrite` mode, replace data that matches a predicate - predicate: Option, + predicate: Option, /// Size above which we will write a buffered parquet file to disk. target_file_size: Option, /// Number of records to be written in single batch to underlying writer @@ -154,7 +163,7 @@ impl WriteBuilder { } /// When using `Overwrite` mode, replace data that matches a predicate - pub fn with_replace_where(mut self, predicate: impl Into) -> Self { + pub fn with_replace_where(mut self, predicate: impl Into) -> Self { self.predicate = Some(predicate.into()); self } @@ -292,7 +301,8 @@ impl WriteBuilder { } #[allow(clippy::too_many_arguments)] -pub(crate) async fn write_execution_plan( +async fn write_execution_plan_with_predicate( + predicate: Option, snapshot: Option<&DeltaTableState>, state: SessionState, plan: Arc, @@ -318,6 +328,14 @@ pub(crate) async fn write_execution_plan( } else { DeltaDataChecker::empty() }; + let checker = match predicate { + Some(pred) => { + // TODO: get the name of the outer-most column? `*` will also work but would it be slower? + let chk = DeltaConstraint::new("*", &fmt_expr_to_sql(&pred)?); + checker.with_extra_constraints(vec![chk]) + } + _ => checker, + }; // Write data to disk let mut tasks = vec![]; @@ -363,6 +381,133 @@ pub(crate) async fn write_execution_plan( .collect::>()) } +#[allow(clippy::too_many_arguments)] +pub(crate) async fn write_execution_plan( + snapshot: Option<&DeltaTableState>, + state: SessionState, + plan: Arc, + partition_columns: Vec, + object_store: ObjectStoreRef, + target_file_size: Option, + write_batch_size: Option, + writer_properties: Option, + safe_cast: bool, + overwrite_schema: bool, +) -> DeltaResult> { + write_execution_plan_with_predicate( + None, + snapshot, + state, + plan, + partition_columns, + object_store, + target_file_size, + write_batch_size, + writer_properties, + safe_cast, + overwrite_schema, + ) + .await +} + +async fn execute_non_empty_expr( + snapshot: &DeltaTableState, + log_store: LogStoreRef, + state: SessionState, + partition_columns: Vec, + expression: &Expr, + rewrite: &[Add], + writer_properties: Option, +) -> DeltaResult> { + // For each identified file perform a parquet scan + filter + limit (1) + count. + // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file. 
+ + let input_schema = snapshot.input_schema()?; + let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; + + let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state) + .with_files(rewrite) + .build() + .await?; + let scan = Arc::new(scan); + + // Apply the negation of the filter and rewrite files + let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone())))); + + let predicate_expr = create_physical_expr( + &negated_expression, + &input_dfschema, + state.execution_props(), + )?; + let filter: Arc = + Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?); + + // We don't want to verify the predicate against existing data + let add_actions = write_execution_plan( + Some(snapshot), + state, + filter, + partition_columns, + log_store.object_store(), + Some(snapshot.table_config().target_file_size() as usize), + None, + writer_properties, + false, + false, + ) + .await?; + + Ok(add_actions) +} + +// This should only be called wth a valid predicate +async fn prepare_predicate_actions( + predicate: Expr, + log_store: LogStoreRef, + snapshot: &DeltaTableState, + state: SessionState, + partition_columns: Vec, + writer_properties: Option, + deletion_timestamp: i64, +) -> DeltaResult> { + let candidates = + find_files(snapshot, log_store.clone(), &state, Some(predicate.clone())).await?; + + let add = if candidates.partition_scan { + Vec::new() + } else { + execute_non_empty_expr( + snapshot, + log_store, + state, + partition_columns, + &predicate, + &candidates.candidates, + writer_properties, + ) + .await? + }; + let remove = candidates.candidates; + + let mut actions: Vec = add.into_iter().map(Action::Add).collect(); + + for action in remove { + actions.push(Action::Remove(Remove { + path: action.path, + deletion_timestamp: Some(deletion_timestamp), + data_change: true, + extended_file_metadata: Some(true), + partition_values: Some(action.partition_values), + size: Some(action.size), + deletion_vector: action.deletion_vector, + tags: None, + base_row_id: action.base_row_id, + default_row_commit_version: action.default_row_commit_version, + })) + } + Ok(actions) +} + impl std::future::IntoFuture for WriteBuilder { type Output = DeltaResult; type IntoFuture = BoxFuture<'static, Self::Output>; @@ -465,19 +610,37 @@ impl std::future::IntoFuture for WriteBuilder { Some(state) => state, None => { let ctx = SessionContext::new(); + register_store(this.log_store.clone(), ctx.runtime_env()); ctx.state() } }; - let add_actions = write_execution_plan( + let (predicate_str, predicate) = match this.predicate { + Some(predicate) => { + let pred = match predicate { + Expression::DataFusion(expr) => expr, + Expression::String(s) => { + let df_schema = DFSchema::try_from(schema.as_ref().to_owned())?; + parse_predicate_expression(&df_schema, s, &state)? + // this.snapshot.unwrap().parse_predicate_expression(s, &state)? 
+ } + }; + (Some(fmt_expr_to_sql(&pred)?), Some(pred)) + } + _ => (None, None), + }; + + // Here we need to validate if the new data conforms to a predicate if one is provided + let add_actions = write_execution_plan_with_predicate( + predicate.clone(), this.snapshot.as_ref(), - state, + state.clone(), plan, partition_columns.clone(), this.log_store.object_store().clone(), this.target_file_size, this.write_batch_size, - this.writer_properties, + this.writer_properties.clone(), this.safe_cast, this.overwrite_schema, ) @@ -501,12 +664,26 @@ impl std::future::IntoFuture for WriteBuilder { actions.push(Action::Metadata(metadata)); } - match this.predicate { - Some(_pred) => { - return Err(DeltaTableError::Generic( - "Overwriting data based on predicate is not yet implemented" - .to_string(), - )); + let deletion_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64; + + match predicate { + Some(pred) => { + let predicate_actions = prepare_predicate_actions( + pred, + this.log_store.clone(), + snapshot, + state, + partition_columns.clone(), + this.writer_properties, + deletion_timestamp, + ) + .await?; + if !predicate_actions.is_empty() { + actions.extend(predicate_actions); + } } _ => { let remove_actions = snapshot @@ -515,8 +692,8 @@ impl std::future::IntoFuture for WriteBuilder { .map(|p| p.remove_action(true).into()); actions.extend(remove_actions); } - } - }; + }; + } } let operation = DeltaOperation::Write { @@ -526,8 +703,9 @@ impl std::future::IntoFuture for WriteBuilder { } else { None }, - predicate: this.predicate, + predicate: predicate_str, }; + let version = commit( this.log_store.as_ref(), &actions, @@ -577,10 +755,10 @@ mod tests { use super::*; use crate::operations::{collect_sendable_stream, DeltaOps}; use crate::protocol::SaveMode; - use crate::writer::test_utils::datafusion::get_data; use crate::writer::test_utils::datafusion::write_batch; + use crate::writer::test_utils::datafusion::{get_data, get_data_sorted}; use crate::writer::test_utils::{ - get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch, + get_arrow_schema, get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch, get_record_batch_with_nested_struct, setup_table_with_configuration, }; use crate::DeltaConfigKey; @@ -588,6 +766,7 @@ mod tests { use arrow::datatypes::Schema as ArrowSchema; use arrow_array::{Int32Array, StringArray, TimestampMicrosecondArray}; use arrow_schema::{DataType, TimeUnit}; + use datafusion::prelude::*; use datafusion::{assert_batches_eq, assert_batches_sorted_eq}; use serde_json::{json, Value}; @@ -940,4 +1119,167 @@ mod tests { assert_batches_eq!(&expected, &data); } + + #[tokio::test] + async fn test_replace_where() { + let schema = get_arrow_schema(&None); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])), + Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec![ + "2021-02-02", + "2021-02-03", + "2021-02-02", + "2021-02-04", + ])), + ], + ) + .unwrap(); + + let table = DeltaOps::new_in_memory() + .write(vec![batch]) + .with_save_mode(SaveMode::Append) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let batch_add = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["C"])), + Arc::new(arrow::array::Int32Array::from(vec![50])), + Arc::new(arrow::array::StringArray::from(vec!["2023-01-01"])), + ], + ) + 
.unwrap(); + + let table = DeltaOps(table) + .write(vec![batch_add]) + .with_save_mode(SaveMode::Overwrite) + .with_replace_where(col("id").eq(lit("C"))) + .await + .unwrap(); + assert_eq!(table.version(), 1); + + let expected = [ + "+----+-------+------------+", + "| id | value | modified |", + "+----+-------+------------+", + "| A | 0 | 2021-02-02 |", + "| B | 20 | 2021-02-03 |", + "| C | 50 | 2023-01-01 |", + "+----+-------+------------+", + ]; + let actual = get_data(&table).await; + assert_batches_sorted_eq!(&expected, &actual); + } + + #[tokio::test] + async fn test_replace_where_fail_not_matching_predicate() { + let schema = get_arrow_schema(&None); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])), + Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec![ + "2021-02-02", + "2021-02-03", + "2021-02-02", + "2021-02-04", + ])), + ], + ) + .unwrap(); + + let table = DeltaOps::new_in_memory() + .write(vec![batch]) + .with_save_mode(SaveMode::Append) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + // Take clones of these before an operation resulting in error, otherwise it will + // be impossible to refer to an in-memory table + let table_logstore = table.log_store.clone(); + let table_state = table.state.clone().unwrap(); + + // An attempt to write records non comforming to predicate should fail + let batch_fail = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["D"])), + Arc::new(arrow::array::Int32Array::from(vec![1000])), + Arc::new(arrow::array::StringArray::from(vec!["2023-01-01"])), + ], + ) + .unwrap(); + + let table = DeltaOps(table) + .write(vec![batch_fail]) + .with_save_mode(SaveMode::Overwrite) + .with_replace_where(col("id").eq(lit("C"))) + .await; + assert!(table.is_err()); + + // Verify that table state hasn't changed + let table = DeltaTable::new_with_state(table_logstore, table_state); + assert_eq!(table.get_latest_version().await.unwrap(), 0); + } + + #[tokio::test] + async fn test_replace_where_partitioned() { + let schema = get_arrow_schema(&None); + + let batch = get_record_batch(None, false); + + let table = DeltaOps::new_in_memory() + .write(vec![batch]) + .with_partition_columns(["id", "value"]) + .with_save_mode(SaveMode::Append) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let batch_add = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["A", "A", "A"])), + Arc::new(arrow::array::Int32Array::from(vec![11, 13, 15])), + Arc::new(arrow::array::StringArray::from(vec![ + "2024-02-02", + "2024-02-02", + "2024-02-01", + ])), + ], + ) + .unwrap(); + + let table = DeltaOps(table) + .write(vec![batch_add]) + .with_save_mode(SaveMode::Overwrite) + .with_replace_where(col("id").eq(lit("A"))) + .await + .unwrap(); + assert_eq!(table.version(), 1); + + let expected = [ + "+----+-------+------------+", + "| id | value | modified |", + "+----+-------+------------+", + "| A | 11 | 2024-02-02 |", + "| A | 13 | 2024-02-02 |", + "| A | 15 | 2024-02-01 |", + "| B | 2 | 2021-02-02 |", + "| B | 4 | 2021-02-01 |", + "| B | 8 | 2021-02-01 |", + "| B | 9 | 2021-02-01 |", + "+----+-------+------------+", + ]; + let actual = get_data_sorted(&table, "id,value,modified").await; + assert_batches_sorted_eq!(&expected, &actual); + } } diff --git a/crates/core/src/writer/test_utils.rs b/crates/core/src/writer/test_utils.rs 
index 03552aab84..093ad7cbd0 100644 --- a/crates/core/src/writer/test_utils.rs +++ b/crates/core/src/writer/test_utils.rs @@ -327,6 +327,24 @@ pub mod datafusion { .unwrap() } + pub async fn get_data_sorted(table: &DeltaTable, columns: &str) -> Vec { + let table = DeltaTable::new_with_state( + table.log_store.clone(), + table.state.as_ref().unwrap().clone(), + ); + let ctx = SessionContext::new(); + ctx.register_table("test", Arc::new(table)).unwrap(); + ctx.sql(&format!( + "select {} from test order by {}", + columns, columns + )) + .await + .unwrap() + .collect() + .await + .unwrap() + } + pub async fn write_batch(table: DeltaTable, batch: RecordBatch) -> DeltaTable { DeltaOps(table) .write(vec![batch.clone()]) diff --git a/docs/_build/links.yml b/docs/_build/links.yml index 605cda2d55..5a603059bc 100644 --- a/docs/_build/links.yml +++ b/docs/_build/links.yml @@ -1,4 +1,6 @@ python: DeltaTable: PYTHON_API_URL/delta_table + replaceWhere: https://delta-io.github.io/delta-rs/api/delta_writer/ rust: - DeltaTable: https://docs.rs/deltalake/latest/deltalake/table/struct.DeltaTable.html \ No newline at end of file + DeltaTable: https://docs.rs/deltalake/latest/deltalake/table/struct.DeltaTable.html + replaceWhere: https://docs.rs/deltalake/latest/deltalake/operations/write/struct.WriteBuilder.html#method.with_replace_where \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index 8dfa67f924..99b7dc6cb3 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,6 +1,6 @@ # The deltalake package -This is the documentation for the native Rust/Python implementation of Delta Lake. It is based on the delta-rs Rust library and requires no Spark or JVM dependencies. For the PySpark implementation, see [delta-spark](https://docs.delta.io/latest/api/python/index.html) instead. +This is the documentation for the native Rust/Python implementation of Delta Lake. It is based on the delta-rs Rust library and requires no Spark or JVM dependencies. For the PySpark implementation, see [delta-spark](https://docs.delta.io/latest/api/python/spark/index.html) instead. This module provides the capability to read, write, and manage [Delta Lake](https://delta.io/) tables with Python or Rust without Spark or Java. It uses [Apache Arrow](https://arrow.apache.org/) under the hood, so is compatible with other Arrow-native or integrated libraries such as [pandas](https://pandas.pydata.org/), [DuckDB](https://duckdb.org/), and [Polars](https://www.pola.rs/). 
diff --git a/docs/src/python/operations.py b/docs/src/python/operations.py new file mode 100644 index 0000000000..700b77421a --- /dev/null +++ b/docs/src/python/operations.py @@ -0,0 +1,21 @@ +def replace_where(): + # --8<-- [start:replace_where] + import pyarrow as pa + from deltalake import write_deltalake + + # Assuming there is already a table in this location with some records where `id = '1'` which we want to overwrite + table_path = "/tmp/my_table" + data = pa.table( + { + "id": pa.array(["1", "1"], pa.string()), + "value": pa.array([11, 12], pa.int64()), + } + ) + write_deltalake( + table_path, + data, + mode="overwrite", + predicate="id = '1'", + engine="rust", + ) + # --8<-- [end:replace_where] diff --git a/docs/src/rust/operations.rs b/docs/src/rust/operations.rs new file mode 100644 index 0000000000..55ab40604f --- /dev/null +++ b/docs/src/rust/operations.rs @@ -0,0 +1,32 @@ +#[tokio::main] +async fn main() -> Result<(), Box> { + // --8<-- [start:replace_where] + // Assuming there is already a table in this location with some records where `id = '1'` which we want to overwrite + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use arrow_array::RecordBatch; + import deltalake::protocol::SaveMode; + + let schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Utf8, true), + Field::new("value", DataType::Int32, true), + ]); + + let data = RecordBatch::try_new( + &schema, + vec![ + Arc::new(arrow::array::StringArray::from(vec!["1", "1"])), + Arc::new(arrow::array::Int32Array::from(vec![11, 12])), + ], + ) + .unwrap(); + + let table = deltalake::open_table("/tmp/my_table").await.unwrap(); + let table = DeltaOps(table) + .write(vec![data]) + .with_save_mode(SaveMode::Overwrite) + .with_replace_where(col("id").eq(lit("1"))) + .await; + // --8<-- [end:replace_where] + + Ok(()) +} \ No newline at end of file diff --git a/docs/usage/writing/index.md b/docs/usage/writing/index.md index fe92572a81..dc8bb62389 100644 --- a/docs/usage/writing/index.md +++ b/docs/usage/writing/index.md @@ -50,4 +50,21 @@ that partition or else the method will raise an error. ``` This method could also be used to insert a new partition if one doesn't -already exist, making this operation idempotent. \ No newline at end of file +already exist, making this operation idempotent. + +## Overwriting part of the table data using a predicate + +!!! note + + This predicate is often called a `replaceWhere` predicate + +When you don’t specify the `predicate`, the overwrite save mode will replace the entire table. +Instead of replacing the entire table (which is costly!), you may want to overwrite only the specific parts of the table that should be changed. +In this case, you can use a `predicate` to overwrite only the relevant records or partitions. + +!!! note + + Data written must conform to the same predicate, i.e. 
not contain any records that don't match the `predicate` condition, + otherwise the operation will fail + +{{ code_example('operations', 'replace_where', ['replaceWhere'])}} \ No newline at end of file diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 45a35b64b6..d3b956cbfc 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -118,7 +118,35 @@ def write_deltalake( *, schema: Optional[Union[pa.Schema, DeltaSchema]] = ..., partition_by: Optional[Union[List[str], str]] = ..., - mode: Literal["error", "append", "overwrite", "ignore"] = ..., + mode: Literal["error", "append", "ignore"] = ..., + name: Optional[str] = ..., + description: Optional[str] = ..., + configuration: Optional[Mapping[str, Optional[str]]] = ..., + overwrite_schema: bool = ..., + storage_options: Optional[Dict[str, str]] = ..., + large_dtypes: bool = ..., + engine: Literal["rust"], + writer_properties: WriterProperties = ..., + custom_metadata: Optional[Dict[str, str]] = ..., +) -> None: + ... + + +@overload +def write_deltalake( + table_or_uri: Union[str, Path, DeltaTable], + data: Union[ + "pd.DataFrame", + ds.Dataset, + pa.Table, + pa.RecordBatch, + Iterable[pa.RecordBatch], + RecordBatchReader, + ], + *, + schema: Optional[Union[pa.Schema, DeltaSchema]] = ..., + partition_by: Optional[Union[List[str], str]] = ..., + mode: Literal["overwrite"], name: Optional[str] = ..., description: Optional[str] = ..., configuration: Optional[Mapping[str, Optional[str]]] = ..., diff --git a/python/docs/source/index.rst b/python/docs/source/index.rst index 271d3a85a5..0ab6ad86a7 100644 --- a/python/docs/source/index.rst +++ b/python/docs/source/index.rst @@ -16,7 +16,7 @@ Pandas_, DuckDB_, and Polars_. It is not yet as feature-complete as the PySpark implementation of Delta Lake. If you encounter a bug, please let us know in our `GitHub repo`_. -.. _delta-spark: https://docs.delta.io/latest/api/python/index.html +.. _delta-spark: https://docs.delta.io/latest/api/python/spark/index.html .. _Delta Lake: https://delta.io/ .. _Apache Arrow: https://arrow.apache.org/ .. 
_Pandas: https://pandas.pydata.org/ diff --git a/python/src/lib.rs b/python/src/lib.rs index 7829d73a75..bfed12bb8b 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1386,7 +1386,7 @@ fn write_to_deltalake( builder = builder.with_description(description); }; - if let Some(predicate) = &predicate { + if let Some(predicate) = predicate { builder = builder.with_replace_where(predicate); }; diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 337d68f931..9252dfdd41 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -869,6 +869,8 @@ def test_replace_where_overwrite( value_type: pa.DataType, filter_string: str, ): + table_path = tmp_path + sample_data = pa.table( { "p1": pa.array(["1", "1", "2", "2"], pa.string()), @@ -876,9 +878,9 @@ def test_replace_where_overwrite( "val": pa.array([1, 1, 1, 1], pa.int64()), } ) - write_deltalake(tmp_path, sample_data, mode="overwrite", partition_by=["p1", "p2"]) + write_deltalake(table_path, sample_data, mode="overwrite") - delta_table = DeltaTable(tmp_path) + delta_table = DeltaTable(table_path) assert ( delta_table.to_pyarrow_table().sort_by( [("p1", "ascending"), ("p2", "ascending")] @@ -890,36 +892,101 @@ def test_replace_where_overwrite( { "p1": pa.array(["1", "1"], pa.string()), "p2": pa.array([value_2, value_1], value_type), - "val": pa.array([2, 2], pa.int64()), + "val": pa.array([2, 3], pa.int64()), } ) expected_data = pa.table( { "p1": pa.array(["1", "1", "2", "2"], pa.string()), "p2": pa.array([value_1, value_2, value_1, value_2], value_type), - "val": pa.array([2, 2, 1, 1], pa.int64()), + "val": pa.array([3, 2, 1, 1], pa.int64()), } ) - with pytest.raises( - DeltaError, - match="Generic DeltaTable error: Overwriting data based on predicate is not yet implemented", - ): - write_deltalake( - tmp_path, - sample_data, - mode="overwrite", - predicate="`p1` = 1", - engine="rust", + write_deltalake( + table_path, + sample_data, + mode="overwrite", + predicate="p1 = '1'", + engine="rust", + ) + + delta_table.update_incremental() + assert ( + delta_table.to_pyarrow_table().sort_by( + [("p1", "ascending"), ("p2", "ascending")] ) + == expected_data + ) - delta_table.update_incremental() - assert ( - delta_table.to_pyarrow_table().sort_by( - [("p1", "ascending"), ("p2", "ascending")] - ) - == expected_data + +@pytest.mark.parametrize( + "value_1,value_2,value_type,filter_string", + [ + (1, 2, pa.int64(), "1"), + (False, True, pa.bool_(), "false"), + (date(2022, 1, 1), date(2022, 1, 2), pa.date32(), "2022-01-01"), + ], +) +def test_replace_where_overwrite_partitioned( + tmp_path: pathlib.Path, + value_1: Any, + value_2: Any, + value_type: pa.DataType, + filter_string: str, +): + table_path = tmp_path + + sample_data = pa.table( + { + "p1": pa.array(["1", "1", "2", "2"], pa.string()), + "p2": pa.array([value_1, value_2, value_1, value_2], value_type), + "val": pa.array([1, 1, 1, 1], pa.int64()), + } + ) + write_deltalake( + table_path, sample_data, mode="overwrite", partition_by=["p1", "p2"] + ) + + delta_table = DeltaTable(table_path) + assert ( + delta_table.to_pyarrow_table().sort_by( + [("p1", "ascending"), ("p2", "ascending")] ) + == sample_data + ) + + replace_data = pa.table( + { + "p1": pa.array(["1", "1"], pa.string()), + "p2": pa.array([value_2, value_1], value_type), + "val": pa.array([2, 3], pa.int64()), + } + ) + expected_data = pa.table( + { + "p1": pa.array(["1", "1", "2", "2"], pa.string()), + "p2": pa.array([value_1, value_2, value_1, value_2], value_type), + "val": pa.array([3, 2, 1, 
1], pa.int64()), + } + ) + + write_deltalake( + table_path, + replace_data, + mode="overwrite", + partition_by=["p1", "p2"], + predicate="p1 = '1'", + engine="rust", + ) + + delta_table.update_incremental() + assert ( + delta_table.to_pyarrow_table().sort_by( + [("p1", "ascending"), ("p2", "ascending")] + ) + == expected_data + ) def test_partition_overwrite_with_new_partition( From 3ec28cce759b4d544f29e171aa4691b03fbfa518 Mon Sep 17 00:00:00 2001 From: emcake <3726783+emcake@users.noreply.github.com> Date: Thu, 1 Feb 2024 05:18:19 +0000 Subject: [PATCH 4/5] fix: made generalize_filter less permissive, also added more cases (#2149) # Description This fixes an observed bug where the partition generalization was failing. A minimal repro was: ```python from deltalake import DeltaTable, write_deltalake import pyarrow as pa import pandas as pd data = pd.DataFrame.from_dict( { "a": [], "b": [], "c": [], } ) schema = pa.schema( [ ("a", pa.string()), ("b", pa.int32()), ("c", pa.int32()), ] ) table = pa.Table.from_pandas(data, schema=schema) write_deltalake( "test", table, mode="overwrite", partition_by="a" ) new_data = pd.DataFrame.from_dict( { "a": ["a", "a", "a"], "b": [None, 2, 4], "c": [5, 6, 7], } ) new_table = pa.Table.from_pandas(new_data, schema) dt = DeltaTable("test") dt.merge( source=new_table, predicate="s.b IS NULL", source_alias="s", target_alias="t", ).when_matched_update_all().when_not_matched_insert_all().execute() ``` This would cause a DataFusion error: ``` _internal.DeltaError: Generic DeltaTable error: Optimizer rule 'simplify_expressions' failed caused by Schema error: No field named s.b. Valid fields are t.a, t.b, t.c, t.__delta_rs_path. ``` This was because when generalizing the match predicate to use as a partition filter, an expression `IsNull(Column('b', 's'))` was deemed to not reference the table `s`. This PR does two things: 1. **Tightens up the referencing logic.** Previous it conflated 'definitely does not reference' with 'don't know if it references or not'. This tightens up the logic and means the plan is less likely to generalize out a partition filter if we can't be sure that it can be generalized. The ability to generalize over arbitrary binary expressions has been tightened too - previously behaviour would permit that `generalizable_expression OR non_generalizable_expression` would reduce to `generalizable_expression`. This isn't correct in the case of partition filters, because this would cause us to leave out half the cases that should be extracted from the target table. 2. **Adds a couple of extra cases where we know if a target reference exists.** Namely, `is null` can now be checked for source table references and `literal` is now re-covered, as previously it was working by taking advantage of looser logic that has since been tightened. 
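To make the tightened reference check and the AND/OR rule concrete before the diff, here is a condensed version of the `test_generalize_filter_with_partitions_nulls` unit test added in this patch. `generalize_filter` is crate-private, so this sketch only runs inside the merge module's test module, with the imports that module already has (`datafusion_common::{Column, TableReference}`, `datafusion_expr::col`, `std::collections::HashMap`):

```rust
let source = TableReference::parse_str("source");
let target = TableReference::parse_str("target");

let source_id = col(Column::new(source.clone().into(), "id"));
let target_id = col(Column::new(target.clone().into(), "id"));

// source.id = target.id OR (source.id IS NULL AND target.id IS NULL)
let parsed_filter = (source_id.clone().eq(target_id.clone()))
    .or(source_id.clone().is_null().and(target_id.clone().is_null()));

let mut placeholders = HashMap::default();

// Partition column is `id`, so both OR branches must survive generalization:
// the result is `id_0 = target.id OR (id_1 AND target.id IS NULL)`, with the
// two source-side expressions captured in `placeholders`.
let generalized = generalize_filter(
    parsed_filter,
    &vec!["id".to_owned()],
    &source,
    &target,
    &mut placeholders,
)
.unwrap();
assert_eq!(placeholders.len(), 2);
```

Keeping both OR branches, instead of collapsing to the generalizable side as the old code allowed, ensures the derived partition filter never excludes target files that could still match the merge predicate.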
Co-authored-by: David Blajda --- crates/core/src/operations/merge/mod.rs | 143 ++++++++++++++++++++---- 1 file changed, 121 insertions(+), 22 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 07f65f4cf8..b1f89c4c12 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -51,7 +51,7 @@ use datafusion_common::{Column, DFSchema, ScalarValue, TableReference}; use datafusion_expr::expr::Placeholder; use datafusion_expr::{col, conditional_expressions::CaseBuilder, lit, when, Expr, JoinType}; use datafusion_expr::{ - BinaryExpr, Distinct, Extension, Filter, LogicalPlan, LogicalPlanBuilder, Projection, + BinaryExpr, Distinct, Extension, Filter, LogicalPlan, LogicalPlanBuilder, Operator, Projection, UserDefinedLogicalNode, UNNAMED_TABLE, }; use futures::future::BoxFuture; @@ -697,16 +697,34 @@ fn generalize_filter( target_name: &TableReference, placeholders: &mut HashMap, ) -> Option { - fn references_table(expr: &Expr, table: &TableReference) -> Option { - match expr { + #[derive(Debug)] + enum ReferenceTableCheck { + HasReference(String), + NoReference, + Unknown, + } + impl ReferenceTableCheck { + fn has_reference(&self) -> bool { + match self { + ReferenceTableCheck::HasReference(_) => true, + _ => false, + } + } + } + fn references_table(expr: &Expr, table: &TableReference) -> ReferenceTableCheck { + let res = match expr { Expr::Alias(alias) => references_table(&alias.expr, table), - Expr::Column(col) => col.relation.as_ref().and_then(|rel| { - if rel == table { - Some(col.name.to_owned()) - } else { - None - } - }), + Expr::Column(col) => col + .relation + .as_ref() + .map(|rel| { + if rel == table { + ReferenceTableCheck::HasReference(col.name.to_owned()) + } else { + ReferenceTableCheck::NoReference + } + }) + .unwrap_or(ReferenceTableCheck::NoReference), Expr::Negative(neg) => references_table(neg, table), Expr::Cast(cast) => references_table(&cast.expr, table), Expr::TryCast(try_cast) => references_table(&try_cast.expr, table), @@ -714,17 +732,22 @@ fn generalize_filter( if func.args.len() == 1 { references_table(&func.args[0], table) } else { - None + ReferenceTableCheck::Unknown } } - _ => None, - } + Expr::IsNull(inner) => references_table(&inner, table), + Expr::Literal(_) => ReferenceTableCheck::NoReference, + _ => ReferenceTableCheck::Unknown, + }; + res } match predicate { Expr::BinaryExpr(binary) => { - if references_table(&binary.right, source_name).is_some() { - if let Some(left_target) = references_table(&binary.left, target_name) { + if references_table(&binary.right, source_name).has_reference() { + if let ReferenceTableCheck::HasReference(left_target) = + references_table(&binary.left, target_name) + { if partition_columns.contains(&left_target) { let placeholder_name = format!("{left_target}_{}", placeholders.len()); @@ -745,8 +768,10 @@ fn generalize_filter( } return None; } - if references_table(&binary.left, source_name).is_some() { - if let Some(right_target) = references_table(&binary.right, target_name) { + if references_table(&binary.left, source_name).has_reference() { + if let ReferenceTableCheck::HasReference(right_target) = + references_table(&binary.right, target_name) + { if partition_columns.contains(&right_target) { let placeholder_name = format!("{right_target}_{}", placeholders.len()); @@ -783,19 +808,45 @@ fn generalize_filter( placeholders, ); - match (left, right) { + let res = match (left, right) { (None, None) => None, - (None, Some(r)) => 
Some(r), - (Some(l), None) => Some(l), + (None, Some(one_side)) | (Some(one_side), None) => { + // in the case of an AND clause, it's safe to generalize the filter down to just one side of the AND. + // this is because this filter will be more permissive than the actual predicate, so we know that + // we will catch all data that could be matched by the predicate. For OR this is not the case - we + // could potentially eliminate one side of the predicate and the filter would only match half the + // cases that would have satisfied the match predicate. + match binary.op { + Operator::And => Some(one_side), + Operator::Or => None, + _ => None, + } + } (Some(l), Some(r)) => Expr::BinaryExpr(BinaryExpr { left: l.into(), op: binary.op, right: r.into(), }) .into(), - } + }; + res } - other => Some(other), + other => match references_table(&other, source_name) { + ReferenceTableCheck::HasReference(col) => { + let placeholder_name = format!("{col}_{}", placeholders.len()); + + let placeholder = Expr::Placeholder(datafusion_expr::expr::Placeholder { + id: placeholder_name.clone(), + data_type: None, + }); + + placeholders.insert(placeholder_name, other); + + Some(placeholder) + } + ReferenceTableCheck::NoReference => Some(other), + ReferenceTableCheck::Unknown => None, + }, } } @@ -1484,6 +1535,7 @@ mod tests { use datafusion_expr::Expr; use datafusion_expr::LogicalPlanBuilder; use datafusion_expr::Operator; + use itertools::Itertools; use serde_json::json; use std::collections::HashMap; use std::ops::Neg; @@ -2430,6 +2482,51 @@ mod tests { assert_eq!(generalized, expected_filter); } + #[tokio::test] + async fn test_generalize_filter_with_partitions_nulls() { + let source = TableReference::parse_str("source"); + let target = TableReference::parse_str("target"); + + let source_id = col(Column::new(source.clone().into(), "id")); + let target_id = col(Column::new(target.clone().into(), "id")); + + // source.id = target.id OR (source.id is null and target.id is null) + let parsed_filter = (source_id.clone().eq(target_id.clone())) + .or(source_id.clone().is_null().and(target_id.clone().is_null())); + + let mut placeholders = HashMap::default(); + + let generalized = generalize_filter( + parsed_filter, + &vec!["id".to_owned()], + &source, + &target, + &mut placeholders, + ) + .unwrap(); + + // id_1 = target.id OR (id_2 and target.id is null) + let expected_filter = Expr::Placeholder(Placeholder { + id: "id_0".to_owned(), + data_type: None, + }) + .eq(target_id.clone()) + .or(Expr::Placeholder(Placeholder { + id: "id_1".to_owned(), + data_type: None, + }) + .and(target_id.clone().is_null())); + + assert!(placeholders.len() == 2); + + let captured_expressions = placeholders.values().collect_vec(); + + assert!(captured_expressions.contains(&&source_id)); + assert!(captured_expressions.contains(&&source_id.is_null())); + + assert_eq!(generalized, expected_filter); + } + #[tokio::test] async fn test_generalize_filter_with_partitions_captures_expression() { // Check that when generalizing the filter, the placeholder map captures the expression needed to make the statement the same @@ -2474,6 +2571,7 @@ mod tests { let source = TableReference::parse_str("source"); let target = TableReference::parse_str("target"); + // source.id = target.id and target.id = 'C' let parsed_filter = col(Column::new(source.clone().into(), "id")) .eq(col(Column::new(target.clone().into(), "id"))) .and(col(Column::new(target.clone().into(), "id")).eq(lit("C"))); @@ -2489,6 +2587,7 @@ mod tests { ) .unwrap(); + // id_0 = target.id and 
target.id = 'C' let expected_filter = Expr::Placeholder(Placeholder { id: "id_0".to_owned(), data_type: None, From bdb03a38d9734244bea4bf380f7141f15fc8acd8 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Wed, 31 Jan 2024 23:50:35 -0800 Subject: [PATCH 5/5] fix: properly load field metadata from a table with identity columns Fixes #2152 --- crates/core/src/kernel/models/schema.rs | 16 ++++++++++++++++ crates/core/src/lib.rs | 9 +++++++++ .../_delta_log/00000000000000000000.json | 3 +++ 3 files changed, 28 insertions(+) create mode 100644 crates/test/tests/data/issue-2152/_delta_log/00000000000000000000.json diff --git a/crates/core/src/kernel/models/schema.rs b/crates/core/src/kernel/models/schema.rs index a208c2e8cc..874bade71d 100644 --- a/crates/core/src/kernel/models/schema.rs +++ b/crates/core/src/kernel/models/schema.rs @@ -25,6 +25,8 @@ pub enum MetadataValue { Number(i32), /// A string value String(String), + /// A Boolean value + Boolean(bool), } impl From for MetadataValue { @@ -45,6 +47,12 @@ impl From for MetadataValue { } } +impl From for MetadataValue { + fn from(value: bool) -> Self { + Self::Boolean(value) + } +} + impl From for MetadataValue { fn from(value: Value) -> Self { Self::String(value.to_string()) @@ -184,6 +192,7 @@ impl StructField { let phys_name = self.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName); match phys_name { None => Ok(&self.name), + Some(MetadataValue::Boolean(_)) => Ok(&self.name), Some(MetadataValue::String(s)) => Ok(s), Some(MetadataValue::Number(_)) => Err(Error::MetadataError( "Unexpected type for physical name".to_string(), @@ -850,4 +859,11 @@ mod tests { Invariant::new("a_map.value.element.d", "a_map.value.element.d < 4") ); } + + /// + #[test] + fn test_identity_columns() { + let buf = r#"{"type":"struct","fields":[{"name":"ID_D_DATE","type":"long","nullable":true,"metadata":{"delta.identity.start":1,"delta.identity.step":1,"delta.identity.allowExplicitInsert":false}},{"name":"TXT_DateKey","type":"string","nullable":true,"metadata":{}}]}"#; + let schema: StructType = serde_json::from_str(buf).expect("Failed to load"); + } } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index fa5478e34a..b24faf248e 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -676,4 +676,13 @@ mod tests { DeltaTableError::InvalidTableLocation(_expected_error_msg), )) } + + /// + #[tokio::test] + async fn test_identity_column() { + let path = "../test/tests/data/issue-2152"; + let _ = crate::open_table(path) + .await + .expect("Failed to load the table"); + } } diff --git a/crates/test/tests/data/issue-2152/_delta_log/00000000000000000000.json b/crates/test/tests/data/issue-2152/_delta_log/00000000000000000000.json new file mode 100644 index 0000000000..45256b6236 --- /dev/null +++ b/crates/test/tests/data/issue-2152/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1706770085847,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","description":null,"isManaged":"true","properties":"{}","statsOnLoad":false},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"tags":{"restoresDeletedRows":"false"},"engineInfo":"Databricks-Runtime/14.1.x-photon-scala2.12","txnId":"5ba2d1f4-09e0-4013-920a-92b057185128"}} 
+{"metaData":{"id":"7791991a-60e9-4a8f-bff0-ccffec779dc4","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"ID_D_DATE\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"delta.identity.start\":1,\"delta.identity.step\":1,\"delta.identity.allowExplicitInsert\":false}},{\"name\":\"TXT_DateKey\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1706770085202}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":6}}