From 9b95d45e4412f89c9a2c5e6bfc579ed6f7f2ba12 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 17 Dec 2024 18:22:50 +0000 Subject: [PATCH] chore: relocate peek_next_commit onto the logstore directly for easier use This will help retry logic associated with the logstore to directly find the appropriate next commit Related to #3306 Signed-off-by: R. Tyler Croy --- crates/core/src/logstore/mod.rs | 23 +++++++++++++++++++++++ crates/core/src/table/mod.rs | 33 ++++++++++----------------------- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 69174d97ee..42007e9962 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -162,6 +162,16 @@ pub enum CommitOrBytes { LogBytes(Bytes), } +/// The next commit that's available from underlying storage +/// +#[derive(Debug)] +pub enum PeekCommit { + /// The next commit version and associated actions + New(i64, Vec<Action>), + /// Provided DeltaVersion is up to date + UpToDate, +} + /// Configuration parameters for a log store #[derive(Debug, Clone)] pub struct LogStoreConfig { @@ -217,6 +227,19 @@ pub trait LogStore: Sync + Send { /// Find earliest version currently stored in the delta log. async fn get_earliest_version(&self, start_version: i64) -> DeltaResult<i64>; + /// Get the list of actions for the next commit + async fn peek_next_commit(&self, current_version: i64) -> Result<PeekCommit, DeltaTableError> { + let next_version = current_version + 1; + let commit_log_bytes = match self.read_commit_entry(next_version).await { + Ok(Some(bytes)) => Ok(bytes), + Ok(None) => return Ok(PeekCommit::UpToDate), + Err(err) => Err(err), + }?; + + let actions = crate::logstore::get_actions(next_version, commit_log_bytes).await; + Ok(PeekCommit::New(next_version, actions.unwrap())) + } + /// Get underlying object store. 
fn object_store(&self) -> Arc<dyn ObjectStore>; diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 20d04836ce..5317c34434 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -15,14 +15,16 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use self::builder::DeltaTableConfig; use self::state::DeltaTableState; use crate::kernel::{ - Action, CommitInfo, DataCheck, DataType, LogicalFile, Metadata, Protocol, StructType, - Transaction, + CommitInfo, DataCheck, DataType, LogicalFile, Metadata, Protocol, StructType, Transaction, }; -use crate::logstore::{self, extract_version_from_filename, LogStoreConfig, LogStoreRef}; +use crate::logstore::{extract_version_from_filename, LogStoreConfig, LogStoreRef}; use crate::partitions::PartitionFilter; use crate::storage::{commit_uri_from_version, ObjectStoreRef}; use crate::{DeltaResult, DeltaTableError}; +// NOTE: this use can go away when peek_next_commit is removed off of [DeltaTable] +pub use crate::logstore::PeekCommit; + pub mod builder; pub mod config; pub mod state; @@ -178,17 +180,6 @@ pub(crate) fn get_partition_col_data_types<'a>( .collect() } -/// The next commit that's available from underlying storage -/// TODO: Maybe remove this and replace it with Some/None and create a `Commit` struct to contain the next commit -/// -#[derive(Debug)] -pub enum PeekCommit { - /// The next commit version and associated actions - New(i64, Vec<Action>), - /// Provided DeltaVersion is up to date - UpToDate, -} - /// In memory representation of a Delta Table #[derive(Clone)] pub struct DeltaTable { @@ -332,20 +323,16 @@ impl DeltaTable { self.update_incremental(None).await } + #[deprecated( + since = "0.22.4", + note = "peek_next_commit has moved to the logstore, use table.log_store().peek_next_commit() instead please :)" + )] /// Get the list of actions for the next commit pub async fn peek_next_commit( &self, current_version: i64, ) -> Result<PeekCommit, DeltaTableError> { - let next_version = current_version + 1; - let 
commit_log_bytes = match self.log_store.read_commit_entry(next_version).await { - Ok(Some(bytes)) => Ok(bytes), - Ok(None) => return Ok(PeekCommit::UpToDate), - Err(err) => Err(err), - }?; - - let actions = logstore::get_actions(next_version, commit_log_bytes).await; - Ok(PeekCommit::New(next_version, actions.unwrap())) + self.log_store().peek_next_commit(current_version).await } /// Updates the DeltaTable to the latest version by incrementally applying newer versions.