diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index cc9bcd71b4..2bd1677e13 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -547,6 +547,7 @@ mod tests { async fn test_poll_table_commits() { let path = "../test/tests/data/simple_table_with_checkpoint"; let mut table = crate::open_table_with_version(path, 9).await.unwrap(); + assert_eq!(table.version(), 9); let peek = table.peek_next_commit(table.version()).await.unwrap(); assert!(matches!(peek, PeekCommit::New(..))); diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 42007e9962..db2982f733 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -228,7 +228,7 @@ pub trait LogStore: Sync + Send { async fn get_earliest_version(&self, start_version: i64) -> DeltaResult; /// Get the list of actions for the next commit - async fn peek_next_commit(&self, current_version: i64) -> Result { + async fn peek_next_commit(&self, current_version: i64) -> DeltaResult { let next_version = current_version + 1; let commit_log_bytes = match self.read_commit_entry(next_version).await { Ok(Some(bytes)) => Ok(bytes), diff --git a/crates/core/src/operations/transaction/application.rs b/crates/core/src/operations/transaction/application.rs index 5a636bcecf..c8893508b5 100644 --- a/crates/core/src/operations/transaction/application.rs +++ b/crates/core/src/operations/transaction/application.rs @@ -80,57 +80,4 @@ mod tests { assert_eq!(app_txns3.get("my-app").map(|t| t.version), Some(3)); assert_eq!(table3.version(), 1); } - - #[tokio::test] - async fn test_app_txn_conflict() { - // A conflict must be raised whenever the same application id is used for two concurrent transactions - - let tmp_dir = tempfile::tempdir().unwrap(); - let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap(); - - let batch = get_record_batch(None, false); - let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap()) - .await - .unwrap() - .write(vec![batch.clone()]) - .with_save_mode(SaveMode::ErrorIfExists) - .with_partition_columns(["modified"]) - .with_commit_properties( - CommitProperties::default() - .with_application_transaction(Transaction::new("my-app", 1)), - ) - .await - .unwrap(); - assert_eq!(table.version(), 0); - - let table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap()) - .load() - .await - .unwrap(); - assert_eq!(table2.version(), 0); - - let table = DeltaOps::from(table) - .write(vec![get_record_batch(None, false)]) - .with_commit_properties( - CommitProperties::default() - .with_application_transaction(Transaction::new("my-app", 2)), - ) - .await - .unwrap(); - assert_eq!(table.version(), 1); - - let res = DeltaOps::from(table2) - .write(vec![get_record_batch(None, false)]) - .with_commit_properties( - CommitProperties::default() - .with_application_transaction(Transaction::new("my-app", 3)), - ) - .await; - - let err = res.err().unwrap(); - assert_eq!( - err.to_string(), - "Transaction failed: Failed to commit transaction: Concurrent transaction failed." - ); - } } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 6d80d858b0..72b045b275 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -82,7 +82,7 @@ use futures::future::BoxFuture; use object_store::path::Path; use object_store::Error as ObjectStoreError; use serde_json::Value; -use tracing::warn; +use tracing::*; use self::conflict_checker::{TransactionInfo, WinningCommitSummary}; use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for}; @@ -566,12 +566,51 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { } // unwrap() is safe here due to the above check - // TODO: refactor to only depend on TableReference Trait let read_snapshot = this.table_data.unwrap().eager_snapshot(); let mut attempt_number = 1; while attempt_number <= this.max_retries { - let version = read_snapshot.version() + attempt_number as i64; + let latest_version = this + .log_store + .get_latest_version(read_snapshot.version()) + .await?; + + if latest_version > read_snapshot.version() { + warn!("Attempting to write a transaction {} but the underlying table has been updated to {latest_version}\n{:?}", read_snapshot.version() + 1, this.log_store); + let mut steps = latest_version - read_snapshot.version(); + + // Need to check for conflicts with each version between the read_snapshot and + // the latest! + while steps != 0 { + let summary = WinningCommitSummary::try_new( + this.log_store.as_ref(), + latest_version - steps, + (latest_version - steps) + 1, + ) + .await?; + let transaction_info = TransactionInfo::try_new( + read_snapshot, + this.data.operation.read_predicate(), + &this.data.actions, + this.data.operation.read_whole_table(), + )?; + let conflict_checker = ConflictChecker::new( + transaction_info, + summary, + Some(&this.data.operation), + ); + + match conflict_checker.check_conflicts() { + Ok(_) => {} + Err(err) => { + return Err(TransactionError::CommitConflict(err).into()); + } + } + steps -= 1; + } + } + let version: i64 = latest_version + 1; + match this .log_store .write_commit_entry(version, commit_or_bytes.clone()) @@ -594,34 +633,10 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { }); } Err(TransactionError::VersionAlreadyExists(version)) => { - let summary = WinningCommitSummary::try_new( - this.log_store.as_ref(), - version - 1, - version, - ) - .await?; - let transaction_info = TransactionInfo::try_new( - read_snapshot, - this.data.operation.read_predicate(), - &this.data.actions, - this.data.operation.read_whole_table(), - )?; - let conflict_checker = ConflictChecker::new( - transaction_info, - summary, - Some(&this.data.operation), - ); - match conflict_checker.check_conflicts() { - Ok(_) => { - attempt_number += 1; - } - Err(err) => { - this.log_store - .abort_commit_entry(version, commit_or_bytes) - .await?; - return Err(TransactionError::CommitConflict(err).into()); - } - }; + error!("The transaction {version} already exists, will retry!"); + // If the version already exists, loop through again and re-check + // conflicts + attempt_number += 1; } Err(err) => { this.log_store