Skip to content

Commit

Permalink
fix: allow more than 15 concurrent transactions to have been committed
Browse files Browse the repository at this point in the history
There are high-concurrency scenarios where multiple writers may have
committed versions to the log since the current process' state was
loaded. Because of the `snapshot.version() + 1` and retry logic in the
transaction commit operation, this would result in failures if more than
15 versions had been committed since the table state had been loaded.

This approach fetches the latest known version on the table prior to
committing the transaction, and if the latest version has shifted, it
will perform a conflict check with the versions that have since been
committed.

Fixes #3066

Signed-off-by: R. Tyler Croy <[email protected]>
Sponsored-by: Scribd Inc
  • Loading branch information
rtyler committed Dec 20, 2024
1 parent b86509c commit b241bf6
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 85 deletions.
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ mod tests {
async fn test_poll_table_commits() {
let path = "../test/tests/data/simple_table_with_checkpoint";
let mut table = crate::open_table_with_version(path, 9).await.unwrap();
assert_eq!(table.version(), 9);
let peek = table.peek_next_commit(table.version()).await.unwrap();
assert!(matches!(peek, PeekCommit::New(..)));

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ pub trait LogStore: Sync + Send {
async fn get_earliest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Get the list of actions for the next commit
async fn peek_next_commit(&self, current_version: i64) -> Result<PeekCommit, DeltaTableError> {
async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
let next_version = current_version + 1;
let commit_log_bytes = match self.read_commit_entry(next_version).await {
Ok(Some(bytes)) => Ok(bytes),
Expand Down
53 changes: 0 additions & 53 deletions crates/core/src/operations/transaction/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,57 +80,4 @@ mod tests {
assert_eq!(app_txns3.get("my-app").map(|t| t.version), Some(3));
assert_eq!(table3.version(), 1);
}

#[tokio::test]
async fn test_app_txn_conflict() {
// A conflict must be raised whenever the same application id is used for two concurrent transactions

let tmp_dir = tempfile::tempdir().unwrap();
let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();

let batch = get_record_batch(None, false);
let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap())
.await
.unwrap()
.write(vec![batch.clone()])
.with_save_mode(SaveMode::ErrorIfExists)
.with_partition_columns(["modified"])
.with_commit_properties(
CommitProperties::default()
.with_application_transaction(Transaction::new("my-app", 1)),
)
.await
.unwrap();
assert_eq!(table.version(), 0);

let table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap())
.load()
.await
.unwrap();
assert_eq!(table2.version(), 0);

let table = DeltaOps::from(table)
.write(vec![get_record_batch(None, false)])
.with_commit_properties(
CommitProperties::default()
.with_application_transaction(Transaction::new("my-app", 2)),
)
.await
.unwrap();
assert_eq!(table.version(), 1);

let res = DeltaOps::from(table2)
.write(vec![get_record_batch(None, false)])
.with_commit_properties(
CommitProperties::default()
.with_application_transaction(Transaction::new("my-app", 3)),
)
.await;

let err = res.err().unwrap();
assert_eq!(
err.to_string(),
"Transaction failed: Failed to commit transaction: Concurrent transaction failed."
);
}
}
77 changes: 46 additions & 31 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use futures::future::BoxFuture;
use object_store::path::Path;
use object_store::Error as ObjectStoreError;
use serde_json::Value;
use tracing::warn;
use tracing::*;

use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for};
Expand Down Expand Up @@ -566,12 +566,51 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
}

// unwrap() is safe here due to the above check
// TODO: refactor to only depend on TableReference Trait
let read_snapshot = this.table_data.unwrap().eager_snapshot();

let mut attempt_number = 1;
while attempt_number <= this.max_retries {
let version = read_snapshot.version() + attempt_number as i64;
let latest_version = this
.log_store
.get_latest_version(read_snapshot.version())
.await?;

if latest_version > read_snapshot.version() {
warn!("Attempting to write a transaction {} but the underlying table has been updated to {latest_version}\n{:?}", read_snapshot.version() + 1, this.log_store);
let mut steps = latest_version - read_snapshot.version();

// Need to check for conflicts with each version between the read_snapshot and
// the latest!
while steps != 0 {
let summary = WinningCommitSummary::try_new(
this.log_store.as_ref(),
latest_version - steps,
(latest_version - steps) + 1,
)
.await?;
let transaction_info = TransactionInfo::try_new(
read_snapshot,
this.data.operation.read_predicate(),
&this.data.actions,
this.data.operation.read_whole_table(),
)?;
let conflict_checker = ConflictChecker::new(
transaction_info,
summary,
Some(&this.data.operation),
);

match conflict_checker.check_conflicts() {
Ok(_) => {}
Err(err) => {
return Err(TransactionError::CommitConflict(err).into());
}
}
steps -= 1;
}
}
let version: i64 = latest_version + 1;

match this
.log_store
.write_commit_entry(version, commit_or_bytes.clone())
Expand All @@ -594,34 +633,10 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
});
}
Err(TransactionError::VersionAlreadyExists(version)) => {
let summary = WinningCommitSummary::try_new(
this.log_store.as_ref(),
version - 1,
version,
)
.await?;
let transaction_info = TransactionInfo::try_new(
read_snapshot,
this.data.operation.read_predicate(),
&this.data.actions,
this.data.operation.read_whole_table(),
)?;
let conflict_checker = ConflictChecker::new(
transaction_info,
summary,
Some(&this.data.operation),
);
match conflict_checker.check_conflicts() {
Ok(_) => {
attempt_number += 1;
}
Err(err) => {
this.log_store
.abort_commit_entry(version, commit_or_bytes)
.await?;
return Err(TransactionError::CommitConflict(err).into());
}
};
error!("The transaction {version} already exists, will retry!");
// If the version already exists, loop through again and re-check
// conflicts
attempt_number += 1;
}
Err(err) => {
this.log_store
Expand Down

0 comments on commit b241bf6

Please sign in to comment.