diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 19d7a510ef..645c62c53c 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -1924,7 +1924,8 @@ mod tests { .build(&table.state) .unwrap(); - let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap(); + let storage = table.object_store(); + let provider = DeltaTableProvider::try_new(table.state, storage, config).unwrap(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); @@ -1983,7 +1984,8 @@ mod tests { let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap(); - let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap(); + let storage = table.object_store(); + let provider = DeltaTableProvider::try_new(table.state, storage, config).unwrap(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); diff --git a/crates/deltalake-core/src/lib.rs b/crates/deltalake-core/src/lib.rs index d683b906dd..644da2dcac 100644 --- a/crates/deltalake-core/src/lib.rs +++ b/crates/deltalake-core/src/lib.rs @@ -85,6 +85,7 @@ compile_error!( pub mod data_catalog; pub mod errors; pub mod kernel; +pub mod logstore; pub mod operations; pub mod protocol; pub mod schema; diff --git a/crates/deltalake-core/src/logstore/default_logstore.rs b/crates/deltalake-core/src/logstore/default_logstore.rs new file mode 100644 index 0000000000..d27b3650f8 --- /dev/null +++ b/crates/deltalake-core/src/logstore/default_logstore.rs @@ -0,0 +1,115 @@ +//! Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation +use std::cmp::max; + +use bytes::Bytes; +use futures::StreamExt; +use lazy_static::lazy_static; +use log::debug; +use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; +use regex::Regex; + +use super::LogStore; +use crate::{ + operations::transaction::TransactionError, + protocol::{get_last_checkpoint, ProtocolError}, + storage::{commit_uri_from_version, ObjectStoreRef}, + DeltaResult, DeltaTableError, +}; + +/// Default [`LogStore`] implementation +#[derive(Debug, Clone)] +pub struct DefaultLogStore { + pub(crate) storage: ObjectStoreRef, +} + +#[async_trait::async_trait] +impl LogStore for DefaultLogStore { + async fn read_commit_entry(&self, version: i64) -> DeltaResult { + let commit_uri = commit_uri_from_version(version); + let data = self.storage.get(&commit_uri).await?.bytes().await?; + Ok(data) + + // TODO: return actual actions instead + // let actions = Self::get_actions(next_version, commit_log_bytes).await; + } + + /// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists] + /// if the given `version` already exists. The caller should handle the retry logic itself. + /// This is low-level transaction API. If user does not want to maintain the commit loop then + /// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` + /// with retry logic. + async fn write_commit_entry( + &self, + version: i64, + tmp_commit: &Path, + ) -> Result<(), TransactionError> { + // move temporary commit file to delta log directory + // rely on storage to fail if the file already exists - + self.storage + .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version)) + .await + .map_err(|err| match err { + ObjectStoreError::AlreadyExists { .. } => { + TransactionError::VersionAlreadyExists(version) + } + _ => TransactionError::from(err), + })?; + Ok(()) + } + + async fn get_latest_version(&self, current_version: i64) -> DeltaResult { + let version_start = match get_last_checkpoint(&self.storage).await { + Ok(last_check_point) => last_check_point.version, + Err(ProtocolError::CheckpointNotFound) => { + // no checkpoint + -1 + } + Err(e) => { + return Err(DeltaTableError::from(e)); + } + }; + + debug!("latest checkpoint version: {version_start}"); + + let version_start = max(current_version, version_start); + + lazy_static! { + static ref DELTA_LOG_REGEX: Regex = + Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap(); + } + + // list files to find max version + let version = async { + let mut max_version: i64 = version_start; + let prefix = Some(self.storage.log_path()); + let offset_path = commit_uri_from_version(max_version); + let mut files = self.storage.list_with_offset(prefix, &offset_path).await?; + // let mut files = self.storage.list_with_offset(prefix, &offset_path).await?; + + while let Some(obj_meta) = files.next().await { + let obj_meta = obj_meta?; + if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) { + let log_version = captures.get(1).unwrap().as_str().parse().unwrap(); + // listing may not be ordered + max_version = max(max_version, log_version); + // also cache timestamp for version, for faster time-travel + // TODO: temporarily disabled because `version_timestamp` is not available in the [`LogStore`] + // self.version_timestamp + // .insert(log_version, obj_meta.last_modified.timestamp()); + } + } + + if max_version < 0 { + return Err(DeltaTableError::not_a_table(self.storage.root_uri())); + } + + Ok::(max_version) + } + .await?; + Ok(version) + } + + fn object_store(&self) -> ObjectStoreRef { + self.storage.clone() + } +} diff --git a/crates/deltalake-core/src/logstore/mod.rs b/crates/deltalake-core/src/logstore/mod.rs new file mode 100644 index 0000000000..51d89ecff4 --- /dev/null +++ b/crates/deltalake-core/src/logstore/mod.rs @@ -0,0 +1,53 @@ +//! Delta log store. +use std::sync::Arc; + +use crate::{ + errors::DeltaResult, operations::transaction::TransactionError, storage::ObjectStoreRef, +}; +use bytes::Bytes; +use object_store::path::Path; + +pub mod default_logstore; + +/// Sharable reference to [`LogStore`] +pub type LogStoreRef = Arc; + +/// Trait for critical operations required to read and write commit entries in Delta logs. +/// +/// The correctness is predicated on the atomicity and durability guarantees of +/// the implementation of this interface. Specifically, +/// +/// - Atomic visibility: Any commit created via `write_commit_entry` must become visible atomically. +/// - Mutual exclusion: Only one writer must be able to create a commit for a specific version. +/// - Consistent listing: Once a commit entry for version `v` has been written, any future call to +/// `get_latest_version` must return a version >= `v`, i.e. the underlying file system entry must +/// become visible immediately. +#[async_trait::async_trait] +pub trait LogStore: Sync + Send { + /// Read data for commit entry with the given version. + /// TODO: return the actual commit data, i.e. Vec, instead? + async fn read_commit_entry(&self, version: i64) -> DeltaResult; + + /// Write list of actions as delta commit entry for given version. + /// + /// This operation can be retried with a higher version in case the write + /// fails with `TransactionError::VersionAlreadyExists`. + async fn write_commit_entry( + &self, + version: i64, + tmp_commit: &Path, + ) -> Result<(), TransactionError>; + + /// Find latest version currently stored in the delta log. + async fn get_latest_version(&self, start_version: i64) -> DeltaResult; + + /// Get underlying object store. + fn object_store(&self) -> ObjectStoreRef; +} + +// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders +impl std::fmt::Debug for dyn LogStore + '_ { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.object_store().fmt(f) + } +} diff --git a/crates/deltalake-core/src/operations/create.rs b/crates/deltalake-core/src/operations/create.rs index 1dc9fdf8b2..c8c24a9d00 100644 --- a/crates/deltalake-core/src/operations/create.rs +++ b/crates/deltalake-core/src/operations/create.rs @@ -11,8 +11,8 @@ use super::transaction::commit; use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, DataType, Metadata, Protocol, StructField, StructType}; +use crate::logstore::{LogStore, LogStoreRef}; use crate::protocol::{DeltaOperation, SaveMode}; -use crate::storage::DeltaObjectStore; use crate::table::builder::ensure_table_uri; use crate::table::config::DeltaConfigKey; use crate::table::DeltaTableMetaData; @@ -55,7 +55,7 @@ pub struct CreateBuilder { partition_columns: Option>, storage_options: Option>, actions: Vec, - object_store: Option>, + log_store: Option, configuration: HashMap>, metadata: Option>, } @@ -78,7 +78,7 @@ impl CreateBuilder { partition_columns: None, storage_options: None, actions: Default::default(), - object_store: None, + log_store: None, configuration: Default::default(), metadata: Default::default(), } @@ -198,9 +198,9 @@ impl CreateBuilder { self } - /// Provide a [`DeltaObjectStore`] instance, that points at table location - pub fn with_object_store(mut self, object_store: Arc) -> Self { - self.object_store = Some(object_store); + /// Provide a [`LogStore`] instance, that points at table location + pub fn with_log_store(mut self, log_store: Arc) -> Self { + self.log_store = Some(log_store); self } @@ -219,12 +219,12 @@ impl CreateBuilder { return Err(CreateError::MissingSchema.into()); } - let (storage_url, table) = if let Some(object_store) = self.object_store { + let (storage_url, table) = if let Some(log_store) = self.log_store { ( - ensure_table_uri(object_store.root_uri())? + ensure_table_uri(log_store.object_store().root_uri())? .as_str() .to_string(), - DeltaTable::new(object_store, Default::default()), + DeltaTable::new(log_store, Default::default()), ) } else { let storage_url = ensure_table_uri(self.location.ok_or(CreateError::MissingLocation)?)?; @@ -311,7 +311,7 @@ impl std::future::IntoFuture for CreateBuilder { }; let version = commit( - table.object_store().as_ref(), + table.log_store.as_ref(), &actions, operation, table_state, @@ -443,11 +443,11 @@ mod tests { assert_eq!(table.version(), 0); let first_id = table.get_metadata().unwrap().id.clone(); - let object_store = table.object_store(); + let log_store = table.log_store; // Check an error is raised when a table exists at location let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store.clone()) .with_columns(schema.fields().clone()) .with_save_mode(SaveMode::ErrorIfExists) .await; @@ -455,7 +455,7 @@ mod tests { // Check current table is returned when ignore option is chosen. let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store.clone()) .with_columns(schema.fields().clone()) .with_save_mode(SaveMode::Ignore) .await @@ -464,7 +464,7 @@ mod tests { // Check table is overwritten let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store.clone()) .with_columns(schema.fields().iter().cloned()) .with_save_mode(SaveMode::Overwrite) .await diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 7f8be1f293..b697d0cdbe 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -21,6 +21,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use crate::logstore::LogStoreRef; use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_plan::filter::FilterExec; @@ -40,7 +41,7 @@ use crate::kernel::{Action, Add, Remove}; use crate::operations::transaction::commit; use crate::operations::write::write_execution_plan; use crate::protocol::DeltaOperation; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; +use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -54,7 +55,7 @@ pub struct DeleteBuilder { /// A snapshot of the table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -84,11 +85,11 @@ pub struct DeleteMetrics { impl DeleteBuilder { /// Create a new [`DeleteBuilder`] - pub fn new(object_store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { predicate: None, snapshot, - store: object_store, + log_store, state: None, app_metadata: None, writer_properties: None, @@ -187,7 +188,7 @@ async fn excute_non_empty_expr( async fn execute( predicate: Option, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -197,7 +198,13 @@ async fn execute( let mut metrics = DeleteMetrics::default(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, object_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files( + snapshot, + log_store.object_store().clone(), + &state, + predicate.clone(), + ) + .await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -208,7 +215,7 @@ async fn execute( let write_start = Instant::now(); let add = excute_non_empty_expr( snapshot, - object_store.clone(), + log_store.object_store().clone(), &state, &predicate, &mut metrics, @@ -254,7 +261,7 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -278,7 +285,7 @@ impl std::future::IntoFuture for DeleteBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.store.clone(), session.runtime_env()); + register_store(this.log_store.object_store().clone(), session.runtime_env()); session.state() }); @@ -295,7 +302,7 @@ impl std::future::IntoFuture for DeleteBuilder { let ((actions, version), metrics) = execute( predicate, - this.store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -305,7 +312,7 @@ impl std::future::IntoFuture for DeleteBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/filesystem_check.rs b/crates/deltalake-core/src/operations/filesystem_check.rs index 65716bbfe1..826dd2975e 100644 --- a/crates/deltalake-core/src/operations/filesystem_check.rs +++ b/crates/deltalake-core/src/operations/filesystem_check.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::fmt::Debug; -use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -27,9 +26,9 @@ use url::{ParseError, Url}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Add, Remove}; +use crate::logstore::LogStoreRef; use crate::operations::transaction::commit; use crate::protocol::DeltaOperation; -use crate::storage::DeltaObjectStore; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -40,7 +39,7 @@ pub struct FileSystemCheckBuilder { /// A snapshot of the to-be-checked table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Don't remove actions to the table log. Just determine which files can be removed dry_run: bool, } @@ -56,7 +55,7 @@ pub struct FileSystemCheckMetrics { struct FileSystemCheckPlan { /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Files that no longer exists in undlying ObjectStore but have active add actions pub files_to_remove: Vec, } @@ -74,10 +73,10 @@ fn is_absolute_path(path: &str) -> DeltaResult { impl FileSystemCheckBuilder { /// Create a new [`FileSystemCheckBuilder`] - pub fn new(store: Arc, state: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, state: DeltaTableState) -> Self { FileSystemCheckBuilder { snapshot: state, - store, + log_store, dry_run: false, } } @@ -91,7 +90,7 @@ impl FileSystemCheckBuilder { async fn create_fsck_plan(&self) -> DeltaResult { let mut files_relative: HashMap<&str, &Add> = HashMap::with_capacity(self.snapshot.files().len()); - let store = self.store.clone(); + let log_store = self.log_store.clone(); for active in self.snapshot.files() { if is_absolute_path(&active.path)? { @@ -103,7 +102,8 @@ impl FileSystemCheckBuilder { } } - let mut files = self.store.list(None).await?; + let object_store = log_store.object_store(); + let mut files = object_store.list(None).await?; while let Some(result) = files.next().await { let file = result?; files_relative.remove(file.location.as_ref()); @@ -120,7 +120,7 @@ impl FileSystemCheckBuilder { Ok(FileSystemCheckPlan { files_to_remove, - store, + log_store, }) } } @@ -156,7 +156,7 @@ impl FileSystemCheckPlan { } commit( - self.store.as_ref(), + self.log_store.as_ref(), &actions, DeltaOperation::FileSystemCheck {}, snapshot, @@ -183,7 +183,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { let plan = this.create_fsck_plan().await?; if this.dry_run { return Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), FileSystemCheckMetrics { files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(), dry_run: true, @@ -192,7 +192,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { } let metrics = plan.execute(&this.snapshot).await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/load.rs b/crates/deltalake-core/src/operations/load.rs index 7baa59e3e1..1a4c5c4cc6 100644 --- a/crates/deltalake-core/src/operations/load.rs +++ b/crates/deltalake-core/src/operations/load.rs @@ -7,7 +7,7 @@ use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use futures::future::BoxFuture; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::storage::DeltaObjectStore; +use crate::logstore::LogStoreRef; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -16,17 +16,17 @@ pub struct LoadBuilder { /// A snapshot of the to-be-loaded table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// A sub-selection of columns to be loaded columns: Option>, } impl LoadBuilder { /// Create a new [`LoadBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, columns: None, } } @@ -46,7 +46,7 @@ impl std::future::IntoFuture for LoadBuilder { let this = self; Box::pin(async move { - let table = DeltaTable::new_with_state(this.store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); let schema = table.state.arrow_schema()?; let projection = this .columns diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index 57621cb316..eb28e14f69 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -68,10 +68,10 @@ use super::transaction::commit; use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression}; use crate::delta_datafusion::{register_store, DeltaScanBuilder}; use crate::kernel::{Action, Remove}; +use crate::logstore::LogStoreRef; use crate::operations::datafusion_utils::MetricObserverExec; use crate::operations::write::write_execution_plan; use crate::protocol::{DeltaOperation, MergePredicate}; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableError}; @@ -107,7 +107,7 @@ pub struct MergeBuilder { /// The source data source: DataFrame, /// Delta object store for handling data files - object_store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -122,7 +122,7 @@ pub struct MergeBuilder { impl MergeBuilder { /// Create a new [`MergeBuilder`] pub fn new>( - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: DeltaTableState, predicate: E, source: DataFrame, @@ -132,7 +132,7 @@ impl MergeBuilder { predicate, source, snapshot, - object_store, + log_store, source_alias: None, target_alias: None, state: None, @@ -561,7 +561,7 @@ pub struct MergeMetrics { async fn execute( predicate: Expression, source: DataFrame, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -590,7 +590,7 @@ async fn execute( // predicates also need to be considered when pruning let target = Arc::new( - DeltaScanBuilder::new(snapshot, object_store.clone(), &state) + DeltaScanBuilder::new(snapshot, log_store.object_store(), &state) .with_schema(snapshot.input_schema()?) .build() .await?, @@ -1126,7 +1126,7 @@ async fn execute( state.clone(), projection.clone(), table_partition_cols.clone(), - object_store.clone(), + log_store.object_store().clone(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, @@ -1188,7 +1188,7 @@ async fn execute( not_matched_by_source_predicates: not_match_source_operations, }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -1212,7 +1212,7 @@ impl std::future::IntoFuture for MergeBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.object_store.clone(), session.runtime_env()); + register_store(this.log_store.object_store().clone(), session.runtime_env()); session.state() }); @@ -1220,7 +1220,7 @@ impl std::future::IntoFuture for MergeBuilder { let ((actions, version), metrics) = execute( this.predicate, this.source, - this.object_store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -1236,7 +1236,7 @@ impl std::future::IntoFuture for MergeBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.object_store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/mod.rs b/crates/deltalake-core/src/operations/mod.rs index 35301f067e..d008e5874a 100644 --- a/crates/deltalake-core/src/operations/mod.rs +++ b/crates/deltalake-core/src/operations/mod.rs @@ -107,60 +107,60 @@ impl DeltaOps { /// ``` #[must_use] pub fn create(self) -> CreateBuilder { - CreateBuilder::default().with_object_store(self.0.object_store()) + CreateBuilder::default().with_log_store(self.0.log_store.clone()) } /// Load data from a DeltaTable #[cfg(feature = "datafusion")] #[must_use] pub fn load(self) -> LoadBuilder { - LoadBuilder::new(self.0.object_store(), self.0.state) + LoadBuilder::new(self.0.log_store.clone(), self.0.state) } /// Write data to Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn write(self, batches: impl IntoIterator) -> WriteBuilder { - WriteBuilder::new(self.0.object_store(), self.0.state).with_input_batches(batches) + WriteBuilder::new(self.0.log_store.clone(), self.0.state).with_input_batches(batches) } /// Vacuum stale files from delta table #[must_use] pub fn vacuum(self) -> VacuumBuilder { - VacuumBuilder::new(self.0.object_store(), self.0.state) + VacuumBuilder::new(self.0.log_store.clone(), self.0.state) } /// Audit active files with files present on the filesystem #[must_use] pub fn filesystem_check(self) -> FileSystemCheckBuilder { - FileSystemCheckBuilder::new(self.0.object_store(), self.0.state) + FileSystemCheckBuilder::new(self.0.log_store.clone(), self.0.state) } /// Audit active files with files present on the filesystem #[cfg(all(feature = "arrow", feature = "parquet"))] #[must_use] pub fn optimize<'a>(self) -> OptimizeBuilder<'a> { - OptimizeBuilder::new(self.0.object_store(), self.0.state) + OptimizeBuilder::new(self.0.log_store.clone(), self.0.state) } /// Delete data from Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn delete(self) -> DeleteBuilder { - DeleteBuilder::new(self.0.object_store(), self.0.state) + DeleteBuilder::new(self.0.log_store.clone(), self.0.state) } /// Update data from Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn update(self) -> UpdateBuilder { - UpdateBuilder::new(self.0.object_store(), self.0.state) + UpdateBuilder::new(self.0.log_store.clone(), self.0.state) } /// Restore delta table to a specified version or datetime #[must_use] pub fn restore(self) -> RestoreBuilder { - RestoreBuilder::new(self.0.object_store(), self.0.state) + RestoreBuilder::new(self.0.log_store.clone(), self.0.state) } /// Update data from Delta table @@ -172,7 +172,7 @@ impl DeltaOps { predicate: E, ) -> MergeBuilder { MergeBuilder::new( - self.0.object_store(), + self.0.log_store.clone(), self.0.state, predicate.into(), source, diff --git a/crates/deltalake-core/src/operations/optimize.rs b/crates/deltalake-core/src/operations/optimize.rs index 7feecd1e56..254ff3ce03 100644 --- a/crates/deltalake-core/src/operations/optimize.rs +++ b/crates/deltalake-core/src/operations/optimize.rs @@ -42,6 +42,7 @@ use super::transaction::commit; use super::writer::{PartitionWriter, PartitionWriterConfig}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Remove}; +use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; @@ -155,7 +156,7 @@ pub struct OptimizeBuilder<'a> { /// A snapshot of the to-be-optimized table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: ObjectStoreRef, + log_store: LogStoreRef, /// Filters to select specific table partitions to be optimized filters: &'a [PartitionFilter], /// Desired file size after bin-packing files @@ -177,10 +178,10 @@ pub struct OptimizeBuilder<'a> { impl<'a> OptimizeBuilder<'a> { /// Create a new [`OptimizeBuilder`] - pub fn new(store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, filters: &[], target_size: None, writer_properties: None, @@ -274,14 +275,14 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { )?; let metrics = plan .execute( - this.store.clone(), + this.log_store.clone(), &this.snapshot, this.max_concurrent_tasks, this.max_spill_size, this.min_commit_interval, ) .await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) @@ -584,7 +585,7 @@ impl MergePlan { /// Perform the operations outlined in the plan. pub async fn execute( mut self, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, max_concurrent_tasks: usize, #[allow(unused_variables)] // used behind a feature flag @@ -607,12 +608,13 @@ impl MergePlan { for file in files.iter() { debug!(" file {}", file.location); } - let object_store_ref = object_store.clone(); + let object_store_ref = log_store.clone(); let batch_stream = futures::stream::iter(files.clone()) .then(move |file| { let object_store_ref = object_store_ref.clone(); async move { - let file_reader = ParquetObjectReader::new(object_store_ref, file); + let file_reader = + ParquetObjectReader::new(object_store_ref.object_store(), file); ParquetRecordBatchStreamBuilder::new(file_reader) .await? .build() @@ -625,7 +627,7 @@ impl MergePlan { self.task_parameters.clone(), partition, files, - object_store.clone(), + log_store.object_store().clone(), futures::future::ready(Ok(batch_stream)), )); util::flatten_join_error(rewrite_result) @@ -635,13 +637,15 @@ impl MergePlan { #[cfg(not(feature = "datafusion"))] let exec_context = Arc::new(zorder::ZOrderExecContext::new( zorder_columns, - object_store.clone(), + log_store.object_store().clone(), // If there aren't enough bins to use all threads, then instead // use threads within the bins. This is important for the case where // the table is un-partitioned, in which case the entire table is just // one big bin. bins.len() <= num_cpus::get(), )); + let object_store = log_store.object_store().clone(); + #[cfg(feature = "datafusion")] let exec_context = Arc::new(zorder::ZOrderExecContext::new( zorder_columns, @@ -649,7 +653,6 @@ impl MergePlan { max_spill_size, )?); let task_parameters = self.task_parameters.clone(); - let object_store = object_store.clone(); futures::stream::iter(bins) .map(move |(partition, files)| { let batch_stream = Self::read_zorder(files.clone(), exec_context.clone()); @@ -671,7 +674,7 @@ impl MergePlan { let mut stream = stream.buffer_unordered(max_concurrent_tasks); - let mut table = DeltaTable::new_with_state(object_store.clone(), snapshot.clone()); + let mut table = DeltaTable::new_with_state(log_store.clone(), snapshot.clone()); // Actions buffered so far. These will be flushed either at the end // or when we reach the commit interval. @@ -720,7 +723,7 @@ impl MergePlan { //// TODO: Check for remove actions on optimized partitions. If a //// optimized partition was updated then abort the commit. Requires (#593). commit( - table.object_store().as_ref(), + table.log_store.as_ref(), &actions, self.task_parameters.input_parameters.clone().into(), table.get_state(), diff --git a/crates/deltalake-core/src/operations/restore.rs b/crates/deltalake-core/src/operations/restore.rs index a356b5b312..c391de6f04 100644 --- a/crates/deltalake-core/src/operations/restore.rs +++ b/crates/deltalake-core/src/operations/restore.rs @@ -10,7 +10,8 @@ //! 5) If ignore_missing_files option is false (default value) check availability of AddFile //! in file system. //! 6) Commit Protocol, all RemoveFile and AddFile actions -//! into delta log using `try_commit_transaction` (commit will be failed in case of parallel transaction) +//! into delta log using `LogStore::write_commit_entry` (commit will be failed in case of parallel transaction) +//! TODO: comment is outdated //! 7) If table was modified in parallel then ignore restore and raise exception. //! //! # Example @@ -31,9 +32,9 @@ use object_store::ObjectStore; use serde::Serialize; use crate::kernel::{Action, Add, Protocol, Remove}; -use crate::operations::transaction::{prepare_commit, try_commit_transaction, TransactionError}; +use crate::logstore::LogStoreRef; +use crate::operations::transaction::{prepare_commit, TransactionError}; use crate::protocol::DeltaOperation; -use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError}; @@ -74,7 +75,7 @@ pub struct RestoreBuilder { /// A snapshot of the to-be-restored table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: ObjectStoreRef, + log_store: LogStoreRef, /// Version to restore version_to_restore: Option, /// Datetime to restore @@ -87,10 +88,10 @@ pub struct RestoreBuilder { impl RestoreBuilder { /// Create a new [`RestoreBuilder`] - pub fn new(store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, version_to_restore: None, datetime_to_restore: None, ignore_missing_files: false, @@ -125,7 +126,7 @@ impl RestoreBuilder { } async fn execute( - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: DeltaTableState, version_to_restore: Option, datetime_to_restore: Option>, @@ -138,7 +139,7 @@ async fn execute( { return Err(DeltaTableError::from(RestoreError::InvalidRestoreParameter)); } - let mut table = DeltaTable::new(object_store.clone(), DeltaTableConfig::default()); + let mut table = DeltaTable::new(log_store.clone(), DeltaTableConfig::default()); let version = match datetime_to_restore { Some(datetime) => { table.load_with_datetime(datetime).await?; @@ -195,7 +196,7 @@ async fn execute( .collect(); if !ignore_missing_files { - check_files_available(object_store.as_ref(), &files_to_add).await?; + check_files_available(log_store.object_store().as_ref(), &files_to_add).await?; } let metrics = RestoreMetrics { @@ -238,7 +239,7 @@ async fn execute( actions.extend(files_to_remove.into_iter().map(Action::Remove)); let commit = prepare_commit( - object_store.as_ref(), + log_store.object_store().as_ref(), &DeltaOperation::Restore { version: version_to_restore, datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }), @@ -249,13 +250,13 @@ async fn execute( ) .await?; let commit_version = snapshot.version() + 1; - match try_commit_transaction(object_store.as_ref(), &commit, commit_version).await { + match log_store.write_commit_entry(commit_version, &commit).await { Ok(_) => {} Err(err @ TransactionError::VersionAlreadyExists(_)) => { return Err(err.into()); } Err(err) => { - object_store.delete(&commit).await?; + log_store.object_store().delete(&commit).await?; return Err(err.into()); } } @@ -291,7 +292,7 @@ impl std::future::IntoFuture for RestoreBuilder { Box::pin(async move { let metrics = execute( - this.store.clone(), + this.log_store.clone(), this.snapshot.clone(), this.version_to_restore, this.datetime_to_restore, @@ -299,7 +300,7 @@ impl std::future::IntoFuture for RestoreBuilder { this.protocol_downgrade_allowed, ) .await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/transaction/mod.rs b/crates/deltalake-core/src/operations/transaction/mod.rs index c31c349fd7..8b66bd38fd 100644 --- a/crates/deltalake-core/src/operations/transaction/mod.rs +++ b/crates/deltalake-core/src/operations/transaction/mod.rs @@ -10,8 +10,8 @@ use serde_json::Value; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, CommitInfo}; +use crate::logstore::LogStore; use crate::protocol::DeltaOperation; -use crate::storage::commit_uri_from_version; use crate::table::state::DeltaTableState; mod conflict_checker; @@ -130,6 +130,7 @@ pub(crate) fn get_commit_bytes( /// Low-level transaction API. Creates a temporary commit file. Once created, /// the transaction object could be dropped and the actual commit could be executed /// with `DeltaTable.try_commit_transaction`. +/// TODO: comment is outdated now pub(crate) async fn prepare_commit<'a>( storage: &dyn ObjectStore, operation: &DeltaOperation, @@ -150,42 +151,26 @@ pub(crate) async fn prepare_commit<'a>( Ok(path) } -/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists] -/// if the given `version` already exists. The caller should handle the retry logic itself. -/// This is low-level transaction API. If user does not want to maintain the commit loop then -/// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` -/// with retry logic. -pub(crate) async fn try_commit_transaction( - storage: &dyn ObjectStore, - tmp_commit: &Path, - version: i64, -) -> Result { - // move temporary commit file to delta log directory - // rely on storage to fail if the file already exists - - storage - .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version)) - .await - .map_err(|err| match err { - ObjectStoreError::AlreadyExists { .. } => { - TransactionError::VersionAlreadyExists(version) - } - _ => TransactionError::from(err), - })?; - Ok(version) -} - /// Commit a transaction, with up to 15 retries. This is higher-level transaction API. /// /// Will error early if the a concurrent transaction has already been committed /// and conflicts with this transaction. pub async fn commit( - storage: &dyn ObjectStore, + log_store: &dyn LogStore, actions: &Vec, operation: DeltaOperation, read_snapshot: &DeltaTableState, app_metadata: Option>, ) -> DeltaResult { - commit_with_retries(storage, actions, operation, read_snapshot, app_metadata, 15).await + commit_with_retries( + log_store, + actions, + operation, + read_snapshot, + app_metadata, + 15, + ) + .await } /// Commit a transaction, with up configurable number of retries. This is higher-level transaction API. @@ -193,24 +178,35 @@ pub async fn commit( /// The function will error early if the a concurrent transaction has already been committed /// and conflicts with this transaction. pub async fn commit_with_retries( - storage: &dyn ObjectStore, + log_store: &dyn LogStore, actions: &Vec, operation: DeltaOperation, read_snapshot: &DeltaTableState, app_metadata: Option>, max_retries: usize, ) -> DeltaResult { - let tmp_commit = - prepare_commit(storage, &operation, actions, read_snapshot, app_metadata).await?; + let tmp_commit = prepare_commit( + log_store.object_store().as_ref(), + &operation, + actions, + read_snapshot, + app_metadata, + ) + .await?; let mut attempt_number = 1; while attempt_number <= max_retries { let version = read_snapshot.version() + attempt_number as i64; - match try_commit_transaction(storage, &tmp_commit, version).await { - Ok(version) => return Ok(version), + match log_store.write_commit_entry(version, &tmp_commit).await { + Ok(()) => return Ok(version), Err(TransactionError::VersionAlreadyExists(version)) => { - let summary = WinningCommitSummary::try_new(storage, version - 1, version).await?; + let summary = WinningCommitSummary::try_new( + log_store.object_store().as_ref(), + version - 1, + version, + ) + .await?; let transaction_info = TransactionInfo::try_new( read_snapshot, operation.read_predicate(), @@ -225,13 +221,13 @@ pub async fn commit_with_retries( attempt_number += 1; } Err(err) => { - storage.delete(&tmp_commit).await?; + log_store.object_store().delete(&tmp_commit).await?; return Err(TransactionError::CommitConflict(err).into()); } }; } Err(err) => { - storage.delete(&tmp_commit).await?; + log_store.object_store().delete(&tmp_commit).await?; return Err(err.into()); } } @@ -242,11 +238,17 @@ pub async fn commit_with_retries( #[cfg(all(test, feature = "parquet"))] mod tests { + use std::{collections::HashMap, sync::Arc}; + use self::test_utils::{create_remove_action, init_table_actions}; use super::*; - use crate::DeltaConfigKey; + use crate::{ + logstore::default_logstore::DefaultLogStore, + storage::{commit_uri_from_version, DeltaObjectStore}, + DeltaConfigKey, + }; use object_store::memory::InMemory; - use std::collections::HashMap; + use url::Url; #[test] fn test_commit_uri_from_version() { @@ -290,18 +292,22 @@ mod tests { #[tokio::test] async fn test_try_commit_transaction() { - let store = InMemory::new(); + let store = Arc::new(InMemory::new()); + let url = Url::parse("mem://what/is/this").unwrap(); + let delta_store = DeltaObjectStore::new(store.clone(), url); + let log_store = DefaultLogStore { + storage: Arc::new(delta_store), + }; let tmp_path = Path::from("_delta_log/tmp"); let version_path = Path::from("_delta_log/00000000000000000000.json"); store.put(&tmp_path, bytes::Bytes::new()).await.unwrap(); store.put(&version_path, bytes::Bytes::new()).await.unwrap(); + let res = log_store.write_commit_entry(0, &tmp_path).await; // fails if file version already exists - let res = try_commit_transaction(&store, &tmp_path, 0).await; assert!(res.is_err()); // succeeds for next version - let res = try_commit_transaction(&store, &tmp_path, 1).await.unwrap(); - assert_eq!(res, 1); + log_store.write_commit_entry(1, &tmp_path).await.unwrap(); } } diff --git a/crates/deltalake-core/src/operations/transaction/test_utils.rs b/crates/deltalake-core/src/operations/transaction/test_utils.rs index b52b1a1c7b..56b0894019 100644 --- a/crates/deltalake-core/src/operations/transaction/test_utils.rs +++ b/crates/deltalake-core/src/operations/transaction/test_utils.rs @@ -1,7 +1,7 @@ #![allow(unused)] use std::collections::HashMap; -use super::{prepare_commit, try_commit_transaction}; +use super::prepare_commit; use crate::kernel::{ Action, Add, CommitInfo, DataType, Metadata, PrimitiveType, Protocol, Remove, StructField, StructType, @@ -121,7 +121,7 @@ pub async fn create_initialized_table( partition_cols: &[String], configuration: Option>>, ) -> DeltaTable { - let storage = DeltaTableBuilder::from_uri("memory://") + let log_store = DeltaTableBuilder::from_uri("memory://") .build_storage() .unwrap(); let table_schema = StructType::new(vec![ @@ -161,11 +161,19 @@ pub async fn create_initialized_table( ), }; let actions = init_table_actions(None); - let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, &state, None) - .await - .unwrap(); - try_commit_transaction(storage.as_ref(), &prepared_commit, 0) + let prepared_commit = prepare_commit( + log_store.object_store().as_ref(), + &operation, + &actions, + &state, + None, + ) + .await + .unwrap(); + + log_store + .write_commit_entry(0, &prepared_commit) .await .unwrap(); - DeltaTable::new_with_state(storage, state) + DeltaTable::new_with_state(log_store, state) } diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index 1723d287a2..54479a73de 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -49,8 +49,8 @@ use super::write::write_execution_plan; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; use crate::kernel::{Action, Remove}; +use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableError}; @@ -64,7 +64,7 @@ pub struct UpdateBuilder { /// A snapshot of the table's state snapshot: DeltaTableState, /// Delta object store for handling data files - object_store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -95,12 +95,12 @@ pub struct UpdateMetrics { impl UpdateBuilder { /// Create a new ['UpdateBuilder'] - pub fn new(object_store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { predicate: None, updates: HashMap::new(), snapshot, - object_store, + log_store, state: None, writer_properties: None, app_metadata: None, @@ -164,7 +164,7 @@ impl UpdateBuilder { async fn execute( predicate: Option, updates: HashMap, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -213,7 +213,13 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns.clone(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, object_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files( + snapshot, + log_store.object_store(), + &state, + predicate.clone(), + ) + .await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; if candidates.candidates.is_empty() { @@ -225,7 +231,7 @@ async fn execute( let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches - let scan = DeltaScanBuilder::new(snapshot, object_store.clone(), &state) + let scan = DeltaScanBuilder::new(snapshot, log_store.object_store().clone(), &state) .with_files(&candidates.candidates) .build() .await?; @@ -357,7 +363,7 @@ async fn execute( state.clone(), projection.clone(), table_partition_cols.clone(), - object_store.clone(), + log_store.object_store().clone(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, @@ -407,7 +413,7 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -430,7 +436,7 @@ impl std::future::IntoFuture for UpdateBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.object_store.clone(), session.runtime_env()); + register_store(this.log_store.object_store().clone(), session.runtime_env()); session.state() }); @@ -438,7 +444,7 @@ impl std::future::IntoFuture for UpdateBuilder { let ((actions, version), metrics) = execute( this.predicate, this.updates, - this.object_store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -449,7 +455,7 @@ impl std::future::IntoFuture for UpdateBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.object_store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs index 47f7c1d5c9..efdde55347 100644 --- a/crates/deltalake-core/src/operations/vacuum.rs +++ b/crates/deltalake-core/src/operations/vacuum.rs @@ -37,8 +37,8 @@ use super::transaction::commit; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::Action; +use crate::logstore::{LogStore, LogStoreRef}; use crate::protocol::DeltaOperation; -use crate::storage::DeltaObjectStore; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -82,7 +82,7 @@ pub struct VacuumBuilder { /// A snapshot of the to-be-vacuumed table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Period of stale files allowed. retention_period: Option, /// Validate the retention period is not below the retention period configured in the table @@ -125,10 +125,10 @@ pub struct VacuumEndOperationMetrics { /// Methods to specify various vacuum options and to execute the operation impl VacuumBuilder { /// Create a new [`VacuumBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { VacuumBuilder { snapshot, - store, + log_store, retention_period: None, enforce_retention_duration: true, dry_run: false, @@ -184,8 +184,11 @@ impl VacuumBuilder { let mut files_to_delete = vec![]; let mut file_sizes = vec![]; - let mut all_files = self.store.list(None).await.map_err(DeltaTableError::from)?; - + let object_store = self.log_store.object_store(); + let mut all_files = object_store + .list(None) + .await + .map_err(DeltaTableError::from)?; let partition_columns = &self .snapshot .current_metadata() @@ -227,7 +230,7 @@ impl std::future::IntoFuture for VacuumBuilder { let plan = this.create_vacuum_plan().await?; if this.dry_run { return Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), VacuumMetrics { files_deleted: plan.files_to_delete.iter().map(|f| f.to_string()).collect(), dry_run: true, @@ -235,9 +238,11 @@ impl std::future::IntoFuture for VacuumBuilder { )); } - let metrics = plan.execute(&this.store, &this.snapshot).await?; + let metrics = plan + .execute(this.log_store.as_ref(), &this.snapshot) + .await?; Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), metrics, )) }) @@ -262,7 +267,7 @@ impl VacuumPlan { /// Execute the vacuum plan and delete files from underlying storage pub async fn execute( self, - store: &DeltaObjectStore, + store: &dyn LogStore, snapshot: &DeltaTableState, ) -> Result { if self.files_to_delete.is_empty() { @@ -311,6 +316,7 @@ impl VacuumPlan { .boxed(); let files_deleted = store + .object_store() .delete_stream(locations) .map(|res| match res { Ok(path) => Ok(path.to_string()), @@ -395,7 +401,7 @@ mod tests { async fn vacuum_delta_8_0_table() { let table = open_table("./tests/data/delta-0.8.0").await.unwrap(); - let result = VacuumBuilder::new(table.object_store(), table.state.clone()) + let result = VacuumBuilder::new(table.log_store, table.state.clone()) .with_retention_period(Duration::hours(1)) .with_dry_run(true) .await; @@ -403,7 +409,7 @@ mod tests { assert!(result.is_err()); let table = open_table("./tests/data/delta-0.8.0").await.unwrap(); - let (table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(0)) .with_dry_run(true) .with_enforce_retention_duration(false) @@ -415,7 +421,7 @@ mod tests { vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"] ); - let (table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(169)) .with_dry_run(true) .await @@ -432,7 +438,7 @@ mod tests { .as_secs() / 3600; let empty: Vec = Vec::new(); - let (_table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (_table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(retention_hours as i64)) .with_dry_run(true) .await diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index 45bdaaeff5..46fb579ffc 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -44,8 +44,9 @@ use super::{transaction::commit, CreateBuilder}; use crate::delta_datafusion::DeltaDataChecker; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Add, Remove, StructType}; +use crate::logstore::LogStoreRef; use crate::protocol::{DeltaOperation, SaveMode}; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; +use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::writer::record_batch::divide_by_partition_values; use crate::writer::utils::PartitionPath; @@ -90,7 +91,7 @@ pub struct WriteBuilder { /// A snapshot of the to-be-loaded table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// The input plan input: Option>, /// Datafusion session state relevant for executing the input plan @@ -117,10 +118,10 @@ pub struct WriteBuilder { impl WriteBuilder { /// Create a new [`WriteBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, input: None, state: None, mode: SaveMode::Append, @@ -210,7 +211,8 @@ impl WriteBuilder { } async fn check_preconditions(&self) -> DeltaResult> { - match self.store.is_delta_table_location().await? { + let object_store = self.log_store.object_store(); + match object_store.is_delta_table_location().await? { true => { let min_writer = self.snapshot.min_writer_version(); if min_writer > MAX_SUPPORTED_WRITER_VERSION { @@ -218,7 +220,7 @@ impl WriteBuilder { } else { match self.mode { SaveMode::ErrorIfExists => { - Err(WriteError::AlreadyExists(self.store.root_uri()).into()) + Err(WriteError::AlreadyExists(object_store.root_uri()).into()) } _ => Ok(vec![]), } @@ -236,7 +238,7 @@ impl WriteBuilder { Err(WriteError::MissingData) }?; let mut builder = CreateBuilder::new() - .with_object_store(self.store.clone()) + .with_log_store(self.log_store.clone()) .with_columns(schema.fields().clone()); if let Some(partition_columns) = self.partition_columns.as_ref() { builder = builder.with_partition_columns(partition_columns.clone()) @@ -356,7 +358,7 @@ impl std::future::IntoFuture for WriteBuilder { let schema = batches[0].schema(); let table_schema = this .snapshot - .physical_arrow_schema(this.store.clone()) + .physical_arrow_schema(this.log_store.object_store().clone()) .await .or_else(|_| this.snapshot.arrow_schema()) .unwrap_or(schema.clone()); @@ -418,7 +420,7 @@ impl std::future::IntoFuture for WriteBuilder { state, plan, partition_columns.clone(), - this.store.clone(), + this.log_store.object_store().clone(), this.target_file_size, this.write_batch_size, this.writer_properties, @@ -468,7 +470,7 @@ impl std::future::IntoFuture for WriteBuilder { }; let version = commit( - this.store.as_ref(), + this.log_store.as_ref(), &actions, DeltaOperation::Write { mode: this.mode, @@ -492,7 +494,7 @@ impl std::future::IntoFuture for WriteBuilder { // TODO should we build checkpoints based on config? - Ok(DeltaTable::new_with_state(this.store, this.snapshot)) + Ok(DeltaTable::new_with_state(this.log_store, this.snapshot)) }) } } diff --git a/crates/deltalake-core/src/operations/writer.rs b/crates/deltalake-core/src/operations/writer.rs index 0bba167e33..6d551ecb96 100644 --- a/crates/deltalake-core/src/operations/writer.rs +++ b/crates/deltalake-core/src/operations/writer.rs @@ -406,9 +406,10 @@ mod tests { #[tokio::test] async fn test_write_partition() { - let object_store = DeltaTableBuilder::from_uri("memory://") + let log_store = DeltaTableBuilder::from_uri("memory://") .build_storage() .unwrap(); + let object_store = log_store.object_store(); let batch = get_record_batch(None, false); // write single un-partitioned batch @@ -439,12 +440,13 @@ mod tests { let object_store = DeltaTableBuilder::from_uri("memory://") .build_storage() - .unwrap(); + .unwrap() + .object_store(); let properties = WriterProperties::builder() .set_max_row_group_size(1024) .build(); // configure small target file size and and row group size so we can observe multiple files written - let mut writer = get_writer(object_store.clone(), &batch, Some(properties), Some(10_000)); + let mut writer = get_writer(object_store, &batch, Some(properties), Some(10_000)); writer.write(&batch).await.unwrap(); // check that we have written more then once file, and no more then 1 is below target size diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs index fc23c1d28b..bee45f51bf 100644 --- a/crates/deltalake-core/src/protocol/checkpoints.rs +++ b/crates/deltalake-core/src/protocol/checkpoints.rs @@ -70,7 +70,12 @@ pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000; /// Creates checkpoint at current table version pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError> { - create_checkpoint_for(table.version(), table.get_state(), table.storage.as_ref()).await?; + create_checkpoint_for( + table.version(), + table.get_state(), + table.object_store().as_ref(), + ) + .await?; Ok(()) } @@ -81,7 +86,7 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result DeltaResult { + pub fn build_storage(self) -> DeltaResult { match self.options.storage_backend { - Some((storage, location)) => Ok(Arc::new(DeltaObjectStore::new( - storage, - ensure_table_uri(location.as_str())?, - ))), + Some((storage, location)) => Ok(Arc::new(DefaultLogStore { + storage: Arc::new(DeltaObjectStore::new( + storage, + ensure_table_uri(location.as_str())?, + )), + })), None => { let location = ensure_table_uri(&self.options.table_uri)?; - Ok(Arc::new(DeltaObjectStore::try_new( - location, - self.storage_options(), - )?)) + Ok(Arc::new(DefaultLogStore { + storage: Arc::new(DeltaObjectStore::try_new(location, self.storage_options())?), + })) } } } diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index 2b011ff608..098bcdfdd4 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -26,6 +26,7 @@ use crate::kernel::{ Action, Add, CommitInfo, DataType, Format, Metadata, ReaderFeatures, Remove, StructType, WriterFeatures, }; +use crate::logstore::{default_logstore::DefaultLogStore, LogStoreRef}; use crate::partitions::PartitionFilter; use crate::protocol::{ find_latest_check_point_for_version, get_last_checkpoint, ProtocolError, Stats, @@ -249,8 +250,8 @@ pub struct DeltaTable { pub state: DeltaTableState, /// the load options used during load pub config: DeltaTableConfig, - /// object store to access log and data files - pub(crate) storage: ObjectStoreRef, + /// log store + pub(crate) log_store: LogStoreRef, /// file metadata for latest checkpoint last_check_point: Option, /// table versions associated with timestamps @@ -265,7 +266,8 @@ impl Serialize for DeltaTable { let mut seq = serializer.serialize_seq(None)?; seq.serialize_element(&self.state)?; seq.serialize_element(&self.config)?; - seq.serialize_element(self.storage.as_ref())?; + // tp::TODO (de)serialize actual log store instead + seq.serialize_element(self.object_store().as_ref())?; seq.serialize_element(&self.last_check_point)?; seq.serialize_element(&self.version_timestamp)?; seq.end() @@ -296,9 +298,13 @@ impl<'de> Deserialize<'de> for DeltaTable { let config = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; - let storage = seq - .next_element()? - .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let storage = Arc::new( + seq.next_element()? + .ok_or_else(|| A::Error::invalid_length(0, &self))?, + ); + let log_store = Arc::new(DefaultLogStore { + storage: Arc::clone(&storage), + }); let last_check_point = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; @@ -309,7 +315,7 @@ impl<'de> Deserialize<'de> for DeltaTable { let table = DeltaTable { state, config, - storage: Arc::new(storage), + log_store, last_check_point, version_timestamp, }; @@ -326,10 +332,10 @@ impl DeltaTable { /// /// NOTE: This is for advanced users. If you don't know why you need to use this method, please /// call one of the `open_table` helper methods instead. - pub fn new(storage: ObjectStoreRef, config: DeltaTableConfig) -> Self { + pub fn new(log_store: LogStoreRef, config: DeltaTableConfig) -> Self { Self { state: DeltaTableState::with_version(-1), - storage, + log_store, config, last_check_point: None, version_timestamp: HashMap::new(), @@ -341,10 +347,10 @@ impl DeltaTable { /// /// NOTE: This is for advanced users. If you don't know why you need to use this method, /// please call one of the `open_table` helper methods instead. - pub(crate) fn new_with_state(storage: ObjectStoreRef, state: DeltaTableState) -> Self { + pub(crate) fn new_with_state(log_store: LogStoreRef, state: DeltaTableState) -> Self { Self { state, - storage, + log_store, config: Default::default(), last_check_point: None, version_timestamp: HashMap::new(), @@ -353,18 +359,24 @@ impl DeltaTable { /// get a shared reference to the delta object store pub fn object_store(&self) -> ObjectStoreRef { - self.storage.clone() + self.log_store.object_store() } /// The URI of the underlying data pub fn table_uri(&self) -> String { - self.storage.root_uri() + self.log_store.object_store().root_uri() + } + + /// get a shared reference to the log store + pub fn log_store(&self) -> LogStoreRef { + self.log_store.clone() } /// Return the list of paths of given checkpoint. pub fn get_checkpoint_data_paths(&self, check_point: &CheckPoint) -> Vec { let checkpoint_prefix = format!("{:020}", check_point.version); - let log_path = self.storage.log_path(); + let object_store = self.object_store(); + let log_path = object_store.log_path(); let mut checkpoint_data_paths = Vec::new(); match check_point.parts { @@ -399,7 +411,8 @@ impl DeltaTable { let mut current_delta_log_ver = i64::MAX; // Get file objects from table. - let mut stream = self.storage.list(Some(self.storage.log_path())).await?; + let storage = self.object_store(); + let mut stream = storage.list(Some(storage.log_path())).await?; while let Some(obj_meta) = stream.next().await { let obj_meta = obj_meta?; @@ -422,54 +435,7 @@ impl DeltaTable { } /// returns the latest available version of the table pub async fn get_latest_version(&mut self) -> Result { - let version_start = match get_last_checkpoint(&self.storage).await { - Ok(last_check_point) => last_check_point.version, - Err(ProtocolError::CheckpointNotFound) => { - // no checkpoint - -1 - } - Err(e) => { - return Err(DeltaTableError::from(e)); - } - }; - - debug!("latest checkpoint version: {version_start}"); - - let version_start = max(self.version(), version_start); - - lazy_static! { - static ref DELTA_LOG_REGEX: Regex = - Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap(); - } - - // list files to find max version - let version = async { - let mut max_version: i64 = version_start; - let prefix = Some(self.storage.log_path()); - let offset_path = commit_uri_from_version(max_version); - let mut files = self.storage.list_with_offset(prefix, &offset_path).await?; - - while let Some(obj_meta) = files.next().await { - let obj_meta = obj_meta?; - if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) { - let log_version = captures.get(1).unwrap().as_str().parse().unwrap(); - // listing may not be ordered - max_version = max(max_version, log_version); - // also cache timestamp for version, for faster time-travel - self.version_timestamp - .insert(log_version, obj_meta.last_modified.timestamp()); - } - } - - if max_version < 0 { - return Err(DeltaTableError::not_a_table(self.table_uri())); - } - - Ok::(max_version) - } - .await?; - - Ok(version) + self.log_store.get_latest_version(self.version()).await } /// Currently loaded version of the table @@ -490,12 +456,12 @@ impl DeltaTable { current_version: i64, ) -> Result { let next_version = current_version + 1; - let commit_uri = commit_uri_from_version(next_version); - let commit_log_bytes = self.storage.get(&commit_uri).await; - let commit_log_bytes = match commit_log_bytes { - Err(ObjectStoreError::NotFound { .. }) => return Ok(PeekCommit::UpToDate), + let commit_log_bytes = match self.log_store.read_commit_entry(next_version).await { + Ok(bytes) => Ok(bytes), + Err(DeltaTableError::ObjectStore { + source: ObjectStoreError::NotFound { .. }, + }) => return Ok(PeekCommit::UpToDate), Err(err) => Err(err), - Ok(result) => result.bytes().await, }?; let actions = Self::get_actions(next_version, commit_log_bytes).await; @@ -529,7 +495,7 @@ impl DeltaTable { /// loading the last checkpoint and incrementally applying each version since. #[cfg(any(feature = "parquet", feature = "parquet2"))] pub async fn update(&mut self) -> Result<(), DeltaTableError> { - match get_last_checkpoint(&self.storage).await { + match get_last_checkpoint(&self.object_store()).await { Ok(last_check_point) => { debug!("update with latest checkpoint {last_check_point:?}"); if Some(last_check_point) == self.last_check_point { @@ -574,13 +540,12 @@ impl DeltaTable { let buf_size = self.config.log_buffer_size; - let store = self.storage.clone(); + let log_store = self.log_store.clone(); let mut log_stream = futures::stream::iter(self.version() + 1..max_version + 1) .map(|version| { - let store = store.clone(); - let loc = commit_uri_from_version(version); + let log_store = log_store.clone(); async move { - let data = store.get(&loc).await?.bytes().await?; + let data = log_store.read_commit_entry(version).await?; let actions = Self::get_actions(version, data).await?; Ok((version, actions)) } @@ -616,7 +581,7 @@ impl DeltaTable { pub async fn load_version(&mut self, version: i64) -> Result<(), DeltaTableError> { // check if version is valid let commit_uri = commit_uri_from_version(version); - match self.storage.head(&commit_uri).await { + match self.object_store().head(&commit_uri).await { Ok(_) => {} Err(ObjectStoreError::NotFound { .. }) => { return Err(DeltaTableError::InvalidVersion(version)); @@ -628,7 +593,7 @@ impl DeltaTable { // 1. find latest checkpoint below version #[cfg(any(feature = "parquet", feature = "parquet2"))] - match find_latest_check_point_for_version(&self.storage, version).await? { + match find_latest_check_point_for_version(&self.object_store(), version).await? { Some(check_point) => { self.restore_checkpoint(check_point).await?; } @@ -652,7 +617,10 @@ impl DeltaTable { match self.version_timestamp.get(&version) { Some(ts) => Ok(*ts), None => { - let meta = self.storage.head(&commit_uri_from_version(version)).await?; + let meta = self + .object_store() + .head(&commit_uri_from_version(version)) + .await?; let ts = meta.last_modified.timestamp(); // also cache timestamp for version self.version_timestamp.insert(version, ts); @@ -746,7 +714,7 @@ impl DeltaTable { let files = self.get_files_by_partitions(filters)?; Ok(files .iter() - .map(|fname| self.storage.to_uri(fname)) + .map(|fname| self.object_store().to_uri(fname)) .collect()) } @@ -771,7 +739,7 @@ impl DeltaTable { pub fn get_file_uris(&self) -> impl Iterator + '_ { self.state .file_paths_iter() - .map(|path| self.storage.to_uri(&path)) + .map(|path| self.object_store().to_uri(&path)) } /// Returns statistics for files, in order diff --git a/crates/deltalake-core/src/table/state.rs b/crates/deltalake-core/src/table/state.rs index 26becd0703..8fa51c55fd 100644 --- a/crates/deltalake-core/src/table/state.rs +++ b/crates/deltalake-core/src/table/state.rs @@ -71,7 +71,7 @@ impl DeltaTableState { /// Construct a delta table state object from commit version. pub async fn from_commit(table: &DeltaTable, version: i64) -> Result { let commit_uri = commit_uri_from_version(version); - let commit_log_bytes = match table.storage.get(&commit_uri).await { + let commit_log_bytes = match table.object_store().get(&commit_uri).await { Ok(get) => Ok(get.bytes().await?), Err(ObjectStoreError::NotFound { .. }) => Err(ProtocolError::EndOfLog), Err(source) => Err(ProtocolError::ObjectStore { source }), @@ -161,7 +161,7 @@ impl DeltaTableState { let mut new_state = Self::with_version(check_point.version); for f in &checkpoint_data_paths { - let obj = table.storage.get(f).await?.bytes().await?; + let obj = table.object_store().get(f).await?.bytes().await?; new_state.process_checkpoint_bytes(obj, &table.config)?; } diff --git a/crates/deltalake-core/src/test_utils.rs b/crates/deltalake-core/src/test_utils.rs index 124ec0365b..76b6c61820 100644 --- a/crates/deltalake-core/src/test_utils.rs +++ b/crates/deltalake-core/src/test_utils.rs @@ -88,6 +88,7 @@ impl IntegrationContext { _ => DeltaTableBuilder::from_uri(store_uri) .with_allow_http(true) .build_storage()? + .object_store() .storage_backend(), }; diff --git a/crates/deltalake-core/src/writer/json.rs b/crates/deltalake-core/src/writer/json.rs index 044ffc20e2..0239dee2f9 100644 --- a/crates/deltalake-core/src/writer/json.rs +++ b/crates/deltalake-core/src/writer/json.rs @@ -195,7 +195,7 @@ impl JsonWriter { .build(); Ok(Self { - storage, + storage: storage.object_store(), arrow_schema_ref: schema, writer_properties, partition_columns: partition_columns.unwrap_or_default(), @@ -218,7 +218,7 @@ impl JsonWriter { .build(); Ok(Self { - storage: table.storage.clone(), + storage: table.object_store(), arrow_schema_ref, writer_properties, partition_columns, diff --git a/crates/deltalake-core/src/writer/mod.rs b/crates/deltalake-core/src/writer/mod.rs index 478a0b11f2..3b73fe2ef6 100644 --- a/crates/deltalake-core/src/writer/mod.rs +++ b/crates/deltalake-core/src/writer/mod.rs @@ -146,7 +146,14 @@ pub trait DeltaWriter { partition_by, predicate: None, }; - let version = commit(table.storage.as_ref(), &adds, operation, &table.state, None).await?; + let version = commit( + table.log_store.as_ref(), + &adds, + operation, + &table.state, + None, + ) + .await?; table.update().await?; Ok(version) } diff --git a/crates/deltalake-core/src/writer/record_batch.rs b/crates/deltalake-core/src/writer/record_batch.rs index b673146907..6e23313860 100644 --- a/crates/deltalake-core/src/writer/record_batch.rs +++ b/crates/deltalake-core/src/writer/record_batch.rs @@ -56,7 +56,8 @@ impl RecordBatchWriter { ) -> Result { let storage = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(storage_options.unwrap_or_default()) - .build_storage()?; + .build_storage()? + .object_store(); // Initialize writer properties for the underlying arrow writer let writer_properties = WriterProperties::builder() @@ -89,7 +90,7 @@ impl RecordBatchWriter { .build(); Ok(Self { - storage: table.storage.clone(), + storage: table.object_store(), arrow_schema_ref, writer_properties, partition_columns, diff --git a/crates/deltalake-core/src/writer/test_utils.rs b/crates/deltalake-core/src/writer/test_utils.rs index d67931c096..1daf9e407b 100644 --- a/crates/deltalake-core/src/writer/test_utils.rs +++ b/crates/deltalake-core/src/writer/test_utils.rs @@ -323,7 +323,7 @@ pub mod datafusion { use std::sync::Arc; pub async fn get_data(table: &DeltaTable) -> Vec { - let table = DeltaTable::new_with_state(table.object_store(), table.state.clone()); + let table = DeltaTable::new_with_state(table.log_store.clone(), table.state.clone()); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(table)).unwrap(); ctx.sql("select * from test") diff --git a/crates/deltalake-core/tests/command_optimize.rs b/crates/deltalake-core/tests/command_optimize.rs index a923d0064d..14f9d4c410 100644 --- a/crates/deltalake-core/tests/command_optimize.rs +++ b/crates/deltalake-core/tests/command_optimize.rs @@ -298,7 +298,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { let operation = DeltaOperation::Delete { predicate: None }; commit( - other_dt.object_store().as_ref(), + other_dt.log_store().as_ref(), &vec![Action::Remove(remove)], operation, &other_dt.state, @@ -306,9 +306,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { ) .await?; - let maybe_metrics = plan - .execute(dt.object_store(), &dt.state, 1, 20, None) - .await; + let maybe_metrics = plan.execute(dt.log_store(), &dt.state, 1, 20, None).await; assert!(maybe_metrics.is_err()); assert_eq!(dt.version(), version + 1); @@ -357,9 +355,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box> { ) .await?; - let metrics = plan - .execute(dt.object_store(), &dt.state, 1, 20, None) - .await?; + let metrics = plan.execute(dt.log_store(), &dt.state, 1, 20, None).await?; assert_eq!(metrics.num_files_added, 1); assert_eq!(metrics.num_files_removed, 2); @@ -399,7 +395,7 @@ async fn test_commit_interval() -> Result<(), Box> { let metrics = plan .execute( - dt.object_store(), + dt.log_store(), &dt.state, 1, 20, diff --git a/crates/deltalake-core/tests/command_vacuum.rs b/crates/deltalake-core/tests/command_vacuum.rs index 0007f479d5..51ff3217b3 100644 --- a/crates/deltalake-core/tests/command_vacuum.rs +++ b/crates/deltalake-core/tests/command_vacuum.rs @@ -297,6 +297,6 @@ async fn test_non_managed_files() { async fn is_deleted(context: &mut TestContext, path: &Path) -> bool { let backend = context.get_storage(); - let res = backend.head(path).await; + let res = backend.object_store().head(path).await; matches!(res, Err(ObjectStoreError::NotFound { .. })) } diff --git a/crates/deltalake-core/tests/commit_info_format.rs b/crates/deltalake-core/tests/commit_info_format.rs index de69397e32..a9d05e4c11 100644 --- a/crates/deltalake-core/tests/commit_info_format.rs +++ b/crates/deltalake-core/tests/commit_info_format.rs @@ -22,7 +22,7 @@ async fn test_operational_parameters() -> Result<(), Box> { }; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, diff --git a/crates/deltalake-core/tests/common/mod.rs b/crates/deltalake-core/tests/common/mod.rs index 80df899323..af2e6e1a7f 100644 --- a/crates/deltalake-core/tests/common/mod.rs +++ b/crates/deltalake-core/tests/common/mod.rs @@ -2,10 +2,10 @@ use bytes::Bytes; use deltalake_core::kernel::{Action, Add, Remove, StructType}; +use deltalake_core::logstore::LogStore; use deltalake_core::operations::create::CreateBuilder; use deltalake_core::operations::transaction::commit; use deltalake_core::protocol::{DeltaOperation, SaveMode}; -use deltalake_core::storage::DeltaObjectStore; use deltalake_core::DeltaTable; use deltalake_core::DeltaTableBuilder; use object_store::{path::Path, ObjectStore}; @@ -28,7 +28,7 @@ pub mod s3; pub struct TestContext { /// The main table under test pub table: Option, - pub backend: Option>, + pub backend: Option>, /// The configuration used to create the backend. pub config: HashMap, /// An object when it is dropped will clean up any temporary resources created for the test @@ -55,7 +55,7 @@ impl TestContext { } } - pub fn get_storage(&mut self) -> Arc { + pub fn get_storage(&mut self) -> Arc { if self.backend.is_none() { self.backend = Some(self.new_storage()) } @@ -63,7 +63,7 @@ impl TestContext { self.backend.as_ref().unwrap().clone() } - fn new_storage(&self) -> Arc { + fn new_storage(&self) -> Arc { let config = self.config.clone(); let uri = config.get("URI").unwrap().to_string(); DeltaTableBuilder::from_uri(uri) @@ -82,9 +82,9 @@ impl TestContext { .iter() .map(|s| s.to_string()) .collect::>(); - let backend = self.new_storage(); + let log_store = self.new_storage(); CreateBuilder::new() - .with_object_store(backend) + .with_log_store(log_store) .with_table_name("delta-rs_test_table") .with_comment("Table created by delta-rs tests") .with_columns(schema.fields().clone()) @@ -149,7 +149,7 @@ pub async fn add_file( }; let actions = vec![Action::Add(add)]; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, @@ -187,7 +187,7 @@ pub async fn remove_file( let operation = DeltaOperation::Delete { predicate: None }; let actions = vec![Action::Remove(remove)]; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, diff --git a/crates/deltalake-core/tests/fs_common/mod.rs b/crates/deltalake-core/tests/fs_common/mod.rs index dc9ec2547a..4de3ff1546 100644 --- a/crates/deltalake-core/tests/fs_common/mod.rs +++ b/crates/deltalake-core/tests/fs_common/mod.rs @@ -119,7 +119,7 @@ pub async fn commit_actions( operation: DeltaOperation, ) -> i64 { let version = commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, diff --git a/crates/deltalake-core/tests/integration_checkpoint.rs b/crates/deltalake-core/tests/integration_checkpoint.rs index 9b5b0a73ff..219474765a 100644 --- a/crates/deltalake-core/tests/integration_checkpoint.rs +++ b/crates/deltalake-core/tests/integration_checkpoint.rs @@ -61,7 +61,8 @@ async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult { let table_uri = context.root_uri(); let object_store = DeltaTableBuilder::from_uri(table_uri) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); let log_path = |version| { object_store diff --git a/crates/deltalake-core/tests/integration_concurrent_writes.rs b/crates/deltalake-core/tests/integration_concurrent_writes.rs index bef44d0693..90dba7659a 100644 --- a/crates/deltalake-core/tests/integration_concurrent_writes.rs +++ b/crates/deltalake-core/tests/integration_concurrent_writes.rs @@ -168,7 +168,7 @@ impl Worker { default_row_commit_version: None, })]; let version = commit( - self.table.object_store().as_ref(), + self.table.log_store().as_ref(), &actions, operation, &self.table.state, diff --git a/crates/deltalake-core/tests/integration_datafusion.rs b/crates/deltalake-core/tests/integration_datafusion.rs index 3476de6839..c9293f187e 100644 --- a/crates/deltalake-core/tests/integration_datafusion.rs +++ b/crates/deltalake-core/tests/integration_datafusion.rs @@ -211,7 +211,7 @@ mod local { // Trying to execute the write from the input plan without providing Datafusion with a session // state containing the referenced object store in the registry results in an error. assert!( - WriteBuilder::new(target_table.object_store(), target_table.state.clone()) + WriteBuilder::new(target_table.log_store(), target_table.state.clone()) .with_input_execution_plan(source_scan.clone()) .await .unwrap_err() @@ -235,11 +235,10 @@ mod local { .register_object_store(source_store_url, Arc::from(source_store)); // Execute write to the target table with the proper state - let target_table = - WriteBuilder::new(target_table.object_store(), target_table.state.clone()) - .with_input_execution_plan(source_scan) - .with_input_session_state(state) - .await?; + let target_table = WriteBuilder::new(target_table.log_store(), target_table.state.clone()) + .with_input_execution_plan(source_scan) + .with_input_session_state(state) + .await?; ctx.register_table("target", Arc::new(target_table))?; // Check results diff --git a/crates/deltalake-core/tests/integration_object_store.rs b/crates/deltalake-core/tests/integration_object_store.rs index 3988cbdb6d..9c5720db85 100644 --- a/crates/deltalake-core/tests/integration_object_store.rs +++ b/crates/deltalake-core/tests/integration_object_store.rs @@ -81,7 +81,8 @@ async fn read_write_test_onelake(integration: StorageIntegration, path: &Path) - let delta_store = DeltaTableBuilder::from_uri(&context.root_uri()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); //println!("{:#?}",delta_store); @@ -103,7 +104,8 @@ async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> let context = IntegrationContext::new(integration)?; let delta_store = DeltaTableBuilder::from_uri(context.root_uri()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); put_get_delete_list(delta_store.as_ref()).await?; list_with_delimiter(delta_store.as_ref()).await?; @@ -482,7 +484,8 @@ async fn test_object_store_prefixes(integration: StorageIntegration) -> TestResu let rooturi = format!("{}/{}", context.root_uri(), prefix); let delta_store = DeltaTableBuilder::from_uri(&rooturi) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); let contents = Bytes::from("cats"); let path = Path::from("test"); diff --git a/crates/deltalake-core/tests/integration_read.rs b/crates/deltalake-core/tests/integration_read.rs index 0e17d34397..a15679a09c 100644 --- a/crates/deltalake-core/tests/integration_read.rs +++ b/crates/deltalake-core/tests/integration_read.rs @@ -155,7 +155,8 @@ async fn verify_store(integration: &IntegrationContext, root_path: &str) -> Test let table_uri = format!("{}/{}", integration.root_uri(), root_path); let storage = DeltaTableBuilder::from_uri(table_uri.clone()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); let files = storage.list_with_delimiter(None).await?; assert_eq!( diff --git a/crates/deltalake-core/tests/repair_s3_rename_test.rs b/crates/deltalake-core/tests/repair_s3_rename_test.rs index 3157fab896..25eb9704ee 100644 --- a/crates/deltalake-core/tests/repair_s3_rename_test.rs +++ b/crates/deltalake-core/tests/repair_s3_rename_test.rs @@ -117,6 +117,7 @@ fn create_s3_backend( .with_allow_http(true) .build_storage() .unwrap() + .object_store() .storage_backend(); let delayed_store = DelayedObjectStore { diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 1f558b82fe..a8bfb6668a 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -51,7 +51,8 @@ impl DeltaFileSystemHandler { let storage = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(options.clone().unwrap_or_default()) .build_storage() - .map_err(PythonError::from)?; + .map_err(PythonError::from)? + .object_store(); Ok(Self { inner: storage, rt: Arc::new(rt()?), diff --git a/python/src/lib.rs b/python/src/lib.rs index 923a06d159..a58d1f9aa6 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -274,7 +274,7 @@ impl RawDeltaTable { retention_hours: Option, enforce_retention_duration: bool, ) -> PyResult> { - let mut cmd = VacuumBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = VacuumBuilder::new(self._table.log_store(), self._table.state.clone()) .with_enforce_retention_duration(enforce_retention_duration) .with_dry_run(dry_run); if let Some(retention_period) = retention_hours { @@ -296,7 +296,7 @@ impl RawDeltaTable { writer_properties: Option>, safe_cast: bool, ) -> PyResult { - let mut cmd = UpdateBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = UpdateBuilder::new(self._table.log_store(), self._table.state.clone()) .with_safe_cast(safe_cast); if let Some(writer_props) = writer_properties { @@ -349,7 +349,7 @@ impl RawDeltaTable { max_concurrent_tasks: Option, min_commit_interval: Option, ) -> PyResult { - let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = OptimizeBuilder::new(self._table.log_store(), self._table.state.clone()) .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)); if let Some(size) = target_size { cmd = cmd.with_target_size(size); @@ -379,7 +379,7 @@ impl RawDeltaTable { max_spill_size: usize, min_commit_interval: Option, ) -> PyResult { - let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = OptimizeBuilder::new(self._table.log_store(), self._table.state.clone()) .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)) .with_max_spill_size(max_spill_size) .with_type(OptimizeType::ZOrder(z_order_columns)); @@ -446,7 +446,7 @@ impl RawDeltaTable { let source_df = ctx.read_table(table_provider).unwrap(); let mut cmd = MergeBuilder::new( - self._table.object_store(), + self._table.log_store(), self._table.state.clone(), predicate, source_df, @@ -608,7 +608,7 @@ impl RawDeltaTable { ignore_missing_files: bool, protocol_downgrade_allowed: bool, ) -> PyResult { - let mut cmd = RestoreBuilder::new(self._table.object_store(), self._table.state.clone()); + let mut cmd = RestoreBuilder::new(self._table.log_store(), self._table.state.clone()); if let Some(val) = target { if let Ok(version) = val.extract::() { cmd = cmd.with_version_to_restore(version) @@ -822,7 +822,7 @@ impl RawDeltaTable { partition_by: Some(partition_by), predicate: None, }; - let store = self._table.object_store(); + let store = self._table.log_store(); rt()? .block_on(commit( @@ -866,7 +866,7 @@ impl RawDeltaTable { /// Run the delete command on the delta table: delete records following a predicate and return the delete metrics. #[pyo3(signature = (predicate = None))] pub fn delete(&mut self, predicate: Option) -> PyResult { - let mut cmd = DeleteBuilder::new(self._table.object_store(), self._table.state.clone()); + let mut cmd = DeleteBuilder::new(self._table.log_store(), self._table.state.clone()); if let Some(predicate) = predicate { cmd = cmd.with_predicate(predicate); } @@ -881,9 +881,8 @@ impl RawDeltaTable { /// have been deleted or are malformed #[pyo3(signature = (dry_run = true))] pub fn repair(&mut self, dry_run: bool) -> PyResult { - let cmd = - FileSystemCheckBuilder::new(self._table.object_store(), self._table.state.clone()) - .with_dry_run(dry_run); + let cmd = FileSystemCheckBuilder::new(self._table.log_store(), self._table.state.clone()) + .with_dry_run(dry_run); let (table, metrics) = rt()? .block_on(cmd.into_future())