Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allow more than 15 concurrent transactions to have been committed #3067

Merged
merged 3 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: '1.80'
toolchain: '1.81'
override: true

- name: Format
Expand All @@ -42,7 +42,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: '1.80'
toolchain: '1.81'
override: true

- name: build and lint with clippy
Expand Down Expand Up @@ -79,7 +79,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: '1.80'
toolchain: '1.81'
override: true

- name: Run tests
Expand Down Expand Up @@ -114,7 +114,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: '1.80'
toolchain: '1.81'
override: true

# Install Java and Hadoop for HDFS integration tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: '1.80'
toolchain: '1.81'
override: true
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ resolver = "2"

[workspace.package]
authors = ["Qingping Hou <[email protected]>"]
rust-version = "1.80"
rust-version = "1.81"
keywords = ["deltalake", "delta", "datalake"]
readme = "README.md"
edition = "2021"
Expand Down
4 changes: 2 additions & 2 deletions crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-aws"
version = "0.5.0"
version = "0.6.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.22.0", path = "../core" }
deltalake-core = { version = "0.23.0", path = "../core" }
aws-smithy-runtime-api = { version="1.7" }
aws-smithy-runtime = { version="1.7", optional = true}
aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]}
Expand Down
4 changes: 2 additions & 2 deletions crates/azure/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-azure"
version = "0.5.0"
version = "0.6.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.22.0", path = "../core", features = [
deltalake-core = { version = "0.23.0", path = "../core", features = [
"datafusion",
] }
lazy_static = "1"
Expand Down
4 changes: 2 additions & 2 deletions crates/catalog-glue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-catalog-glue"
version = "0.6.0"
version = "0.7.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -15,7 +15,7 @@ rust-version.workspace = true
async-trait = { workspace = true }
aws-config = "1"
aws-sdk-glue = "1"
deltalake-core = { version = "0.22.0", path = "../core" }
deltalake-core = { version = "0.23.0", path = "../core" }
thiserror = { workspace = true }

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions crates/catalog-unity/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-catalog-unity"
version = "0.6.0"
version = "0.7.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -17,7 +17,7 @@ tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
deltalake-core = { version = "0.22", path = "../core" }
deltalake-core = { version = "0.23", path = "../core" }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "http2"] }
reqwest-retry = "0.7"
reqwest-middleware = "0.4.0"
Expand Down
4 changes: 2 additions & 2 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-core"
version = "0.22.4"
version = "0.23.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand Down Expand Up @@ -128,4 +128,4 @@ datafusion = [
]
datafusion-ext = ["datafusion"]
json = ["parquet/json"]
python = ["arrow/pyarrow"]
python = ["arrow/pyarrow"]
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ mod tests {
async fn test_poll_table_commits() {
let path = "../test/tests/data/simple_table_with_checkpoint";
let mut table = crate::open_table_with_version(path, 9).await.unwrap();
assert_eq!(table.version(), 9);
let peek = table.peek_next_commit(table.version()).await.unwrap();
assert!(matches!(peek, PeekCommit::New(..)));

Expand Down
23 changes: 23 additions & 0 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ pub enum CommitOrBytes {
LogBytes(Bytes),
}

/// The next commit that's available from underlying storage
///
#[derive(Debug)]
pub enum PeekCommit {
/// The next commit version and associated actions
New(i64, Vec<Action>),
/// Provided DeltaVersion is up to date
UpToDate,
}

/// Configuration parameters for a log store
#[derive(Debug, Clone)]
pub struct LogStoreConfig {
Expand Down Expand Up @@ -217,6 +227,19 @@ pub trait LogStore: Sync + Send {
/// Find earliest version currently stored in the delta log.
async fn get_earliest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Get the list of actions for the next commit
async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
let next_version = current_version + 1;
let commit_log_bytes = match self.read_commit_entry(next_version).await {
Ok(Some(bytes)) => Ok(bytes),
Ok(None) => return Ok(PeekCommit::UpToDate),
Err(err) => Err(err),
}?;

let actions = crate::logstore::get_actions(next_version, commit_log_bytes).await;
Ok(PeekCommit::New(next_version, actions.unwrap()))
}

/// Get underlying object store.
fn object_store(&self) -> Arc<dyn ObjectStore>;

Expand Down
53 changes: 0 additions & 53 deletions crates/core/src/operations/transaction/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,57 +80,4 @@ mod tests {
assert_eq!(app_txns3.get("my-app").map(|t| t.version), Some(3));
assert_eq!(table3.version(), 1);
}

#[tokio::test]
async fn test_app_txn_conflict() {
// A conflict must be raised whenever the same application id is used for two concurrent transactions

let tmp_dir = tempfile::tempdir().unwrap();
let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();

let batch = get_record_batch(None, false);
let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap())
.await
.unwrap()
.write(vec![batch.clone()])
.with_save_mode(SaveMode::ErrorIfExists)
.with_partition_columns(["modified"])
.with_commit_properties(
CommitProperties::default()
.with_application_transaction(Transaction::new("my-app", 1)),
)
.await
.unwrap();
assert_eq!(table.version(), 0);

let table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap())
.load()
.await
.unwrap();
assert_eq!(table2.version(), 0);

let table = DeltaOps::from(table)
.write(vec![get_record_batch(None, false)])
.with_commit_properties(
CommitProperties::default()
.with_application_transaction(Transaction::new("my-app", 2)),
)
.await
.unwrap();
assert_eq!(table.version(), 1);

let res = DeltaOps::from(table2)
.write(vec![get_record_batch(None, false)])
.with_commit_properties(
CommitProperties::default()
.with_application_transaction(Transaction::new("my-app", 3)),
)
.await;

let err = res.err().unwrap();
assert_eq!(
err.to_string(),
"Transaction failed: Failed to commit transaction: Concurrent transaction failed."
);
}
}
77 changes: 46 additions & 31 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use futures::future::BoxFuture;
use object_store::path::Path;
use object_store::Error as ObjectStoreError;
use serde_json::Value;
use tracing::warn;
use tracing::*;

use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for};
Expand Down Expand Up @@ -566,12 +566,51 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
}

// unwrap() is safe here due to the above check
// TODO: refactor to only depend on TableReference Trait
let read_snapshot = this.table_data.unwrap().eager_snapshot();

let mut attempt_number = 1;
while attempt_number <= this.max_retries {
let version = read_snapshot.version() + attempt_number as i64;
let latest_version = this
.log_store
.get_latest_version(read_snapshot.version())
.await?;

if latest_version > read_snapshot.version() {
warn!("Attempting to write a transaction {} but the underlying table has been updated to {latest_version}\n{:?}", read_snapshot.version() + 1, this.log_store);
let mut steps = latest_version - read_snapshot.version();

// Need to check for conflicts with each version between the read_snapshot and
// the latest!
while steps != 0 {
let summary = WinningCommitSummary::try_new(
this.log_store.as_ref(),
latest_version - steps,
(latest_version - steps) + 1,
)
.await?;
let transaction_info = TransactionInfo::try_new(
read_snapshot,
this.data.operation.read_predicate(),
&this.data.actions,
this.data.operation.read_whole_table(),
)?;
let conflict_checker = ConflictChecker::new(
transaction_info,
summary,
Some(&this.data.operation),
);

match conflict_checker.check_conflicts() {
Ok(_) => {}
Err(err) => {
return Err(TransactionError::CommitConflict(err).into());
}
}
steps -= 1;
}
}
let version: i64 = latest_version + 1;

match this
.log_store
.write_commit_entry(version, commit_or_bytes.clone())
Expand All @@ -594,34 +633,10 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
});
}
Err(TransactionError::VersionAlreadyExists(version)) => {
let summary = WinningCommitSummary::try_new(
this.log_store.as_ref(),
version - 1,
version,
)
.await?;
let transaction_info = TransactionInfo::try_new(
read_snapshot,
this.data.operation.read_predicate(),
&this.data.actions,
this.data.operation.read_whole_table(),
)?;
let conflict_checker = ConflictChecker::new(
transaction_info,
summary,
Some(&this.data.operation),
);
match conflict_checker.check_conflicts() {
Ok(_) => {
attempt_number += 1;
}
Err(err) => {
this.log_store
.abort_commit_entry(version, commit_or_bytes)
.await?;
return Err(TransactionError::CommitConflict(err).into());
}
};
error!("The transaction {version} already exists, will retry!");
// If the version already exists, loop through again and re-check
// conflicts
attempt_number += 1;
}
Err(err) => {
this.log_store
Expand Down
Loading
Loading