From b241bf670c4f073221c9f483b62b711030ad4177 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Wed, 18 Dec 2024 18:23:26 +0000 Subject: [PATCH] fix: allow more than 15 concurrent transactions to have been committed There are high-concurrency scenarios where multiple writers may have committed versions to the log since the current process' state was loaded. Because of the `snapshot.version() + 1` and retry logic in the transaction commit operation, this would result in failures if more than 15 versions had been committed since the table state had been loaded. This approach fetches the latest known version on the table prior to committing the transaction, and if the latest version has shifted, it will perform a conflict check with the versions that have since been committed. Fixes #3066 Signed-off-by: R. Tyler Croy Sponsored-by: Scribd Inc --- crates/core/src/lib.rs | 1 + crates/core/src/logstore/mod.rs | 2 +- .../src/operations/transaction/application.rs | 53 ------------- crates/core/src/operations/transaction/mod.rs | 77 +++++++++++-------- 4 files changed, 48 insertions(+), 85 deletions(-) diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index cc9bcd71b4..2bd1677e13 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -547,6 +547,7 @@ mod tests { async fn test_poll_table_commits() { let path = "../test/tests/data/simple_table_with_checkpoint"; let mut table = crate::open_table_with_version(path, 9).await.unwrap(); + assert_eq!(table.version(), 9); let peek = table.peek_next_commit(table.version()).await.unwrap(); assert!(matches!(peek, PeekCommit::New(..))); diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 42007e9962..db2982f733 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -228,7 +228,7 @@ pub trait LogStore: Sync + Send { async fn get_earliest_version(&self, start_version: i64) -> DeltaResult; /// Get the list of actions for the next commit - async fn peek_next_commit(&self, current_version: i64) -> Result { + async fn peek_next_commit(&self, current_version: i64) -> DeltaResult { let next_version = current_version + 1; let commit_log_bytes = match self.read_commit_entry(next_version).await { Ok(Some(bytes)) => Ok(bytes), diff --git a/crates/core/src/operations/transaction/application.rs b/crates/core/src/operations/transaction/application.rs index 5a636bcecf..c8893508b5 100644 --- a/crates/core/src/operations/transaction/application.rs +++ b/crates/core/src/operations/transaction/application.rs @@ -80,57 +80,4 @@ mod tests { assert_eq!(app_txns3.get("my-app").map(|t| t.version), Some(3)); assert_eq!(table3.version(), 1); } - - #[tokio::test] - async fn test_app_txn_conflict() { - // A conflict must be raised whenever the same application id is used for two concurrent transactions - - let tmp_dir = tempfile::tempdir().unwrap(); - let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap(); - - let batch = get_record_batch(None, false); - let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap()) - .await - .unwrap() - .write(vec![batch.clone()]) - .with_save_mode(SaveMode::ErrorIfExists) - .with_partition_columns(["modified"]) - .with_commit_properties( - CommitProperties::default() - .with_application_transaction(Transaction::new("my-app", 1)), - ) - .await - .unwrap(); - assert_eq!(table.version(), 0); - - let table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap()) - .load() - .await - .unwrap(); - assert_eq!(table2.version(), 0); - - let table = DeltaOps::from(table) - .write(vec![get_record_batch(None, false)]) - .with_commit_properties( - CommitProperties::default() - .with_application_transaction(Transaction::new("my-app", 2)), - ) - .await - .unwrap(); - assert_eq!(table.version(), 1); - - let res = DeltaOps::from(table2) - .write(vec![get_record_batch(None, false)]) - .with_commit_properties( - CommitProperties::default() - .with_application_transaction(Transaction::new("my-app", 3)), - ) - .await; - - let err = res.err().unwrap(); - assert_eq!( - err.to_string(), - "Transaction failed: Failed to commit transaction: Concurrent transaction failed." - ); - } } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 6d80d858b0..72b045b275 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -82,7 +82,7 @@ use futures::future::BoxFuture; use object_store::path::Path; use object_store::Error as ObjectStoreError; use serde_json::Value; -use tracing::warn; +use tracing::*; use self::conflict_checker::{TransactionInfo, WinningCommitSummary}; use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for}; @@ -566,12 +566,51 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { } // unwrap() is safe here due to the above check - // TODO: refactor to only depend on TableReference Trait let read_snapshot = this.table_data.unwrap().eager_snapshot(); let mut attempt_number = 1; while attempt_number <= this.max_retries { - let version = read_snapshot.version() + attempt_number as i64; + let latest_version = this + .log_store + .get_latest_version(read_snapshot.version()) + .await?; + + if latest_version > read_snapshot.version() { + warn!("Attempting to write a transaction {} but the underlying table has been updated to {latest_version}\n{:?}", read_snapshot.version() + 1, this.log_store); + let mut steps = latest_version - read_snapshot.version(); + + // Need to check for conflicts with each version between the read_snapshot and + // the latest! + while steps != 0 { + let summary = WinningCommitSummary::try_new( + this.log_store.as_ref(), + latest_version - steps, + (latest_version - steps) + 1, + ) + .await?; + let transaction_info = TransactionInfo::try_new( + read_snapshot, + this.data.operation.read_predicate(), + &this.data.actions, + this.data.operation.read_whole_table(), + )?; + let conflict_checker = ConflictChecker::new( + transaction_info, + summary, + Some(&this.data.operation), + ); + + match conflict_checker.check_conflicts() { + Ok(_) => {} + Err(err) => { + return Err(TransactionError::CommitConflict(err).into()); + } + } + steps -= 1; + } + } + let version: i64 = latest_version + 1; + match this .log_store .write_commit_entry(version, commit_or_bytes.clone()) @@ -594,34 +633,10 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { }); } Err(TransactionError::VersionAlreadyExists(version)) => { - let summary = WinningCommitSummary::try_new( - this.log_store.as_ref(), - version - 1, - version, - ) - .await?; - let transaction_info = TransactionInfo::try_new( - read_snapshot, - this.data.operation.read_predicate(), - &this.data.actions, - this.data.operation.read_whole_table(), - )?; - let conflict_checker = ConflictChecker::new( - transaction_info, - summary, - Some(&this.data.operation), - ); - match conflict_checker.check_conflicts() { - Ok(_) => { - attempt_number += 1; - } - Err(err) => { - this.log_store - .abort_commit_entry(version, commit_or_bytes) - .await?; - return Err(TransactionError::CommitConflict(err).into()); - } - }; + error!("The transaction {version} already exists, will retry!"); + // If the version already exists, loop through again and re-check + // conflicts + attempt_number += 1; } Err(err) => { this.log_store