diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a807184c47..e206ab82e1 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -20,7 +20,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.80' + toolchain: '1.81' override: true - name: Format @@ -42,7 +42,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.80' + toolchain: '1.81' override: true - name: build and lint with clippy @@ -79,7 +79,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.80' + toolchain: '1.81' override: true - name: Run tests @@ -114,7 +114,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.80' + toolchain: '1.81' override: true # Install Java and Hadoop for HDFS integration tests diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml index a8d9beabcd..f0e68536fc 100644 --- a/.github/workflows/codecov.yml +++ b/.github/workflows/codecov.yml @@ -20,7 +20,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.80' + toolchain: '1.81' override: true - name: Install cargo-llvm-cov uses: taiki-e/install-action@cargo-llvm-cov diff --git a/Cargo.toml b/Cargo.toml index e1832c2349..ff564e8656 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ resolver = "2" [workspace.package] authors = ["Qingping Hou "] -rust-version = "1.80" +rust-version = "1.81" keywords = ["deltalake", "delta", "datalake"] readme = "README.md" edition = "2021" diff --git a/crates/aws/Cargo.toml b/crates/aws/Cargo.toml index 62fe6b0f42..4d273313ce 100644 --- a/crates/aws/Cargo.toml +++ b/crates/aws/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-aws" -version = "0.5.0" +version = "0.6.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.22.0", path = "../core" } +deltalake-core = { version = "0.23.0", path = "../core" } aws-smithy-runtime-api = { version="1.7" } aws-smithy-runtime = { version="1.7", optional = true} aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]} diff --git a/crates/azure/Cargo.toml b/crates/azure/Cargo.toml index d80b2760ad..20fd7c7e66 100644 --- a/crates/azure/Cargo.toml +++ b/crates/azure/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-azure" -version = "0.5.0" +version = "0.6.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.22.0", path = "../core", features = [ +deltalake-core = { version = "0.23.0", path = "../core", features = [ "datafusion", ] } lazy_static = "1" diff --git a/crates/catalog-glue/Cargo.toml b/crates/catalog-glue/Cargo.toml index 17bb82404e..07495e1b03 100644 --- a/crates/catalog-glue/Cargo.toml +++ b/crates/catalog-glue/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-catalog-glue" -version = "0.6.0" +version = "0.7.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -15,7 +15,7 @@ rust-version.workspace = true async-trait = { workspace = true } aws-config = "1" aws-sdk-glue = "1" -deltalake-core = { version = "0.22.0", path = "../core" } +deltalake-core = { version = "0.23.0", path = "../core" } thiserror = { workspace = true } [dev-dependencies] diff --git a/crates/catalog-unity/Cargo.toml b/crates/catalog-unity/Cargo.toml index 8a0827386b..4ed44d9050 100644 --- a/crates/catalog-unity/Cargo.toml +++ b/crates/catalog-unity/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-catalog-unity" -version = "0.6.0" +version = "0.7.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -17,7 +17,7 @@ tokio.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true -deltalake-core = { version = "0.22", path = "../core" } +deltalake-core = { version = "0.23", path = "../core" } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "http2"] } reqwest-retry = "0.7" reqwest-middleware = "0.4.0" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index a32a89ea8c..0797b77b9a 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-core" -version = "0.22.4" +version = "0.23.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -128,4 +128,4 @@ datafusion = [ ] datafusion-ext = ["datafusion"] json = ["parquet/json"] -python = ["arrow/pyarrow"] \ No newline at end of file +python = ["arrow/pyarrow"] diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index cc9bcd71b4..2bd1677e13 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -547,6 +547,7 @@ mod tests { async fn test_poll_table_commits() { let path = "../test/tests/data/simple_table_with_checkpoint"; let mut table = crate::open_table_with_version(path, 9).await.unwrap(); + assert_eq!(table.version(), 9); let peek = table.peek_next_commit(table.version()).await.unwrap(); assert!(matches!(peek, PeekCommit::New(..))); diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 69174d97ee..db2982f733 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -162,6 +162,16 @@ pub enum CommitOrBytes { LogBytes(Bytes), } +/// The next commit that's available from underlying storage +/// +#[derive(Debug)] +pub enum PeekCommit { + /// The next commit version and associated actions + New(i64, Vec), + /// Provided DeltaVersion is up to date + UpToDate, +} + /// Configuration parameters for a log store #[derive(Debug, Clone)] pub struct LogStoreConfig { @@ -217,6 +227,19 @@ pub trait LogStore: Sync + Send { /// Find earliest version currently stored in the delta log. async fn get_earliest_version(&self, start_version: i64) -> DeltaResult; + /// Get the list of actions for the next commit + async fn peek_next_commit(&self, current_version: i64) -> DeltaResult { + let next_version = current_version + 1; + let commit_log_bytes = match self.read_commit_entry(next_version).await { + Ok(Some(bytes)) => Ok(bytes), + Ok(None) => return Ok(PeekCommit::UpToDate), + Err(err) => Err(err), + }?; + + let actions = crate::logstore::get_actions(next_version, commit_log_bytes).await; + Ok(PeekCommit::New(next_version, actions.unwrap())) + } + /// Get underlying object store. fn object_store(&self) -> Arc; diff --git a/crates/core/src/operations/transaction/application.rs b/crates/core/src/operations/transaction/application.rs index 5a636bcecf..c8893508b5 100644 --- a/crates/core/src/operations/transaction/application.rs +++ b/crates/core/src/operations/transaction/application.rs @@ -80,57 +80,4 @@ mod tests { assert_eq!(app_txns3.get("my-app").map(|t| t.version), Some(3)); assert_eq!(table3.version(), 1); } - - #[tokio::test] - async fn test_app_txn_conflict() { - // A conflict must be raised whenever the same application id is used for two concurrent transactions - - let tmp_dir = tempfile::tempdir().unwrap(); - let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap(); - - let batch = get_record_batch(None, false); - let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap()) - .await - .unwrap() - .write(vec![batch.clone()]) - .with_save_mode(SaveMode::ErrorIfExists) - .with_partition_columns(["modified"]) - .with_commit_properties( - CommitProperties::default() - .with_application_transaction(Transaction::new("my-app", 1)), - ) - .await - .unwrap(); - assert_eq!(table.version(), 0); - - let table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap()) - .load() - .await - .unwrap(); - assert_eq!(table2.version(), 0); - - let table = DeltaOps::from(table) - .write(vec![get_record_batch(None, false)]) - .with_commit_properties( - CommitProperties::default() - .with_application_transaction(Transaction::new("my-app", 2)), - ) - .await - .unwrap(); - assert_eq!(table.version(), 1); - - let res = DeltaOps::from(table2) - .write(vec![get_record_batch(None, false)]) - .with_commit_properties( - CommitProperties::default() - .with_application_transaction(Transaction::new("my-app", 3)), - ) - .await; - - let err = res.err().unwrap(); - assert_eq!( - err.to_string(), - "Transaction failed: Failed to commit transaction: Concurrent transaction failed." - ); - } } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 6d80d858b0..72b045b275 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -82,7 +82,7 @@ use futures::future::BoxFuture; use object_store::path::Path; use object_store::Error as ObjectStoreError; use serde_json::Value; -use tracing::warn; +use tracing::*; use self::conflict_checker::{TransactionInfo, WinningCommitSummary}; use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for}; @@ -566,12 +566,51 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { } // unwrap() is safe here due to the above check - // TODO: refactor to only depend on TableReference Trait let read_snapshot = this.table_data.unwrap().eager_snapshot(); let mut attempt_number = 1; while attempt_number <= this.max_retries { - let version = read_snapshot.version() + attempt_number as i64; + let latest_version = this + .log_store + .get_latest_version(read_snapshot.version()) + .await?; + + if latest_version > read_snapshot.version() { + warn!("Attempting to write a transaction {} but the underlying table has been updated to {latest_version}\n{:?}", read_snapshot.version() + 1, this.log_store); + let mut steps = latest_version - read_snapshot.version(); + + // Need to check for conflicts with each version between the read_snapshot and + // the latest! + while steps != 0 { + let summary = WinningCommitSummary::try_new( + this.log_store.as_ref(), + latest_version - steps, + (latest_version - steps) + 1, + ) + .await?; + let transaction_info = TransactionInfo::try_new( + read_snapshot, + this.data.operation.read_predicate(), + &this.data.actions, + this.data.operation.read_whole_table(), + )?; + let conflict_checker = ConflictChecker::new( + transaction_info, + summary, + Some(&this.data.operation), + ); + + match conflict_checker.check_conflicts() { + Ok(_) => {} + Err(err) => { + return Err(TransactionError::CommitConflict(err).into()); + } + } + steps -= 1; + } + } + let version: i64 = latest_version + 1; + match this .log_store .write_commit_entry(version, commit_or_bytes.clone()) @@ -594,34 +633,10 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { }); } Err(TransactionError::VersionAlreadyExists(version)) => { - let summary = WinningCommitSummary::try_new( - this.log_store.as_ref(), - version - 1, - version, - ) - .await?; - let transaction_info = TransactionInfo::try_new( - read_snapshot, - this.data.operation.read_predicate(), - &this.data.actions, - this.data.operation.read_whole_table(), - )?; - let conflict_checker = ConflictChecker::new( - transaction_info, - summary, - Some(&this.data.operation), - ); - match conflict_checker.check_conflicts() { - Ok(_) => { - attempt_number += 1; - } - Err(err) => { - this.log_store - .abort_commit_entry(version, commit_or_bytes) - .await?; - return Err(TransactionError::CommitConflict(err).into()); - } - }; + error!("The transaction {version} already exists, will retry!"); + // If the version already exists, loop through again and re-check + // conflicts + attempt_number += 1; } Err(err) => { this.log_store diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 20d04836ce..5317c34434 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -15,14 +15,16 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use self::builder::DeltaTableConfig; use self::state::DeltaTableState; use crate::kernel::{ - Action, CommitInfo, DataCheck, DataType, LogicalFile, Metadata, Protocol, StructType, - Transaction, + CommitInfo, DataCheck, DataType, LogicalFile, Metadata, Protocol, StructType, Transaction, }; -use crate::logstore::{self, extract_version_from_filename, LogStoreConfig, LogStoreRef}; +use crate::logstore::{extract_version_from_filename, LogStoreConfig, LogStoreRef}; use crate::partitions::PartitionFilter; use crate::storage::{commit_uri_from_version, ObjectStoreRef}; use crate::{DeltaResult, DeltaTableError}; +// NOTE: this use can go away when peek_next_commit is removed off of [DeltaTable] +pub use crate::logstore::PeekCommit; + pub mod builder; pub mod config; pub mod state; @@ -178,17 +180,6 @@ pub(crate) fn get_partition_col_data_types<'a>( .collect() } -/// The next commit that's available from underlying storage -/// TODO: Maybe remove this and replace it with Some/None and create a `Commit` struct to contain the next commit -/// -#[derive(Debug)] -pub enum PeekCommit { - /// The next commit version and associated actions - New(i64, Vec), - /// Provided DeltaVersion is up to date - UpToDate, -} - /// In memory representation of a Delta Table #[derive(Clone)] pub struct DeltaTable { @@ -332,20 +323,16 @@ impl DeltaTable { self.update_incremental(None).await } + #[deprecated( + since = "0.22.4", + note = "peek_next_commit has moved to the logstore, use table.log_store().peek_next_commit() instead please :)" + )] /// Get the list of actions for the next commit pub async fn peek_next_commit( &self, current_version: i64, ) -> Result { - let next_version = current_version + 1; - let commit_log_bytes = match self.log_store.read_commit_entry(next_version).await { - Ok(Some(bytes)) => Ok(bytes), - Ok(None) => return Ok(PeekCommit::UpToDate), - Err(err) => Err(err), - }?; - - let actions = logstore::get_actions(next_version, commit_log_bytes).await; - Ok(PeekCommit::New(next_version, actions.unwrap())) + self.log_store().peek_next_commit(current_version).await } /// Updates the DeltaTable to the latest version by incrementally applying newer versions. diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index 79121711de..1f0a8f7b21 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake" -version = "0.22.4" +version = "0.23.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -16,13 +16,13 @@ rust-version.workspace = true features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"] [dependencies] -deltalake-core = { version = "0.22.2", path = "../core" } -deltalake-aws = { version = "0.5.0", path = "../aws", default-features = false, optional = true } -deltalake-azure = { version = "0.5.0", path = "../azure", optional = true } -deltalake-gcp = { version = "0.6.0", path = "../gcp", optional = true } -deltalake-hdfs = { version = "0.6.0", path = "../hdfs", optional = true } -deltalake-catalog-glue = { version = "0.6.0", path = "../catalog-glue", optional = true } -deltalake-catalog-unity = { version = "0.6.0", path = "../catalog-unity", optional = true } +deltalake-core = { version = "0.23.0", path = "../core" } +deltalake-aws = { version = "0.6.0", path = "../aws", default-features = false, optional = true } +deltalake-azure = { version = "0.6.0", path = "../azure", optional = true } +deltalake-gcp = { version = "0.7.0", path = "../gcp", optional = true } +deltalake-hdfs = { version = "0.7.0", path = "../hdfs", optional = true } +deltalake-catalog-glue = { version = "0.7.0", path = "../catalog-glue", optional = true } +deltalake-catalog-unity = { version = "0.7.0", path = "../catalog-unity", optional = true } [features] # All of these features are just reflected into the core crate until that diff --git a/crates/gcp/Cargo.toml b/crates/gcp/Cargo.toml index e292138e9e..7507cefb41 100644 --- a/crates/gcp/Cargo.toml +++ b/crates/gcp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-gcp" -version = "0.6.0" +version = "0.7.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.22.0", path = "../core" } +deltalake-core = { version = "0.23.0", path = "../core" } lazy_static = "1" # workspace depenndecies diff --git a/crates/hdfs/Cargo.toml b/crates/hdfs/Cargo.toml index 60e5b1ada2..3936c98278 100644 --- a/crates/hdfs/Cargo.toml +++ b/crates/hdfs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-hdfs" -version = "0.6.0" +version = "0.7.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.22.0", path = "../core" } +deltalake-core = { version = "0.23.0", path = "../core" } hdfs-native-object-store = "0.12" # workspace dependecies diff --git a/crates/mount/Cargo.toml b/crates/mount/Cargo.toml index 6a0e36c3cf..63e807aa9d 100644 --- a/crates/mount/Cargo.toml +++ b/crates/mount/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-mount" -version = "0.6.0" +version = "0.7.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.22.0", path = "../core", features = [ +deltalake-core = { version = "0.23.0", path = "../core", features = [ "datafusion", ] } lazy_static = "1" diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml index 6b01e87539..569a9e2d31 100644 --- a/crates/test/Cargo.toml +++ b/crates/test/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "deltalake-test" -version = "0.5.0" +version = "0.6.0" edition = "2021" publish = false [dependencies] bytes = { workspace = true } chrono = { workspace = true, default-features = false, features = ["clock"] } -deltalake-core = { version = "0.22.0", path = "../core" } +deltalake-core = { version = "0.23.0", path = "../core" } dotenvy = "0" fs_extra = "1.3.0" futures = { version = "0.3" } diff --git a/python/Cargo.toml b/python/Cargo.toml index c5f300cc0c..76173ceb7e 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.22.4" +version = "0.23.0" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0"