Skip to content

Commit

Permalink
chore: relocate peek_next_commit onto the logstore directly for easie…
Browse files Browse the repository at this point in the history
…r use

This will help retry logic associated with the logstore to directly find
the appropriate next c ommit

Related to #3306

Signed-off-by: R. Tyler Croy <[email protected]>
  • Loading branch information
rtyler committed Dec 19, 2024
1 parent 5b46d03 commit 9b95d45
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 23 deletions.
23 changes: 23 additions & 0 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ pub enum CommitOrBytes {
LogBytes(Bytes),
}

/// The next commit that's available from underlying storage
///
#[derive(Debug)]
pub enum PeekCommit {
/// The next commit version and associated actions
New(i64, Vec<Action>),
/// Provided DeltaVersion is up to date
UpToDate,
}

/// Configuration parameters for a log store
#[derive(Debug, Clone)]
pub struct LogStoreConfig {
Expand Down Expand Up @@ -217,6 +227,19 @@ pub trait LogStore: Sync + Send {
/// Find earliest version currently stored in the delta log.
async fn get_earliest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Get the list of actions for the next commit
async fn peek_next_commit(&self, current_version: i64) -> Result<PeekCommit, DeltaTableError> {
let next_version = current_version + 1;
let commit_log_bytes = match self.read_commit_entry(next_version).await {
Ok(Some(bytes)) => Ok(bytes),
Ok(None) => return Ok(PeekCommit::UpToDate),
Err(err) => Err(err),
}?;

let actions = crate::logstore::get_actions(next_version, commit_log_bytes).await;
Ok(PeekCommit::New(next_version, actions.unwrap()))
}

/// Get underlying object store.
fn object_store(&self) -> Arc<dyn ObjectStore>;

Expand Down
33 changes: 10 additions & 23 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
use self::builder::DeltaTableConfig;
use self::state::DeltaTableState;
use crate::kernel::{
Action, CommitInfo, DataCheck, DataType, LogicalFile, Metadata, Protocol, StructType,
Transaction,
CommitInfo, DataCheck, DataType, LogicalFile, Metadata, Protocol, StructType, Transaction,
};
use crate::logstore::{self, extract_version_from_filename, LogStoreConfig, LogStoreRef};
use crate::logstore::{extract_version_from_filename, LogStoreConfig, LogStoreRef};
use crate::partitions::PartitionFilter;
use crate::storage::{commit_uri_from_version, ObjectStoreRef};
use crate::{DeltaResult, DeltaTableError};

// NOTE: this use can go away when peek_next_commit is removed off of [DeltaTable]
pub use crate::logstore::PeekCommit;

pub mod builder;
pub mod config;
pub mod state;
Expand Down Expand Up @@ -178,17 +180,6 @@ pub(crate) fn get_partition_col_data_types<'a>(
.collect()
}

/// The next commit that's available from underlying storage
/// TODO: Maybe remove this and replace it with Some/None and create a `Commit` struct to contain the next commit
///
#[derive(Debug)]
pub enum PeekCommit {
/// The next commit version and associated actions
New(i64, Vec<Action>),
/// Provided DeltaVersion is up to date
UpToDate,
}

/// In memory representation of a Delta Table
#[derive(Clone)]
pub struct DeltaTable {
Expand Down Expand Up @@ -332,20 +323,16 @@ impl DeltaTable {
self.update_incremental(None).await
}

#[deprecated(
since = "0.22.4",
note = "peek_next_commit has moved to the logstore, use table.log_store().peek_next_commit() instead please :)"
)]
/// Get the list of actions for the next commit
pub async fn peek_next_commit(
&self,
current_version: i64,
) -> Result<PeekCommit, DeltaTableError> {
let next_version = current_version + 1;
let commit_log_bytes = match self.log_store.read_commit_entry(next_version).await {
Ok(Some(bytes)) => Ok(bytes),
Ok(None) => return Ok(PeekCommit::UpToDate),
Err(err) => Err(err),
}?;

let actions = logstore::get_actions(next_version, commit_log_bytes).await;
Ok(PeekCommit::New(next_version, actions.unwrap()))
self.log_store().peek_next_commit(current_version).await
}

/// Updates the DeltaTable to the latest version by incrementally applying newer versions.
Expand Down

0 comments on commit 9b95d45

Please sign in to comment.