From 5b46d032ea1ca70b03e507f450ddbefaaf681b29 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Thu, 19 Dec 2024 14:06:42 +0000 Subject: [PATCH 1/3] chore: create a new minor version with the MSRV upgrade a number of our transitive dependencies have forced the upgrade in patch versions of the Minimum Supported Rust Version (MSRV) and broke builds from crates.io and `main` on Rust 1.80. :unamused: This change upgrades us to 1.81 but bumps to 0.23 in the process. Signed-off-by: R. Tyler Croy --- .github/workflows/build.yml | 8 ++++---- .github/workflows/codecov.yml | 2 +- Cargo.toml | 2 +- crates/aws/Cargo.toml | 4 ++-- crates/azure/Cargo.toml | 4 ++-- crates/catalog-glue/Cargo.toml | 4 ++-- crates/catalog-unity/Cargo.toml | 4 ++-- crates/core/Cargo.toml | 4 ++-- crates/deltalake/Cargo.toml | 16 ++++++++-------- crates/gcp/Cargo.toml | 4 ++-- crates/hdfs/Cargo.toml | 4 ++-- crates/mount/Cargo.toml | 4 ++-- crates/test/Cargo.toml | 4 ++-- python/Cargo.toml | 2 +- 14 files changed, 33 insertions(+), 33 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a807184c47..e206ab82e1 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -20,7 +20,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.80' + toolchain: '1.81' override: true - name: Format @@ -42,7 +42,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.80' + toolchain: '1.81' override: true - name: build and lint with clippy @@ -79,7 +79,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.80' + toolchain: '1.81' override: true - name: Run tests @@ -114,7 +114,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.80' + toolchain: '1.81' override: true # Install Java and Hadoop for HDFS integration tests diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml index a8d9beabcd..f0e68536fc 100644 --- a/.github/workflows/codecov.yml +++ b/.github/workflows/codecov.yml @@ -20,7 +20,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.80' + toolchain: '1.81' override: true - name: Install cargo-llvm-cov uses: taiki-e/install-action@cargo-llvm-cov diff --git a/Cargo.toml b/Cargo.toml index e1832c2349..ff564e8656 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ resolver = "2" [workspace.package] authors = ["Qingping Hou "] -rust-version = "1.80" +rust-version = "1.81" keywords = ["deltalake", "delta", "datalake"] readme = "README.md" edition = "2021" diff --git a/crates/aws/Cargo.toml b/crates/aws/Cargo.toml index 62fe6b0f42..4d273313ce 100644 --- a/crates/aws/Cargo.toml +++ b/crates/aws/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-aws" -version = "0.5.0" +version = "0.6.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.22.0", path = "../core" } +deltalake-core = { version = "0.23.0", path = "../core" } aws-smithy-runtime-api = { version="1.7" } aws-smithy-runtime = { version="1.7", optional = true} aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]} diff --git a/crates/azure/Cargo.toml b/crates/azure/Cargo.toml index d80b2760ad..20fd7c7e66 100644 --- a/crates/azure/Cargo.toml +++ b/crates/azure/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-azure" -version = "0.5.0" +version = "0.6.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.22.0", path = "../core", features = [ +deltalake-core = { version = "0.23.0", path = "../core", features = [ "datafusion", ] } lazy_static = "1" diff --git a/crates/catalog-glue/Cargo.toml b/crates/catalog-glue/Cargo.toml index 17bb82404e..07495e1b03 100644 --- a/crates/catalog-glue/Cargo.toml +++ b/crates/catalog-glue/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-catalog-glue" -version = "0.6.0" +version = "0.7.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -15,7 +15,7 @@ rust-version.workspace = true async-trait = { workspace = true } aws-config = "1" aws-sdk-glue = "1" -deltalake-core = { version = "0.22.0", path = "../core" } +deltalake-core = { version = "0.23.0", path = "../core" } thiserror = { workspace = true } [dev-dependencies] diff --git a/crates/catalog-unity/Cargo.toml b/crates/catalog-unity/Cargo.toml index 8a0827386b..4ed44d9050 100644 --- a/crates/catalog-unity/Cargo.toml +++ b/crates/catalog-unity/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-catalog-unity" -version = "0.6.0" +version = "0.7.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -17,7 +17,7 @@ tokio.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true -deltalake-core = { version = "0.22", path = "../core" } +deltalake-core = { version = "0.23", path = "../core" } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "http2"] } reqwest-retry = "0.7" reqwest-middleware = "0.4.0" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index a32a89ea8c..0797b77b9a 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-core" -version = "0.22.4" +version = "0.23.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -128,4 +128,4 @@ datafusion = [ ] datafusion-ext = ["datafusion"] json = ["parquet/json"] -python = ["arrow/pyarrow"] \ No newline at end of file +python = ["arrow/pyarrow"] diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index 79121711de..1f0a8f7b21 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake" -version = "0.22.4" +version = "0.23.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -16,13 +16,13 @@ rust-version.workspace = true features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"] [dependencies] -deltalake-core = { version = "0.22.2", path = "../core" } -deltalake-aws = { version = "0.5.0", path = "../aws", default-features = false, optional = true } -deltalake-azure = { version = "0.5.0", path = "../azure", optional = true } -deltalake-gcp = { version = "0.6.0", path = "../gcp", optional = true } -deltalake-hdfs = { version = "0.6.0", path = "../hdfs", optional = true } -deltalake-catalog-glue = { version = "0.6.0", path = "../catalog-glue", optional = true } -deltalake-catalog-unity = { version = "0.6.0", path = "../catalog-unity", optional = true } +deltalake-core = { version = "0.23.0", path = "../core" } +deltalake-aws = { version = "0.6.0", path = "../aws", default-features = false, optional = true } +deltalake-azure = { version = "0.6.0", path = "../azure", optional = true } +deltalake-gcp = { version = "0.7.0", path = "../gcp", optional = true } +deltalake-hdfs = { version = "0.7.0", path = "../hdfs", optional = true } +deltalake-catalog-glue = { version = "0.7.0", path = "../catalog-glue", optional = true } +deltalake-catalog-unity = { version = "0.7.0", path = "../catalog-unity", optional = true } [features] # All of these features are just reflected into the core crate until that diff --git a/crates/gcp/Cargo.toml b/crates/gcp/Cargo.toml index e292138e9e..7507cefb41 100644 --- a/crates/gcp/Cargo.toml +++ b/crates/gcp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-gcp" -version = "0.6.0" +version = "0.7.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.22.0", path = "../core" } +deltalake-core = { version = "0.23.0", path = "../core" } lazy_static = "1" # workspace depenndecies diff --git a/crates/hdfs/Cargo.toml b/crates/hdfs/Cargo.toml index 60e5b1ada2..3936c98278 100644 --- a/crates/hdfs/Cargo.toml +++ b/crates/hdfs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-hdfs" -version = "0.6.0" +version = "0.7.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.22.0", path = "../core" } +deltalake-core = { version = "0.23.0", path = "../core" } hdfs-native-object-store = "0.12" # workspace dependecies diff --git a/crates/mount/Cargo.toml b/crates/mount/Cargo.toml index 6a0e36c3cf..63e807aa9d 100644 --- a/crates/mount/Cargo.toml +++ b/crates/mount/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-mount" -version = "0.6.0" +version = "0.7.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.22.0", path = "../core", features = [ +deltalake-core = { version = "0.23.0", path = "../core", features = [ "datafusion", ] } lazy_static = "1" diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml index 6b01e87539..569a9e2d31 100644 --- a/crates/test/Cargo.toml +++ b/crates/test/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "deltalake-test" -version = "0.5.0" +version = "0.6.0" edition = "2021" publish = false [dependencies] bytes = { workspace = true } chrono = { workspace = true, default-features = false, features = ["clock"] } -deltalake-core = { version = "0.22.0", path = "../core" } +deltalake-core = { version = "0.23.0", path = "../core" } dotenvy = "0" fs_extra = "1.3.0" futures = { version = "0.3" } diff --git a/python/Cargo.toml b/python/Cargo.toml index c5f300cc0c..76173ceb7e 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.22.4" +version = "0.23.0" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0" From 9b95d45e4412f89c9a2c5e6bfc579ed6f7f2ba12 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 17 Dec 2024 18:22:50 +0000 Subject: [PATCH 2/3] chore: relocate peek_next_commit onto the logstore directly for easier use This will help retry logic associated with the logstore to directly find the appropriate next c ommit Related to #3306 Signed-off-by: R. Tyler Croy --- crates/core/src/logstore/mod.rs | 23 +++++++++++++++++++++++ crates/core/src/table/mod.rs | 33 ++++++++++----------------------- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 69174d97ee..42007e9962 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -162,6 +162,16 @@ pub enum CommitOrBytes { LogBytes(Bytes), } +/// The next commit that's available from underlying storage +/// +#[derive(Debug)] +pub enum PeekCommit { + /// The next commit version and associated actions + New(i64, Vec), + /// Provided DeltaVersion is up to date + UpToDate, +} + /// Configuration parameters for a log store #[derive(Debug, Clone)] pub struct LogStoreConfig { @@ -217,6 +227,19 @@ pub trait LogStore: Sync + Send { /// Find earliest version currently stored in the delta log. async fn get_earliest_version(&self, start_version: i64) -> DeltaResult; + /// Get the list of actions for the next commit + async fn peek_next_commit(&self, current_version: i64) -> Result { + let next_version = current_version + 1; + let commit_log_bytes = match self.read_commit_entry(next_version).await { + Ok(Some(bytes)) => Ok(bytes), + Ok(None) => return Ok(PeekCommit::UpToDate), + Err(err) => Err(err), + }?; + + let actions = crate::logstore::get_actions(next_version, commit_log_bytes).await; + Ok(PeekCommit::New(next_version, actions.unwrap())) + } + /// Get underlying object store. fn object_store(&self) -> Arc; diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 20d04836ce..5317c34434 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -15,14 +15,16 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use self::builder::DeltaTableConfig; use self::state::DeltaTableState; use crate::kernel::{ - Action, CommitInfo, DataCheck, DataType, LogicalFile, Metadata, Protocol, StructType, - Transaction, + CommitInfo, DataCheck, DataType, LogicalFile, Metadata, Protocol, StructType, Transaction, }; -use crate::logstore::{self, extract_version_from_filename, LogStoreConfig, LogStoreRef}; +use crate::logstore::{extract_version_from_filename, LogStoreConfig, LogStoreRef}; use crate::partitions::PartitionFilter; use crate::storage::{commit_uri_from_version, ObjectStoreRef}; use crate::{DeltaResult, DeltaTableError}; +// NOTE: this use can go away when peek_next_commit is removed off of [DeltaTable] +pub use crate::logstore::PeekCommit; + pub mod builder; pub mod config; pub mod state; @@ -178,17 +180,6 @@ pub(crate) fn get_partition_col_data_types<'a>( .collect() } -/// The next commit that's available from underlying storage -/// TODO: Maybe remove this and replace it with Some/None and create a `Commit` struct to contain the next commit -/// -#[derive(Debug)] -pub enum PeekCommit { - /// The next commit version and associated actions - New(i64, Vec), - /// Provided DeltaVersion is up to date - UpToDate, -} - /// In memory representation of a Delta Table #[derive(Clone)] pub struct DeltaTable { @@ -332,20 +323,16 @@ impl DeltaTable { self.update_incremental(None).await } + #[deprecated( + since = "0.22.4", + note = "peek_next_commit has moved to the logstore, use table.log_store().peek_next_commit() instead please :)" + )] /// Get the list of actions for the next commit pub async fn peek_next_commit( &self, current_version: i64, ) -> Result { - let next_version = current_version + 1; - let commit_log_bytes = match self.log_store.read_commit_entry(next_version).await { - Ok(Some(bytes)) => Ok(bytes), - Ok(None) => return Ok(PeekCommit::UpToDate), - Err(err) => Err(err), - }?; - - let actions = logstore::get_actions(next_version, commit_log_bytes).await; - Ok(PeekCommit::New(next_version, actions.unwrap())) + self.log_store().peek_next_commit(current_version).await } /// Updates the DeltaTable to the latest version by incrementally applying newer versions. From 8e308e2a063970cca7d3c2ab63f2e33fc53639a6 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Wed, 18 Dec 2024 18:23:26 +0000 Subject: [PATCH 3/3] fix: allow more than 15 concurrent transactions to have been committed There are high-concurrency scenarios where multiple writers may have committed versions to the log since the current process' state was loaded. Because of the `snapshot.version() + 1` and retry logic in the transaction commit operation, this would result in failures if more than 15 versions had been committed since the table state had been loaded. This approach fetches the latest known version on the table prior to committing the transaction, and if the latest version has shifted, it will perform a conflict check with the versions that have since been committed. Fixes #3066 Signed-off-by: R. Tyler Croy Sponsored-by: Scribd Inc --- crates/core/src/lib.rs | 1 + crates/core/src/logstore/mod.rs | 2 +- .../src/operations/transaction/application.rs | 53 ------------- crates/core/src/operations/transaction/mod.rs | 77 +++++++++++-------- 4 files changed, 48 insertions(+), 85 deletions(-) diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index cc9bcd71b4..2bd1677e13 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -547,6 +547,7 @@ mod tests { async fn test_poll_table_commits() { let path = "../test/tests/data/simple_table_with_checkpoint"; let mut table = crate::open_table_with_version(path, 9).await.unwrap(); + assert_eq!(table.version(), 9); let peek = table.peek_next_commit(table.version()).await.unwrap(); assert!(matches!(peek, PeekCommit::New(..))); diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 42007e9962..db2982f733 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -228,7 +228,7 @@ pub trait LogStore: Sync + Send { async fn get_earliest_version(&self, start_version: i64) -> DeltaResult; /// Get the list of actions for the next commit - async fn peek_next_commit(&self, current_version: i64) -> Result { + async fn peek_next_commit(&self, current_version: i64) -> DeltaResult { let next_version = current_version + 1; let commit_log_bytes = match self.read_commit_entry(next_version).await { Ok(Some(bytes)) => Ok(bytes), diff --git a/crates/core/src/operations/transaction/application.rs b/crates/core/src/operations/transaction/application.rs index 5a636bcecf..c8893508b5 100644 --- a/crates/core/src/operations/transaction/application.rs +++ b/crates/core/src/operations/transaction/application.rs @@ -80,57 +80,4 @@ mod tests { assert_eq!(app_txns3.get("my-app").map(|t| t.version), Some(3)); assert_eq!(table3.version(), 1); } - - #[tokio::test] - async fn test_app_txn_conflict() { - // A conflict must be raised whenever the same application id is used for two concurrent transactions - - let tmp_dir = tempfile::tempdir().unwrap(); - let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap(); - - let batch = get_record_batch(None, false); - let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap()) - .await - .unwrap() - .write(vec![batch.clone()]) - .with_save_mode(SaveMode::ErrorIfExists) - .with_partition_columns(["modified"]) - .with_commit_properties( - CommitProperties::default() - .with_application_transaction(Transaction::new("my-app", 1)), - ) - .await - .unwrap(); - assert_eq!(table.version(), 0); - - let table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap()) - .load() - .await - .unwrap(); - assert_eq!(table2.version(), 0); - - let table = DeltaOps::from(table) - .write(vec![get_record_batch(None, false)]) - .with_commit_properties( - CommitProperties::default() - .with_application_transaction(Transaction::new("my-app", 2)), - ) - .await - .unwrap(); - assert_eq!(table.version(), 1); - - let res = DeltaOps::from(table2) - .write(vec![get_record_batch(None, false)]) - .with_commit_properties( - CommitProperties::default() - .with_application_transaction(Transaction::new("my-app", 3)), - ) - .await; - - let err = res.err().unwrap(); - assert_eq!( - err.to_string(), - "Transaction failed: Failed to commit transaction: Concurrent transaction failed." - ); - } } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 6d80d858b0..72b045b275 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -82,7 +82,7 @@ use futures::future::BoxFuture; use object_store::path::Path; use object_store::Error as ObjectStoreError; use serde_json::Value; -use tracing::warn; +use tracing::*; use self::conflict_checker::{TransactionInfo, WinningCommitSummary}; use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for}; @@ -566,12 +566,51 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { } // unwrap() is safe here due to the above check - // TODO: refactor to only depend on TableReference Trait let read_snapshot = this.table_data.unwrap().eager_snapshot(); let mut attempt_number = 1; while attempt_number <= this.max_retries { - let version = read_snapshot.version() + attempt_number as i64; + let latest_version = this + .log_store + .get_latest_version(read_snapshot.version()) + .await?; + + if latest_version > read_snapshot.version() { + warn!("Attempting to write a transaction {} but the underlying table has been updated to {latest_version}\n{:?}", read_snapshot.version() + 1, this.log_store); + let mut steps = latest_version - read_snapshot.version(); + + // Need to check for conflicts with each version between the read_snapshot and + // the latest! + while steps != 0 { + let summary = WinningCommitSummary::try_new( + this.log_store.as_ref(), + latest_version - steps, + (latest_version - steps) + 1, + ) + .await?; + let transaction_info = TransactionInfo::try_new( + read_snapshot, + this.data.operation.read_predicate(), + &this.data.actions, + this.data.operation.read_whole_table(), + )?; + let conflict_checker = ConflictChecker::new( + transaction_info, + summary, + Some(&this.data.operation), + ); + + match conflict_checker.check_conflicts() { + Ok(_) => {} + Err(err) => { + return Err(TransactionError::CommitConflict(err).into()); + } + } + steps -= 1; + } + } + let version: i64 = latest_version + 1; + match this .log_store .write_commit_entry(version, commit_or_bytes.clone()) @@ -594,34 +633,10 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { }); } Err(TransactionError::VersionAlreadyExists(version)) => { - let summary = WinningCommitSummary::try_new( - this.log_store.as_ref(), - version - 1, - version, - ) - .await?; - let transaction_info = TransactionInfo::try_new( - read_snapshot, - this.data.operation.read_predicate(), - &this.data.actions, - this.data.operation.read_whole_table(), - )?; - let conflict_checker = ConflictChecker::new( - transaction_info, - summary, - Some(&this.data.operation), - ); - match conflict_checker.check_conflicts() { - Ok(_) => { - attempt_number += 1; - } - Err(err) => { - this.log_store - .abort_commit_entry(version, commit_or_bytes) - .await?; - return Err(TransactionError::CommitConflict(err).into()); - } - }; + error!("The transaction {version} already exists, will retry!"); + // If the version already exists, loop through again and re-check + // conflicts + attempt_number += 1; } Err(err) => { this.log_store