From 28ad3950d90573fa8ff413c336b657b8561e1d41 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 27 Apr 2024 19:08:43 +0200 Subject: [PATCH] feat(rust): advance state in post commit (#2396) # Description We advance the state in the post commit now, so it's done in a single location as per suggestion from @Blajda here: https://github.com/delta-io/delta-rs/pull/2391#issuecomment-2041500757 This PR also supersedes this one: https://github.com/delta-io/delta-rs/pull/2280 # Related Issue(s) - fixes #2279 - fixes #2262 --- crates/core/src/operations/constraints.rs | 7 +- crates/core/src/operations/delete.rs | 35 ++---- .../core/src/operations/drop_constraints.rs | 7 +- crates/core/src/operations/merge/mod.rs | 37 +++--- crates/core/src/operations/transaction/mod.rs | 111 +++++++++--------- crates/core/src/operations/update.rs | 41 +++---- crates/core/src/operations/write.rs | 12 +- crates/core/tests/command_merge.rs | 12 +- 8 files changed, 111 insertions(+), 151 deletions(-) diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index 91539ef1a6..6ab7199f47 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -198,9 +198,10 @@ impl std::future::IntoFuture for ConstraintBuilder { .build(Some(&this.snapshot), this.log_store.clone(), operation)? .await?; - this.snapshot - .merge(commit.data.actions, &commit.data.operation, commit.version)?; - Ok(DeltaTable::new_with_state(this.log_store, this.snapshot)) + Ok(DeltaTable::new_with_state( + this.log_store, + commit.snapshot(), + )) }) } } diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 8d13d51b4e..c9a4792d11 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -186,16 +186,16 @@ async fn excute_non_empty_expr( async fn execute( predicate: Option, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: DeltaTableState, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, -) -> DeltaResult<((Vec, i64, Option), DeleteMetrics)> { +) -> DeltaResult<(DeltaTableState, DeleteMetrics)> { let exec_start = Instant::now(); let mut metrics = DeleteMetrics::default(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis(); let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -205,7 +205,7 @@ async fn execute( } else { let write_start = Instant::now(); let add = excute_non_empty_expr( - snapshot, + &snapshot, log_store.clone(), &state, &predicate, @@ -258,21 +258,14 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; if actions.is_empty() { - return Ok(((actions, snapshot.version(), None), metrics)); + return Ok((snapshot.clone(), metrics)); } let commit = CommitBuilder::from(commit_properties) .with_actions(actions) - .build(Some(snapshot), log_store, operation)? + .build(Some(&snapshot), log_store, operation)? .await?; - Ok(( - ( - commit.data.actions, - commit.version, - Some(commit.data.operation), - ), - metrics, - )) + Ok((commit.snapshot(), metrics)) } impl std::future::IntoFuture for DeleteBuilder { @@ -305,22 +298,20 @@ impl std::future::IntoFuture for DeleteBuilder { None => None, }; - let ((actions, version, operation), metrics) = execute( + let (new_snapshot, metrics) = execute( predicate, this.log_store.clone(), - &this.snapshot, + this.snapshot, state, this.writer_properties, this.commit_properties, ) .await?; - if let Some(op) = &operation { - this.snapshot.merge(actions, op, version)?; - } - - let table = DeltaTable::new_with_state(this.log_store, this.snapshot); - Ok((table, metrics)) + Ok(( + DeltaTable::new_with_state(this.log_store, new_snapshot), + metrics, + )) }) } } diff --git a/crates/core/src/operations/drop_constraints.rs b/crates/core/src/operations/drop_constraints.rs index 081262753e..f1d320d2ff 100644 --- a/crates/core/src/operations/drop_constraints.rs +++ b/crates/core/src/operations/drop_constraints.rs @@ -91,9 +91,10 @@ impl std::future::IntoFuture for DropConstraintBuilder { .build(Some(&this.snapshot), this.log_store.clone(), operation)? .await?; - this.snapshot - .merge(commit.data.actions, &commit.data.operation, commit.version)?; - Ok(DeltaTable::new_with_state(this.log_store, this.snapshot)) + Ok(DeltaTable::new_with_state( + this.log_store, + commit.snapshot(), + )) }) } } diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 44c3b499c1..9d4d5aeb8d 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -932,7 +932,7 @@ async fn execute( predicate: Expression, source: DataFrame, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: DeltaTableState, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, @@ -942,7 +942,7 @@ async fn execute( match_operations: Vec, not_match_target_operations: Vec, not_match_source_operations: Vec, -) -> DeltaResult<((Vec, i64, Option), MergeMetrics)> { +) -> DeltaResult<(DeltaTableState, MergeMetrics)> { let mut metrics = MergeMetrics::default(); let exec_start = Instant::now(); @@ -987,7 +987,7 @@ async fn execute( let scan_config = DeltaScanConfigBuilder::default() .with_file_column(true) .with_parquet_pushdown(false) - .build(snapshot)?; + .build(&snapshot)?; let target_provider = Arc::new(DeltaTableProvider::try_new( snapshot.clone(), @@ -1017,7 +1017,7 @@ async fn execute( } else { try_construct_early_filter( predicate.clone(), - snapshot, + &snapshot, &state, &source, &source_name, @@ -1370,7 +1370,7 @@ async fn execute( let rewrite_start = Instant::now(); let add_actions = write_execution_plan( - Some(snapshot), + Some(&snapshot), state.clone(), write, table_partition_cols.clone(), @@ -1449,21 +1449,14 @@ async fn execute( }; if actions.is_empty() { - return Ok(((actions, snapshot.version(), None), metrics)); + return Ok((snapshot, metrics)); } let commit = CommitBuilder::from(commit_properties) .with_actions(actions) - .build(Some(snapshot), log_store.clone(), operation)? + .build(Some(&snapshot), log_store.clone(), operation)? .await?; - Ok(( - ( - commit.data.actions, - commit.version, - Some(commit.data.operation), - ), - metrics, - )) + Ok((commit.snapshot(), metrics)) } fn remove_table_alias(expr: Expr, table_alias: &str) -> Expr { @@ -1521,11 +1514,11 @@ impl std::future::IntoFuture for MergeBuilder { session.state() }); - let ((actions, version, operation), metrics) = execute( + let (snapshot, metrics) = execute( this.predicate, this.source, this.log_store.clone(), - &this.snapshot, + this.snapshot, state, this.writer_properties, this.commit_properties, @@ -1538,12 +1531,10 @@ impl std::future::IntoFuture for MergeBuilder { ) .await?; - if let Some(op) = &operation { - this.snapshot.merge(actions, op, version)?; - } - let table = DeltaTable::new_with_state(this.log_store, this.snapshot); - - Ok((table, metrics)) + Ok(( + DeltaTable::new_with_state(this.log_store, snapshot), + metrics, + )) }) } } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 6d5f7f731a..6606a8c339 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -207,7 +207,7 @@ pub trait TableReference: Send + Sync { fn metadata(&self) -> &Metadata; /// Try to cast this table reference to a `EagerSnapshot` - fn eager_snapshot(&self) -> Option<&EagerSnapshot>; + fn eager_snapshot(&self) -> &EagerSnapshot; } impl TableReference for EagerSnapshot { @@ -223,8 +223,8 @@ impl TableReference for EagerSnapshot { self.table_config() } - fn eager_snapshot(&self) -> Option<&EagerSnapshot> { - Some(self) + fn eager_snapshot(&self) -> &EagerSnapshot { + self } } @@ -241,8 +241,8 @@ impl TableReference for DeltaTableState { self.snapshot.metadata() } - fn eager_snapshot(&self) -> Option<&EagerSnapshot> { - Some(&self.snapshot) + fn eager_snapshot(&self) -> &EagerSnapshot { + &self.snapshot } } @@ -512,13 +512,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { // unwrap() is safe here due to the above check // TODO: refactor to only depend on TableReference Trait - let read_snapshot = - this.table_data - .unwrap() - .eager_snapshot() - .ok_or(DeltaTableError::Generic( - "Expected an instance of EagerSnapshot".to_owned(), - ))?; + let read_snapshot = this.table_data.unwrap().eager_snapshot(); let mut attempt_number = 1; while attempt_number <= this.max_retries { @@ -595,36 +589,49 @@ pub struct PostCommit<'a> { impl<'a> PostCommit<'a> { /// Runs the post commit activities - async fn run_post_commit_hook( - &self, - version: i64, - commit_data: &CommitData, - ) -> DeltaResult<()> { - if self.create_checkpoint { - self.create_checkpoint(&self.table_data, &self.log_store, version, commit_data) - .await? + async fn run_post_commit_hook(&self) -> DeltaResult { + if let Some(table) = self.table_data { + let mut snapshot = table.eager_snapshot().clone(); + if self.version - snapshot.version() > 1 { + // This may only occur during concurrent write actions. We need to update the state first to - 1 + // then we can advance. + snapshot + .update(self.log_store.clone(), Some(self.version - 1)) + .await?; + snapshot.advance(vec![&self.data])?; + } else { + snapshot.advance(vec![&self.data])?; + } + let state = DeltaTableState { + app_transaction_version: HashMap::new(), + snapshot, + }; + // Execute each hook + if self.create_checkpoint { + self.create_checkpoint(&state, &self.log_store, self.version) + .await?; + } + Ok(state) + } else { + let state = DeltaTableState::try_new( + &Path::default(), + self.log_store.object_store(), + Default::default(), + Some(self.version), + ) + .await?; + Ok(state) } - Ok(()) } async fn create_checkpoint( &self, - table: &Option<&'a dyn TableReference>, + table_state: &DeltaTableState, log_store: &LogStoreRef, version: i64, - commit_data: &CommitData, ) -> DeltaResult<()> { - if let Some(table) = table { - let checkpoint_interval = table.config().checkpoint_interval() as i64; - if ((version + 1) % checkpoint_interval) == 0 { - // We have to advance the snapshot otherwise we can't create a checkpoint - let mut snapshot = table.eager_snapshot().unwrap().clone(); - snapshot.advance(vec![commit_data])?; - let state = DeltaTableState { - app_transaction_version: HashMap::new(), - snapshot, - }; - create_checkpoint_for(version, &state, log_store.as_ref()).await? - } + let checkpoint_interval = table_state.config().checkpoint_interval() as i64; + if ((version + 1) % checkpoint_interval) == 0 { + create_checkpoint_for(version, table_state, log_store.as_ref()).await? } Ok(()) } @@ -632,22 +639,22 @@ impl<'a> PostCommit<'a> { /// A commit that successfully completed pub struct FinalizedCommit { - /// The winning version number of the commit + /// The new table state after a commmit + pub snapshot: DeltaTableState, + + /// Version of the finalized commit pub version: i64, - /// The data that was comitted to the log store - pub data: CommitData, } impl FinalizedCommit { - /// The materialized version of the commit + /// The new table state after a commmit + pub fn snapshot(&self) -> DeltaTableState { + self.snapshot.clone() + } + /// Version of the finalized commit pub fn version(&self) -> i64 { self.version } - - /// Data used to write the commit - pub fn data(&self) -> &CommitData { - &self.data - } } impl<'a> std::future::IntoFuture for PostCommit<'a> { @@ -658,15 +665,13 @@ impl<'a> std::future::IntoFuture for PostCommit<'a> { let this = self; Box::pin(async move { - match this.run_post_commit_hook(this.version, &this.data).await { - Ok(_) => { - return Ok(FinalizedCommit { - version: this.version, - data: this.data, - }) - } - Err(err) => return Err(err), - }; + match this.run_post_commit_hook().await { + Ok(snapshot) => Ok(FinalizedCommit { + snapshot, + version: this.version, + }), + Err(err) => Err(err), + } }) } } diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 9f4f6d51a3..9847eda104 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -41,7 +41,7 @@ use futures::future::BoxFuture; use parquet::file::properties::WriterProperties; use serde::Serialize; -use super::transaction::PROTOCOL; +use super::transaction::{FinalizedCommit, PROTOCOL}; use super::write::write_execution_plan; use super::{ datafusion_utils::Expression, @@ -168,12 +168,12 @@ async fn execute( predicate: Option, updates: HashMap, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: DeltaTableState, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, safe_cast: bool, -) -> DeltaResult<((Vec, i64, Option), UpdateMetrics)> { +) -> DeltaResult<(DeltaTableState, UpdateMetrics)> { // Validate the predicate and update expressions. // // If the predicate is not set, then all files need to be updated. @@ -189,7 +189,7 @@ async fn execute( let version = snapshot.version(); if updates.is_empty() { - return Ok(((Vec::new(), version, None), metrics)); + return Ok((snapshot, metrics)); } let predicate = match predicate { @@ -214,11 +214,11 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns.clone(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; if candidates.candidates.is_empty() { - return Ok(((Vec::new(), version, None), metrics)); + return Ok((snapshot, metrics)); } let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -226,7 +226,7 @@ async fn execute( let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches - let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state) + let scan = DeltaScanBuilder::new(&snapshot, log_store.clone(), &state) .with_files(&candidates.candidates) .build() .await?; @@ -350,7 +350,7 @@ async fn execute( )?); let add_actions = write_execution_plan( - Some(snapshot), + Some(&snapshot), state.clone(), projection.clone(), table_partition_cols.clone(), @@ -416,17 +416,10 @@ async fn execute( let commit = CommitBuilder::from(commit_properties) .with_actions(actions) - .build(Some(snapshot), log_store, operation)? + .build(Some(&snapshot), log_store, operation)? .await?; - Ok(( - ( - commit.data.actions, - commit.version, - Some(commit.data.operation), - ), - metrics, - )) + Ok((commit.snapshot(), metrics)) } impl std::future::IntoFuture for UpdateBuilder { @@ -449,11 +442,11 @@ impl std::future::IntoFuture for UpdateBuilder { session.state() }); - let ((actions, version, operation), metrics) = execute( + let (snapshot, metrics) = execute( this.predicate, this.updates, this.log_store.clone(), - &this.snapshot, + this.snapshot, state, this.writer_properties, this.commit_properties, @@ -461,12 +454,10 @@ impl std::future::IntoFuture for UpdateBuilder { ) .await?; - if let Some(op) = &operation { - this.snapshot.merge(actions, op, version)?; - } - - let table = DeltaTable::new_with_state(this.log_store, this.snapshot); - Ok((table, metrics)) + Ok(( + DeltaTable::new_with_state(this.log_store, snapshot), + metrics, + )) }) } } diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 475abefbe2..8ecfb3078b 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -808,17 +808,7 @@ impl std::future::IntoFuture for WriteBuilder { )? .await?; - // TODO we do not have the table config available, but since we are merging only our newly - // created actions, it may be safe to assume, that we want to include all actions. - // then again, having only some tombstones may be misleading. - if let Some(mut snapshot) = this.snapshot { - snapshot.merge(commit.data.actions, &commit.data.operation, commit.version)?; - Ok(DeltaTable::new_with_state(this.log_store, snapshot)) - } else { - let mut table = DeltaTable::new(this.log_store, Default::default()); - table.update().await?; - Ok(table) - } + Ok(DeltaTable::new_with_state(this.log_store, commit.snapshot)) }) } } diff --git a/crates/core/tests/command_merge.rs b/crates/core/tests/command_merge.rs index c6e7f09f2f..988891f332 100644 --- a/crates/core/tests/command_merge.rs +++ b/crates/core/tests/command_merge.rs @@ -177,17 +177,7 @@ async fn test_merge_concurrent_different_partition() { // TODO: Currently it throws a Version mismatch error, but the merge commit was successfully // This bug needs to be fixed, see pull request #2280 - assert!(!matches!( - result.as_ref().unwrap_err(), - DeltaTableError::Transaction { .. } - )); - assert!(matches!( - result.as_ref().unwrap_err(), - DeltaTableError::Generic(_) - )); - if let DeltaTableError::Generic(msg) = result.unwrap_err() { - assert_eq!(msg, "Version mismatch"); - } + assert!(matches!(result.as_ref().is_ok(), true)); } #[tokio::test]