From 7f34f667f389962e2e8af001752e51620484b275 Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Wed, 18 Oct 2023 21:20:32 +0200 Subject: [PATCH] refactor!: Implement default `LogStore` trait Introduce a new trait, `LogStore`, and a single default implementation that keeps the existing functionality unchanged. `LogStore` serves as a centralized entry point for interacting with the delta commit log: it allows reading commits and atomically creating new ones. This is in preparation for enabling S3 DynamoDB multi-cluster writes that are compatible with the reference JVM implementation. --- python/src/filesystem.rs | 3 +- python/src/lib.rs | 16 +-- rust/src/delta_datafusion/mod.rs | 3 +- rust/src/logstore/default_logstore.rs | 115 ++++++++++++++++ rust/src/logstore/mod.rs | 27 +++- rust/src/operations/create.rs | 28 ++-- rust/src/operations/delete.rs | 29 ++-- rust/src/operations/filesystem_check.rs | 24 ++-- rust/src/operations/load.rs | 10 +- rust/src/operations/merge.rs | 27 ++-- rust/src/operations/mod.rs | 20 +-- rust/src/operations/optimize.rs | 29 ++-- rust/src/operations/restore.rs | 27 ++-- rust/src/operations/transaction/mod.rs | 88 +++++++------ rust/src/operations/transaction/test_utils.rs | 22 +++- rust/src/operations/update.rs | 30 +++-- rust/src/operations/vacuum.rs | 27 ++-- rust/src/operations/write.rs | 24 ++-- rust/src/operations/writer.rs | 8 +- rust/src/protocol/checkpoints.rs | 11 +- rust/src/storage/utils.rs | 2 +- rust/src/table/builder.rs | 23 ++-- rust/src/table/mod.rs | 124 +++++++----------- rust/src/table/state.rs | 4 +- rust/src/test_utils.rs | 1 + rust/src/writer/json.rs | 4 +- rust/src/writer/mod.rs | 9 +- rust/src/writer/record_batch.rs | 5 +- rust/src/writer/test_utils.rs | 2 +- rust/tests/command_optimize.rs | 12 +- rust/tests/command_vacuum.rs | 2 +- rust/tests/commit_info_format.rs | 2 +- rust/tests/common/mod.rs | 16 +-- rust/tests/fs_common/mod.rs | 2 +- rust/tests/integration_checkpoint.rs | 3 +- rust/tests/integration_concurrent_writes.rs
| 2 +- rust/tests/integration_datafusion.rs | 11 +- rust/tests/integration_object_store.rs | 9 +- rust/tests/integration_read.rs | 3 +- rust/tests/repair_s3_rename_test.rs | 1 + 40 files changed, 482 insertions(+), 323 deletions(-) create mode 100644 rust/src/logstore/default_logstore.rs diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 1f558b82fe..a8bfb6668a 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -51,7 +51,8 @@ impl DeltaFileSystemHandler { let storage = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(options.clone().unwrap_or_default()) .build_storage() - .map_err(PythonError::from)?; + .map_err(PythonError::from)? + .object_store(); Ok(Self { inner: storage, rt: Arc::new(rt()?), diff --git a/python/src/lib.rs b/python/src/lib.rs index 2f46436984..49ce59aae4 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -273,7 +273,7 @@ impl RawDeltaTable { retention_hours: Option, enforce_retention_duration: bool, ) -> PyResult> { - let mut cmd = VacuumBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = VacuumBuilder::new(self._table.log_store(), self._table.state.clone()) .with_enforce_retention_duration(enforce_retention_duration) .with_dry_run(dry_run); if let Some(retention_period) = retention_hours { @@ -295,7 +295,7 @@ impl RawDeltaTable { writer_properties: Option>, safe_cast: bool, ) -> PyResult { - let mut cmd = UpdateBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = UpdateBuilder::new(self._table.log_store(), self._table.state.clone()) .with_safe_cast(safe_cast); if let Some(writer_props) = writer_properties { @@ -348,7 +348,7 @@ impl RawDeltaTable { max_concurrent_tasks: Option, min_commit_interval: Option, ) -> PyResult { - let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = OptimizeBuilder::new(self._table.log_store(), self._table.state.clone()) 
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)); if let Some(size) = target_size { cmd = cmd.with_target_size(size); @@ -378,7 +378,7 @@ impl RawDeltaTable { max_spill_size: usize, min_commit_interval: Option, ) -> PyResult { - let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = OptimizeBuilder::new(self._table.log_store(), self._table.state.clone()) .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)) .with_max_spill_size(max_spill_size) .with_type(OptimizeType::ZOrder(z_order_columns)); @@ -445,7 +445,7 @@ impl RawDeltaTable { let source_df = ctx.read_table(table_provider).unwrap(); let mut cmd = MergeBuilder::new( - self._table.object_store(), + self._table.log_store(), self._table.state.clone(), predicate, source_df, @@ -587,7 +587,7 @@ impl RawDeltaTable { ignore_missing_files: bool, protocol_downgrade_allowed: bool, ) -> PyResult { - let mut cmd = RestoreBuilder::new(self._table.object_store(), self._table.state.clone()); + let mut cmd = RestoreBuilder::new(self._table.log_store(), self._table.state.clone()); if let Some(val) = target { if let Ok(version) = val.extract::() { cmd = cmd.with_version_to_restore(version) @@ -799,7 +799,7 @@ impl RawDeltaTable { partition_by: Some(partition_by), predicate: None, }; - let store = self._table.object_store(); + let store = self._table.log_store(); rt()? .block_on(commit( @@ -843,7 +843,7 @@ impl RawDeltaTable { /// Run the delete command on the delta table: delete records following a predicate and return the delete metrics. 
#[pyo3(signature = (predicate = None))] pub fn delete(&mut self, predicate: Option) -> PyResult { - let mut cmd = DeleteBuilder::new(self._table.object_store(), self._table.state.clone()); + let mut cmd = DeleteBuilder::new(self._table.log_store(), self._table.state.clone()); if let Some(predicate) = predicate { cmd = cmd.with_predicate(predicate); } diff --git a/rust/src/delta_datafusion/mod.rs b/rust/src/delta_datafusion/mod.rs index 0d2df09e46..9a2888f1de 100644 --- a/rust/src/delta_datafusion/mod.rs +++ b/rust/src/delta_datafusion/mod.rs @@ -1912,7 +1912,8 @@ mod tests { .build(&table.state) .unwrap(); - let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap(); + let storage = table.object_store(); + let provider = DeltaTableProvider::try_new(table.state, storage, config).unwrap(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); diff --git a/rust/src/logstore/default_logstore.rs b/rust/src/logstore/default_logstore.rs new file mode 100644 index 0000000000..d27b3650f8 --- /dev/null +++ b/rust/src/logstore/default_logstore.rs @@ -0,0 +1,115 @@ +//! 
Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation +use std::cmp::max; + +use bytes::Bytes; +use futures::StreamExt; +use lazy_static::lazy_static; +use log::debug; +use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; +use regex::Regex; + +use super::LogStore; +use crate::{ + operations::transaction::TransactionError, + protocol::{get_last_checkpoint, ProtocolError}, + storage::{commit_uri_from_version, ObjectStoreRef}, + DeltaResult, DeltaTableError, +}; + +/// Default [`LogStore`] implementation +#[derive(Debug, Clone)] +pub struct DefaultLogStore { + pub(crate) storage: ObjectStoreRef, +} + +#[async_trait::async_trait] +impl LogStore for DefaultLogStore { + async fn read_commit_entry(&self, version: i64) -> DeltaResult { + let commit_uri = commit_uri_from_version(version); + let data = self.storage.get(&commit_uri).await?.bytes().await?; + Ok(data) + + // TODO: return actual actions instead + // let actions = Self::get_actions(next_version, commit_log_bytes).await; + } + + /// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists] + /// if the given `version` already exists. The caller should handle the retry logic itself. + /// This is low-level transaction API. If user does not want to maintain the commit loop then + /// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` + /// with retry logic. + async fn write_commit_entry( + &self, + version: i64, + tmp_commit: &Path, + ) -> Result<(), TransactionError> { + // move temporary commit file to delta log directory + // rely on storage to fail if the file already exists - + self.storage + .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version)) + .await + .map_err(|err| match err { + ObjectStoreError::AlreadyExists { .. 
} => { + TransactionError::VersionAlreadyExists(version) + } + _ => TransactionError::from(err), + })?; + Ok(()) + } + + async fn get_latest_version(&self, current_version: i64) -> DeltaResult { + let version_start = match get_last_checkpoint(&self.storage).await { + Ok(last_check_point) => last_check_point.version, + Err(ProtocolError::CheckpointNotFound) => { + // no checkpoint + -1 + } + Err(e) => { + return Err(DeltaTableError::from(e)); + } + }; + + debug!("latest checkpoint version: {version_start}"); + + let version_start = max(current_version, version_start); + + lazy_static! { + static ref DELTA_LOG_REGEX: Regex = + Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap(); + } + + // list files to find max version + let version = async { + let mut max_version: i64 = version_start; + let prefix = Some(self.storage.log_path()); + let offset_path = commit_uri_from_version(max_version); + let mut files = self.storage.list_with_offset(prefix, &offset_path).await?; + // let mut files = self.storage.list_with_offset(prefix, &offset_path).await?; + + while let Some(obj_meta) = files.next().await { + let obj_meta = obj_meta?; + if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) { + let log_version = captures.get(1).unwrap().as_str().parse().unwrap(); + // listing may not be ordered + max_version = max(max_version, log_version); + // also cache timestamp for version, for faster time-travel + // TODO: temporarily disabled because `version_timestamp` is not available in the [`LogStore`] + // self.version_timestamp + // .insert(log_version, obj_meta.last_modified.timestamp()); + } + } + + if max_version < 0 { + return Err(DeltaTableError::not_a_table(self.storage.root_uri())); + } + + Ok::(max_version) + } + .await?; + Ok(version) + } + + fn object_store(&self) -> ObjectStoreRef { + self.storage.clone() + } +} diff --git a/rust/src/logstore/mod.rs b/rust/src/logstore/mod.rs index 28c6ef3dde..51d89ecff4 100644 --- 
a/rust/src/logstore/mod.rs +++ b/rust/src/logstore/mod.rs @@ -1,12 +1,17 @@ //! Delta log store. -// use std::io::{Cursor, BufReader, BufRead}; +use std::sync::Arc; -use crate::errors::DeltaResult; +use crate::{ + errors::DeltaResult, operations::transaction::TransactionError, storage::ObjectStoreRef, +}; use bytes::Bytes; -// use log::debug; +use object_store::path::Path; pub mod default_logstore; +/// Sharable reference to [`LogStore`] +pub type LogStoreRef = Arc; + /// Trait for critical operations required to read and write commit entries in Delta logs. /// /// The correctness is predicated on the atomicity and durability guarantees of @@ -27,8 +32,22 @@ pub trait LogStore: Sync + Send { /// /// This operation can be retried with a higher version in case the write /// fails with `TransactionError::VersionAlreadyExists`. - async fn write_commit_entry(&self, version: i64, actions: Bytes) -> DeltaResult<()>; + async fn write_commit_entry( + &self, + version: i64, + tmp_commit: &Path, + ) -> Result<(), TransactionError>; /// Find latest version currently stored in the delta log. async fn get_latest_version(&self, start_version: i64) -> DeltaResult; + + /// Get underlying object store. 
+ fn object_store(&self) -> ObjectStoreRef; +} + +// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders +impl std::fmt::Debug for dyn LogStore + '_ { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.object_store().fmt(f) + } } diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index 7429007a6f..816c28a5ed 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -10,9 +10,9 @@ use serde_json::{Map, Value}; use super::transaction::commit; use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION}; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::logstore::{LogStore, LogStoreRef}; use crate::protocol::{Action, DeltaOperation, MetaData, Protocol, SaveMode}; use crate::schema::{SchemaDataType, SchemaField, SchemaTypeStruct}; -use crate::storage::DeltaObjectStore; use crate::table::builder::ensure_table_uri; use crate::table::config::DeltaConfigKey; use crate::table::DeltaTableMetaData; @@ -55,7 +55,7 @@ pub struct CreateBuilder { partition_columns: Option>, storage_options: Option>, actions: Vec, - object_store: Option>, + log_store: Option, configuration: HashMap>, metadata: Option>, } @@ -78,7 +78,7 @@ impl CreateBuilder { partition_columns: None, storage_options: None, actions: Default::default(), - object_store: None, + log_store: None, configuration: Default::default(), metadata: Default::default(), } @@ -199,9 +199,9 @@ impl CreateBuilder { self } - /// Provide a [`DeltaObjectStore`] instance, that points at table location - pub fn with_object_store(mut self, object_store: Arc) -> Self { - self.object_store = Some(object_store); + /// Provide a [`LogStore`] instance, that points at table location + pub fn with_log_store(mut self, log_store: Arc) -> Self { + self.log_store = Some(log_store); self } @@ -220,12 +220,12 @@ impl CreateBuilder { return Err(CreateError::MissingSchema.into()); } - let (storage_url, table) = if let 
Some(object_store) = self.object_store { + let (storage_url, table) = if let Some(log_store) = self.log_store { ( - ensure_table_uri(object_store.root_uri())? + ensure_table_uri(log_store.object_store().root_uri())? .as_str() .to_string(), - DeltaTable::new(object_store, Default::default()), + DeltaTable::new(log_store, Default::default()), ) } else { let storage_url = ensure_table_uri(self.location.ok_or(CreateError::MissingLocation)?)?; @@ -310,7 +310,7 @@ impl std::future::IntoFuture for CreateBuilder { }; let version = commit( - table.object_store().as_ref(), + table.log_store.as_ref(), &actions, operation, table_state, @@ -440,11 +440,11 @@ mod tests { assert_eq!(table.version(), 0); let first_id = table.get_metadata().unwrap().id.clone(); - let object_store = table.object_store(); + let log_store = table.log_store; // Check an error is raised when a table exists at location let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store.clone()) .with_columns(schema.get_fields().clone()) .with_save_mode(SaveMode::ErrorIfExists) .await; @@ -452,7 +452,7 @@ mod tests { // Check current table is returned when ignore option is chosen. 
let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store.clone()) .with_columns(schema.get_fields().clone()) .with_save_mode(SaveMode::Ignore) .await @@ -461,7 +461,7 @@ mod tests { // Check table is overwritten let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store.clone()) .with_columns(schema.get_fields().clone()) .with_save_mode(SaveMode::Overwrite) .await diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 913658f279..37a2d09fb6 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use std::time::{Instant, SystemTime, UNIX_EPOCH}; use crate::delta_datafusion::expr::fmt_expr_to_sql; +use crate::logstore::LogStoreRef; use crate::protocol::{Action, Add, Remove}; use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::physical_expr::create_physical_expr; @@ -40,7 +41,7 @@ use crate::errors::{DeltaResult, DeltaTableError}; use crate::operations::transaction::commit; use crate::operations::write::write_execution_plan; use crate::protocol::DeltaOperation; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; +use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -54,7 +55,7 @@ pub struct DeleteBuilder { /// A snapshot of the table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -84,11 +85,11 @@ pub struct DeleteMetrics { impl DeleteBuilder { /// Create a new [`DeleteBuilder`] - pub fn new(object_store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { predicate: None, snapshot, - store: 
object_store, + log_store, state: None, app_metadata: None, writer_properties: None, @@ -187,7 +188,7 @@ async fn excute_non_empty_expr( async fn execute( predicate: Option, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -197,7 +198,13 @@ async fn execute( let mut metrics = DeleteMetrics::default(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, object_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files( + snapshot, + log_store.object_store().clone(), + &state, + predicate.clone(), + ) + .await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -208,7 +215,7 @@ async fn execute( let write_start = Instant::now(); let add = excute_non_empty_expr( snapshot, - object_store.clone(), + log_store.object_store().clone(), &state, &predicate, &mut metrics, @@ -252,7 +259,7 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -276,7 +283,7 @@ impl std::future::IntoFuture for DeleteBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.store.clone(), session.runtime_env()); + register_store(this.log_store.object_store().clone(), session.runtime_env()); session.state() }); @@ -293,7 +300,7 @@ impl std::future::IntoFuture for DeleteBuilder { let ((actions, version), metrics) = execute( predicate, - this.store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -303,7 +310,7 @@ impl std::future::IntoFuture for DeleteBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.store, 
this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs index bf047c45c4..877cfde6fb 100644 --- a/rust/src/operations/filesystem_check.rs +++ b/rust/src/operations/filesystem_check.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::fmt::Debug; -use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -25,9 +24,9 @@ use object_store::ObjectStore; use url::{ParseError, Url}; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::logstore::LogStoreRef; use crate::operations::transaction::commit; use crate::protocol::{Action, Add, DeltaOperation, Remove}; -use crate::storage::DeltaObjectStore; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -38,7 +37,7 @@ pub struct FileSystemCheckBuilder { /// A snapshot of the to-be-checked table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Don't remove actions to the table log. 
Just determine which files can be removed dry_run: bool, } @@ -54,7 +53,7 @@ pub struct FileSystemCheckMetrics { struct FileSystemCheckPlan { /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Files that no longer exists in undlying ObjectStore but have active add actions pub files_to_remove: Vec, } @@ -72,10 +71,10 @@ fn is_absolute_path(path: &str) -> DeltaResult { impl FileSystemCheckBuilder { /// Create a new [`FileSystemCheckBuilder`] - pub fn new(store: Arc, state: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, state: DeltaTableState) -> Self { FileSystemCheckBuilder { snapshot: state, - store, + log_store, dry_run: false, } } @@ -89,7 +88,7 @@ impl FileSystemCheckBuilder { async fn create_fsck_plan(&self) -> DeltaResult { let mut files_relative: HashMap<&str, &Add> = HashMap::with_capacity(self.snapshot.files().len()); - let store = self.store.clone(); + let log_store = self.log_store.clone(); for active in self.snapshot.files() { if is_absolute_path(&active.path)? 
{ @@ -101,7 +100,8 @@ impl FileSystemCheckBuilder { } } - let mut files = self.store.list(None).await?; + let object_store = log_store.object_store(); + let mut files = object_store.list(None).await?; while let Some(result) = files.next().await { let file = result?; files_relative.remove(file.location.as_ref()); @@ -118,7 +118,7 @@ impl FileSystemCheckBuilder { Ok(FileSystemCheckPlan { files_to_remove, - store, + log_store, }) } } @@ -152,7 +152,7 @@ impl FileSystemCheckPlan { } commit( - self.store.as_ref(), + self.log_store.as_ref(), &actions, DeltaOperation::FileSystemCheck {}, snapshot, @@ -179,7 +179,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { let plan = this.create_fsck_plan().await?; if this.dry_run { return Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), FileSystemCheckMetrics { files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(), dry_run: true, @@ -188,7 +188,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { } let metrics = plan.execute(&this.snapshot).await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) diff --git a/rust/src/operations/load.rs b/rust/src/operations/load.rs index 7baa59e3e1..1a4c5c4cc6 100644 --- a/rust/src/operations/load.rs +++ b/rust/src/operations/load.rs @@ -7,7 +7,7 @@ use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use futures::future::BoxFuture; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::storage::DeltaObjectStore; +use crate::logstore::LogStoreRef; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -16,17 +16,17 @@ pub struct LoadBuilder { /// A snapshot of the to-be-loaded table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: 
LogStoreRef, /// A sub-selection of columns to be loaded columns: Option>, } impl LoadBuilder { /// Create a new [`LoadBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, columns: None, } } @@ -46,7 +46,7 @@ impl std::future::IntoFuture for LoadBuilder { let this = self; Box::pin(async move { - let table = DeltaTable::new_with_state(this.store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); let schema = table.state.arrow_schema()?; let projection = this .columns diff --git a/rust/src/operations/merge.rs b/rust/src/operations/merge.rs index a51e7649fc..84700371bd 100644 --- a/rust/src/operations/merge.rs +++ b/rust/src/operations/merge.rs @@ -67,12 +67,9 @@ use super::datafusion_utils::{into_expr, maybe_into_expr, Expression}; use super::transaction::commit; use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression}; use crate::delta_datafusion::{register_store, DeltaScanBuilder}; +use crate::logstore::LogStoreRef; use crate::operations::datafusion_utils::MetricObserverExec; -use crate::{ - operations::write::write_execution_plan, - storage::{DeltaObjectStore, ObjectStoreRef}, - DeltaResult, DeltaTable, DeltaTableError, -}; +use crate::{operations::write::write_execution_plan, DeltaResult, DeltaTable, DeltaTableError}; use crate::protocol::{Action, DeltaOperation, MergePredicate, Remove}; use crate::table::state::DeltaTableState; @@ -109,7 +106,7 @@ pub struct MergeBuilder { /// The source data source: DataFrame, /// Delta object store for handling data files - object_store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -124,7 +121,7 @@ pub struct MergeBuilder { impl MergeBuilder { /// Create a new [`MergeBuilder`] pub fn 
new>( - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: DeltaTableState, predicate: E, source: DataFrame, @@ -134,7 +131,7 @@ impl MergeBuilder { predicate, source, snapshot, - object_store, + log_store, source_alias: None, target_alias: None, state: None, @@ -563,7 +560,7 @@ pub struct MergeMetrics { async fn execute( predicate: Expression, source: DataFrame, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -592,7 +589,7 @@ async fn execute( // predicates also need to be considered when pruning let target = Arc::new( - DeltaScanBuilder::new(snapshot, object_store.clone(), &state) + DeltaScanBuilder::new(snapshot, log_store.object_store(), &state) .with_schema(snapshot.input_schema()?) .build() .await?, @@ -1128,7 +1125,7 @@ async fn execute( state.clone(), projection.clone(), table_partition_cols.clone(), - object_store.clone(), + log_store.object_store().clone(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, @@ -1188,7 +1185,7 @@ async fn execute( not_matched_by_source_predicates: not_match_source_operations, }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -1212,7 +1209,7 @@ impl std::future::IntoFuture for MergeBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.object_store.clone(), session.runtime_env()); + register_store(this.log_store.object_store().clone(), session.runtime_env()); session.state() }); @@ -1220,7 +1217,7 @@ impl std::future::IntoFuture for MergeBuilder { let ((actions, version), metrics) = execute( this.predicate, this.source, - this.object_store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -1236,7 +1233,7 @@ impl std::future::IntoFuture for MergeBuilder { this.snapshot 
.merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.object_store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index c15bb8052e..30ecf42a5a 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -107,60 +107,60 @@ impl DeltaOps { /// ``` #[must_use] pub fn create(self) -> CreateBuilder { - CreateBuilder::default().with_object_store(self.0.object_store()) + CreateBuilder::default().with_log_store(self.0.log_store.clone()) } /// Load data from a DeltaTable #[cfg(feature = "datafusion")] #[must_use] pub fn load(self) -> LoadBuilder { - LoadBuilder::new(self.0.object_store(), self.0.state) + LoadBuilder::new(self.0.log_store.clone(), self.0.state) } /// Write data to Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn write(self, batches: impl IntoIterator) -> WriteBuilder { - WriteBuilder::new(self.0.object_store(), self.0.state).with_input_batches(batches) + WriteBuilder::new(self.0.log_store.clone(), self.0.state).with_input_batches(batches) } /// Vacuum stale files from delta table #[must_use] pub fn vacuum(self) -> VacuumBuilder { - VacuumBuilder::new(self.0.object_store(), self.0.state) + VacuumBuilder::new(self.0.log_store.clone(), self.0.state) } /// Audit active files with files present on the filesystem #[must_use] pub fn filesystem_check(self) -> FileSystemCheckBuilder { - FileSystemCheckBuilder::new(self.0.object_store(), self.0.state) + FileSystemCheckBuilder::new(self.0.log_store.clone(), self.0.state) } /// Audit active files with files present on the filesystem #[cfg(all(feature = "arrow", feature = "parquet"))] #[must_use] pub fn optimize<'a>(self) -> OptimizeBuilder<'a> { - OptimizeBuilder::new(self.0.object_store(), self.0.state) + OptimizeBuilder::new(self.0.log_store.clone(), self.0.state) } /// Delete data from Delta 
table #[cfg(feature = "datafusion")] #[must_use] pub fn delete(self) -> DeleteBuilder { - DeleteBuilder::new(self.0.object_store(), self.0.state) + DeleteBuilder::new(self.0.log_store.clone(), self.0.state) } /// Update data from Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn update(self) -> UpdateBuilder { - UpdateBuilder::new(self.0.object_store(), self.0.state) + UpdateBuilder::new(self.0.log_store.clone(), self.0.state) } /// Restore delta table to a specified version or datetime #[must_use] pub fn restore(self) -> RestoreBuilder { - RestoreBuilder::new(self.0.object_store(), self.0.state) + RestoreBuilder::new(self.0.log_store.clone(), self.0.state) } /// Update data from Delta table @@ -172,7 +172,7 @@ impl DeltaOps { predicate: E, ) -> MergeBuilder { MergeBuilder::new( - self.0.object_store(), + self.0.log_store.clone(), self.0.state, predicate.into(), source, diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs index 65b3731f57..9597504d6d 100644 --- a/rust/src/operations/optimize.rs +++ b/rust/src/operations/optimize.rs @@ -42,6 +42,7 @@ use serde_json::Map; use super::transaction::commit; use super::writer::{PartitionWriter, PartitionWriterConfig}; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::logstore::LogStoreRef; use crate::protocol::{self, Action, DeltaOperation}; use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; @@ -155,7 +156,7 @@ pub struct OptimizeBuilder<'a> { /// A snapshot of the to-be-optimized table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: ObjectStoreRef, + log_store: LogStoreRef, /// Filters to select specific table partitions to be optimized filters: &'a [PartitionFilter], /// Desired file size after bin-packing files @@ -177,10 +178,10 @@ pub struct OptimizeBuilder<'a> { impl<'a> OptimizeBuilder<'a> { /// Create a new [`OptimizeBuilder`] - pub fn new(store: ObjectStoreRef, snapshot: DeltaTableState) -> 
Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, filters: &[], target_size: None, writer_properties: None, @@ -274,14 +275,14 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { )?; let metrics = plan .execute( - this.store.clone(), + this.log_store.clone(), &this.snapshot, this.max_concurrent_tasks, this.max_spill_size, this.min_commit_interval, ) .await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) @@ -582,7 +583,7 @@ impl MergePlan { /// Perform the operations outlined in the plan. pub async fn execute( mut self, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, max_concurrent_tasks: usize, #[allow(unused_variables)] // used behind a feature flag @@ -605,12 +606,13 @@ impl MergePlan { for file in files.iter() { debug!(" file {}", file.location); } - let object_store_ref = object_store.clone(); + let object_store_ref = log_store.clone(); let batch_stream = futures::stream::iter(files.clone()) .then(move |file| { let object_store_ref = object_store_ref.clone(); async move { - let file_reader = ParquetObjectReader::new(object_store_ref, file); + let file_reader = + ParquetObjectReader::new(object_store_ref.object_store(), file); ParquetRecordBatchStreamBuilder::new(file_reader) .await? 
.build() @@ -623,7 +625,7 @@ impl MergePlan { self.task_parameters.clone(), partition, files, - object_store.clone(), + log_store.object_store().clone(), futures::future::ready(Ok(batch_stream)), )); util::flatten_join_error(rewrite_result) @@ -633,13 +635,15 @@ impl MergePlan { #[cfg(not(feature = "datafusion"))] let exec_context = Arc::new(zorder::ZOrderExecContext::new( zorder_columns, - object_store.clone(), + log_store.object_store().clone(), // If there aren't enough bins to use all threads, then instead // use threads within the bins. This is important for the case where // the table is un-partitioned, in which case the entire table is just // one big bin. bins.len() <= num_cpus::get(), )); + let object_store = log_store.object_store().clone(); + #[cfg(feature = "datafusion")] let exec_context = Arc::new(zorder::ZOrderExecContext::new( zorder_columns, @@ -647,7 +651,6 @@ impl MergePlan { max_spill_size, )?); let task_parameters = self.task_parameters.clone(); - let object_store = object_store.clone(); futures::stream::iter(bins) .map(move |(partition, files)| { let batch_stream = Self::read_zorder(files.clone(), exec_context.clone()); @@ -669,7 +672,7 @@ impl MergePlan { let mut stream = stream.buffer_unordered(max_concurrent_tasks); - let mut table = DeltaTable::new_with_state(object_store.clone(), snapshot.clone()); + let mut table = DeltaTable::new_with_state(log_store.clone(), snapshot.clone()); // Actions buffered so far. These will be flushed either at the end // or when we reach the commit interval. @@ -718,7 +721,7 @@ impl MergePlan { //// TODO: Check for remove actions on optimized partitions. If a //// optimized partition was updated then abort the commit. Requires (#593). 
commit( - table.object_store().as_ref(), + table.log_store.as_ref(), &actions, self.task_parameters.input_parameters.clone().into(), table.get_state(), diff --git a/rust/src/operations/restore.rs b/rust/src/operations/restore.rs index 59aceaa98f..602463e692 100644 --- a/rust/src/operations/restore.rs +++ b/rust/src/operations/restore.rs @@ -11,6 +11,7 @@ //! in file system. //! 6) Commit Protocol, all RemoveFile and AddFile actions //! into delta log using `try_commit_transaction` (commit will be failed in case of parallel transaction) +//! TODO: comment is outdated //! 7) If table was modified in parallel then ignore restore and raise exception. //! //! # Example @@ -30,9 +31,9 @@ use object_store::path::Path; use object_store::ObjectStore; use serde::Serialize; -use crate::operations::transaction::{prepare_commit, try_commit_transaction, TransactionError}; +use crate::logstore::LogStoreRef; +use crate::operations::transaction::{prepare_commit, TransactionError}; use crate::protocol::{Action, Add, DeltaOperation, Protocol, Remove}; -use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError}; @@ -73,7 +74,7 @@ pub struct RestoreBuilder { /// A snapshot of the to-be-restored table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: ObjectStoreRef, + log_store: LogStoreRef, /// Version to restore version_to_restore: Option, /// Datetime to restore @@ -86,10 +87,10 @@ pub struct RestoreBuilder { impl RestoreBuilder { /// Create a new [`RestoreBuilder`] - pub fn new(store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, version_to_restore: None, datetime_to_restore: None, ignore_missing_files: false, @@ -124,7 +125,7 @@ impl RestoreBuilder { } async fn execute( - object_store: ObjectStoreRef, + log_store: 
LogStoreRef, snapshot: DeltaTableState, version_to_restore: Option, datetime_to_restore: Option>, @@ -137,7 +138,7 @@ async fn execute( { return Err(DeltaTableError::from(RestoreError::InvalidRestoreParameter)); } - let mut table = DeltaTable::new(object_store.clone(), DeltaTableConfig::default()); + let mut table = DeltaTable::new(log_store.clone(), DeltaTableConfig::default()); let version = match datetime_to_restore { Some(datetime) => { table.load_with_datetime(datetime).await?; @@ -192,7 +193,7 @@ async fn execute( .collect(); if !ignore_missing_files { - check_files_available(object_store.as_ref(), &files_to_add).await?; + check_files_available(log_store.object_store().as_ref(), &files_to_add).await?; } let metrics = RestoreMetrics { @@ -223,7 +224,7 @@ async fn execute( actions.extend(files_to_remove.into_iter().map(Action::remove)); let commit = prepare_commit( - object_store.as_ref(), + log_store.object_store().as_ref(), &DeltaOperation::Restore { version: version_to_restore, datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }), @@ -234,13 +235,13 @@ async fn execute( ) .await?; let commit_version = snapshot.version() + 1; - match try_commit_transaction(object_store.as_ref(), &commit, commit_version).await { + match log_store.write_commit_entry(commit_version, &commit).await { Ok(_) => {} Err(err @ TransactionError::VersionAlreadyExists(_)) => { return Err(err.into()); } Err(err) => { - object_store.delete(&commit).await?; + log_store.object_store().delete(&commit).await?; return Err(err.into()); } } @@ -276,7 +277,7 @@ impl std::future::IntoFuture for RestoreBuilder { Box::pin(async move { let metrics = execute( - this.store.clone(), + this.log_store.clone(), this.snapshot.clone(), this.version_to_restore, this.datetime_to_restore, @@ -284,7 +285,7 @@ impl std::future::IntoFuture for RestoreBuilder { this.protocol_downgrade_allowed, ) .await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table 
= DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) diff --git a/rust/src/operations/transaction/mod.rs b/rust/src/operations/transaction/mod.rs index 738ae404ec..daf73426cf 100644 --- a/rust/src/operations/transaction/mod.rs +++ b/rust/src/operations/transaction/mod.rs @@ -7,8 +7,8 @@ use serde_json::{Map, Value}; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::logstore::LogStore; use crate::protocol::{Action, CommitInfo, DeltaOperation}; -use crate::storage::commit_uri_from_version; use crate::table::state::DeltaTableState; mod conflict_checker; @@ -127,6 +127,7 @@ pub(crate) fn get_commit_bytes( /// Low-level transaction API. Creates a temporary commit file. Once created, /// the transaction object could be dropped and the actual commit could be executed /// with `DeltaTable.try_commit_transaction`. +/// TODO: comment is outdated now pub(crate) async fn prepare_commit<'a>( storage: &dyn ObjectStore, operation: &DeltaOperation, @@ -147,42 +148,26 @@ pub(crate) async fn prepare_commit<'a>( Ok(path) } -/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists] -/// if the given `version` already exists. The caller should handle the retry logic itself. -/// This is low-level transaction API. If user does not want to maintain the commit loop then -/// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` -/// with retry logic. -pub(crate) async fn try_commit_transaction( - storage: &dyn ObjectStore, - tmp_commit: &Path, - version: i64, -) -> Result { - // move temporary commit file to delta log directory - // rely on storage to fail if the file already exists - - storage - .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version)) - .await - .map_err(|err| match err { - ObjectStoreError::AlreadyExists { .. 
} => { - TransactionError::VersionAlreadyExists(version) - } - _ => TransactionError::from(err), - })?; - Ok(version) -} - /// Commit a transaction, with up to 15 retries. This is higher-level transaction API. /// /// Will error early if the a concurrent transaction has already been committed /// and conflicts with this transaction. pub async fn commit( - storage: &dyn ObjectStore, + log_store: &dyn LogStore, actions: &Vec, operation: DeltaOperation, read_snapshot: &DeltaTableState, app_metadata: Option>, ) -> DeltaResult { - commit_with_retries(storage, actions, operation, read_snapshot, app_metadata, 15).await + commit_with_retries( + log_store, + actions, + operation, + read_snapshot, + app_metadata, + 15, + ) + .await } /// Commit a transaction, with up configurable number of retries. This is higher-level transaction API. @@ -190,24 +175,35 @@ pub async fn commit( /// The function will error early if the a concurrent transaction has already been committed /// and conflicts with this transaction. 
pub async fn commit_with_retries( - storage: &dyn ObjectStore, + log_store: &dyn LogStore, actions: &Vec, operation: DeltaOperation, read_snapshot: &DeltaTableState, app_metadata: Option>, max_retries: usize, ) -> DeltaResult { - let tmp_commit = - prepare_commit(storage, &operation, actions, read_snapshot, app_metadata).await?; + let tmp_commit = prepare_commit( + log_store.object_store().as_ref(), + &operation, + actions, + read_snapshot, + app_metadata, + ) + .await?; let mut attempt_number = 1; while attempt_number <= max_retries { let version = read_snapshot.version() + attempt_number as i64; - match try_commit_transaction(storage, &tmp_commit, version).await { - Ok(version) => return Ok(version), + match log_store.write_commit_entry(version, &tmp_commit).await { + Ok(()) => return Ok(version), Err(TransactionError::VersionAlreadyExists(version)) => { - let summary = WinningCommitSummary::try_new(storage, version - 1, version).await?; + let summary = WinningCommitSummary::try_new( + log_store.object_store().as_ref(), + version - 1, + version, + ) + .await?; let transaction_info = TransactionInfo::try_new( read_snapshot, operation.read_predicate(), @@ -222,13 +218,13 @@ pub async fn commit_with_retries( attempt_number += 1; } Err(err) => { - storage.delete(&tmp_commit).await?; + log_store.object_store().delete(&tmp_commit).await?; return Err(TransactionError::CommitConflict(err).into()); } }; } Err(err) => { - storage.delete(&tmp_commit).await?; + log_store.object_store().delete(&tmp_commit).await?; return Err(err.into()); } } @@ -239,11 +235,17 @@ pub async fn commit_with_retries( #[cfg(all(test, feature = "parquet"))] mod tests { + use std::{collections::HashMap, sync::Arc}; + use self::test_utils::{create_remove_action, init_table_actions}; use super::*; - use crate::DeltaConfigKey; + use crate::{ + logstore::default_logstore::DefaultLogStore, + storage::{commit_uri_from_version, DeltaObjectStore}, + DeltaConfigKey, + }; use object_store::memory::InMemory; - 
use std::collections::HashMap; + use url::Url; #[test] fn test_commit_uri_from_version() { @@ -287,18 +289,22 @@ mod tests { #[tokio::test] async fn test_try_commit_transaction() { - let store = InMemory::new(); + let store = Arc::new(InMemory::new()); + let url = Url::parse("mem://what/is/this").unwrap(); + let delta_store = DeltaObjectStore::new(store.clone(), url); + let log_store = DefaultLogStore { + storage: Arc::new(delta_store), + }; let tmp_path = Path::from("_delta_log/tmp"); let version_path = Path::from("_delta_log/00000000000000000000.json"); store.put(&tmp_path, bytes::Bytes::new()).await.unwrap(); store.put(&version_path, bytes::Bytes::new()).await.unwrap(); + let res = log_store.write_commit_entry(0, &tmp_path).await; // fails if file version already exists - let res = try_commit_transaction(&store, &tmp_path, 0).await; assert!(res.is_err()); // succeeds for next version - let res = try_commit_transaction(&store, &tmp_path, 1).await.unwrap(); - assert_eq!(res, 1); + log_store.write_commit_entry(1, &tmp_path).await.unwrap(); } } diff --git a/rust/src/operations/transaction/test_utils.rs b/rust/src/operations/transaction/test_utils.rs index ec7848b95a..08c22691cd 100644 --- a/rust/src/operations/transaction/test_utils.rs +++ b/rust/src/operations/transaction/test_utils.rs @@ -1,7 +1,7 @@ #![allow(unused)] use std::collections::HashMap; -use super::{prepare_commit, try_commit_transaction, CommitInfo}; +use super::{prepare_commit, CommitInfo}; use crate::protocol::{Action, Add, DeltaOperation, MetaData, Protocol, Remove, SaveMode}; use crate::table::state::DeltaTableState; use crate::table::DeltaTableMetaData; @@ -104,7 +104,7 @@ pub async fn create_initialized_table( partition_cols: &[String], configuration: Option>>, ) -> DeltaTable { - let storage = DeltaTableBuilder::from_uri("memory://") + let log_store = DeltaTableBuilder::from_uri("memory://") .build_storage() .unwrap(); let table_schema = Schema::new(vec![ @@ -145,11 +145,19 @@ pub async fn 
create_initialized_table( ), }; let actions = init_table_actions(None); - let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, &state, None) - .await - .unwrap(); - try_commit_transaction(storage.as_ref(), &prepared_commit, 0) + let prepared_commit = prepare_commit( + log_store.object_store().as_ref(), + &operation, + &actions, + &state, + None, + ) + .await + .unwrap(); + + log_store + .write_commit_entry(0, &prepared_commit) .await .unwrap(); - DeltaTable::new_with_state(storage, state) + DeltaTable::new_with_state(log_store, state) } diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index 4764736eb2..1cb62e4d10 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -45,8 +45,8 @@ use serde_json::{Map, Value}; use crate::{ delta_datafusion::{expr::fmt_expr_to_sql, find_files, register_store, DeltaScanBuilder}, + logstore::LogStoreRef, protocol::{Action, DeltaOperation, Remove}, - storage::{DeltaObjectStore, ObjectStoreRef}, table::state::DeltaTableState, DeltaResult, DeltaTable, DeltaTableError, }; @@ -67,7 +67,7 @@ pub struct UpdateBuilder { /// A snapshot of the table's state snapshot: DeltaTableState, /// Delta object store for handling data files - object_store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -98,12 +98,12 @@ pub struct UpdateMetrics { impl UpdateBuilder { /// Create a new ['UpdateBuilder'] - pub fn new(object_store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { predicate: None, updates: HashMap::new(), snapshot, - object_store, + log_store, state: None, writer_properties: None, app_metadata: None, @@ -167,7 +167,7 @@ impl UpdateBuilder { async fn execute( predicate: Option, updates: HashMap, - object_store: ObjectStoreRef, + log_store: 
LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -216,7 +216,13 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns.clone(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, object_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files( + snapshot, + log_store.object_store(), + &state, + predicate.clone(), + ) + .await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; if candidates.candidates.is_empty() { @@ -228,7 +234,7 @@ async fn execute( let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches - let scan = DeltaScanBuilder::new(snapshot, object_store.clone(), &state) + let scan = DeltaScanBuilder::new(snapshot, log_store.object_store().clone(), &state) .with_files(&candidates.candidates) .build() .await?; @@ -360,7 +366,7 @@ async fn execute( state.clone(), projection.clone(), table_partition_cols.clone(), - object_store.clone(), + log_store.object_store().clone(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, @@ -408,7 +414,7 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -431,7 +437,7 @@ impl std::future::IntoFuture for UpdateBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.object_store.clone(), session.runtime_env()); + register_store(this.log_store.object_store().clone(), session.runtime_env()); session.state() }); @@ -439,7 +445,7 @@ impl std::future::IntoFuture for UpdateBuilder { let ((actions, version), metrics) = execute( this.predicate, this.updates, - this.object_store.clone(), + 
this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -450,7 +456,7 @@ impl std::future::IntoFuture for UpdateBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.object_store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/rust/src/operations/vacuum.rs b/rust/src/operations/vacuum.rs index ccaa22a33e..ab821b2244 100644 --- a/rust/src/operations/vacuum.rs +++ b/rust/src/operations/vacuum.rs @@ -32,6 +32,7 @@ use object_store::Error; use object_store::{path::Path, ObjectStore}; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::logstore::LogStoreRef; use crate::storage::DeltaObjectStore; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -76,7 +77,7 @@ pub struct VacuumBuilder { /// A snapshot of the to-be-vacuumed table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Period of stale files allowed. 
retention_period: Option, /// Validate the retention period is not below the retention period configured in the table @@ -99,10 +100,10 @@ pub struct VacuumMetrics { /// Methods to specify various vacuum options and to execute the operation impl VacuumBuilder { /// Create a new [`VacuumBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { VacuumBuilder { snapshot, - store, + log_store, retention_period: None, enforce_retention_duration: true, dry_run: false, @@ -157,7 +158,11 @@ impl VacuumBuilder { let valid_files = self.snapshot.file_paths_iter().collect::>(); let mut files_to_delete = vec![]; - let mut all_files = self.store.list(None).await.map_err(DeltaTableError::from)?; + let object_store = self.log_store.object_store(); + let mut all_files = object_store + .list(None) + .await + .map_err(DeltaTableError::from)?; let partition_columns = &self .snapshot @@ -193,7 +198,7 @@ impl std::future::IntoFuture for VacuumBuilder { let plan = this.create_vacuum_plan().await?; if this.dry_run { return Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), VacuumMetrics { files_deleted: plan.files_to_delete.iter().map(|f| f.to_string()).collect(), dry_run: true, @@ -201,9 +206,9 @@ impl std::future::IntoFuture for VacuumBuilder { )); } - let metrics = plan.execute(&this.store).await?; + let metrics = plan.execute(&this.log_store.object_store()).await?; Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), metrics, )) }) @@ -290,7 +295,7 @@ mod tests { async fn vacuum_delta_8_0_table() { let table = open_table("./tests/data/delta-0.8.0").await.unwrap(); - let result = VacuumBuilder::new(table.object_store(), table.state.clone()) + let result = VacuumBuilder::new(table.log_store, table.state.clone()) .with_retention_period(Duration::hours(1)) 
.with_dry_run(true) .await; @@ -298,7 +303,7 @@ mod tests { assert!(result.is_err()); let table = open_table("./tests/data/delta-0.8.0").await.unwrap(); - let (table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(0)) .with_dry_run(true) .with_enforce_retention_duration(false) @@ -310,7 +315,7 @@ mod tests { vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"] ); - let (table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(169)) .with_dry_run(true) .await @@ -327,7 +332,7 @@ mod tests { .as_secs() / 3600; let empty: Vec = Vec::new(); - let (_table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (_table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(retention_hours as i64)) .with_dry_run(true) .await diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs index 7472d02d07..17e15d6f0a 100644 --- a/rust/src/operations/write.rs +++ b/rust/src/operations/write.rs @@ -44,9 +44,10 @@ use super::MAX_SUPPORTED_WRITER_VERSION; use super::{transaction::commit, CreateBuilder}; use crate::delta_datafusion::DeltaDataChecker; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::logstore::LogStoreRef; use crate::protocol::{Action, Add, DeltaOperation, Remove, SaveMode}; use crate::schema::Schema; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; +use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::writer::record_batch::divide_by_partition_values; use crate::writer::utils::PartitionPath; @@ -91,7 +92,7 @@ pub struct WriteBuilder { /// A snapshot of the to-be-loaded table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: 
Arc, + log_store: LogStoreRef, /// The input plan input: Option>, /// Datafusion session state relevant for executing the input plan @@ -118,10 +119,10 @@ pub struct WriteBuilder { impl WriteBuilder { /// Create a new [`WriteBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, input: None, state: None, mode: SaveMode::Append, @@ -211,7 +212,8 @@ impl WriteBuilder { } async fn check_preconditions(&self) -> DeltaResult> { - match self.store.is_delta_table_location().await? { + let object_store = self.log_store.object_store(); + match object_store.is_delta_table_location().await? { true => { let min_writer = self.snapshot.min_writer_version(); if min_writer > MAX_SUPPORTED_WRITER_VERSION { @@ -219,7 +221,7 @@ impl WriteBuilder { } else { match self.mode { SaveMode::ErrorIfExists => { - Err(WriteError::AlreadyExists(self.store.root_uri()).into()) + Err(WriteError::AlreadyExists(object_store.root_uri()).into()) } _ => Ok(vec![]), } @@ -237,7 +239,7 @@ impl WriteBuilder { Err(WriteError::MissingData) }?; let mut builder = CreateBuilder::new() - .with_object_store(self.store.clone()) + .with_log_store(self.log_store.clone()) .with_columns(schema.get_fields().clone()); if let Some(partition_columns) = self.partition_columns.as_ref() { builder = builder.with_partition_columns(partition_columns.clone()) @@ -357,7 +359,7 @@ impl std::future::IntoFuture for WriteBuilder { let schema = batches[0].schema(); let table_schema = this .snapshot - .physical_arrow_schema(this.store.clone()) + .physical_arrow_schema(this.log_store.object_store().clone()) .await .or_else(|_| this.snapshot.arrow_schema()) .unwrap_or(schema.clone()); @@ -419,7 +421,7 @@ impl std::future::IntoFuture for WriteBuilder { state, plan, partition_columns.clone(), - this.store.clone(), + this.log_store.object_store().clone(), this.target_file_size, this.write_batch_size, 
this.writer_properties, @@ -467,7 +469,7 @@ impl std::future::IntoFuture for WriteBuilder { }; let version = commit( - this.store.as_ref(), + this.log_store.as_ref(), &actions, DeltaOperation::Write { mode: this.mode, @@ -491,7 +493,7 @@ impl std::future::IntoFuture for WriteBuilder { // TODO should we build checkpoints based on config? - Ok(DeltaTable::new_with_state(this.store, this.snapshot)) + Ok(DeltaTable::new_with_state(this.log_store, this.snapshot)) }) } } diff --git a/rust/src/operations/writer.rs b/rust/src/operations/writer.rs index 05bda44ae6..d6b4762de0 100644 --- a/rust/src/operations/writer.rs +++ b/rust/src/operations/writer.rs @@ -406,9 +406,10 @@ mod tests { #[tokio::test] async fn test_write_partition() { - let object_store = DeltaTableBuilder::from_uri("memory://") + let log_store = DeltaTableBuilder::from_uri("memory://") .build_storage() .unwrap(); + let object_store = log_store.object_store(); let batch = get_record_batch(None, false); // write single un-partitioned batch @@ -439,12 +440,13 @@ mod tests { let object_store = DeltaTableBuilder::from_uri("memory://") .build_storage() - .unwrap(); + .unwrap() + .object_store(); let properties = WriterProperties::builder() .set_max_row_group_size(1024) .build(); // configure small target file size and and row group size so we can observe multiple files written - let mut writer = get_writer(object_store.clone(), &batch, Some(properties), Some(10_000)); + let mut writer = get_writer(object_store, &batch, Some(properties), Some(10_000)); writer.write(&batch).await.unwrap(); // check that we have written more then once file, and no more then 1 is below target size diff --git a/rust/src/protocol/checkpoints.rs b/rust/src/protocol/checkpoints.rs index 3bf2eb962e..518fc97932 100644 --- a/rust/src/protocol/checkpoints.rs +++ b/rust/src/protocol/checkpoints.rs @@ -69,7 +69,12 @@ pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000; /// Creates checkpoint at current table version pub async fn 
create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError> { - create_checkpoint_for(table.version(), table.get_state(), table.storage.as_ref()).await?; + create_checkpoint_for( + table.version(), + table.get_state(), + table.object_store().as_ref(), + ) + .await?; Ok(()) } @@ -80,7 +85,7 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result Utc::now().timestamp_millis() - table.get_state().log_retention_millis(); cleanup_expired_logs_for( table.version(), - table.storage.as_ref(), + table.object_store().as_ref(), log_retention_timestamp, ) .await @@ -97,7 +102,7 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup( let table = open_table_with_version(table_uri, version) .await .map_err(|err| ProtocolError::Generic(err.to_string()))?; - create_checkpoint_for(version, table.get_state(), table.storage.as_ref()).await?; + create_checkpoint_for(version, table.get_state(), table.object_store().as_ref()).await?; let enable_expired_log_cleanup = cleanup.unwrap_or_else(|| table.get_state().enable_expired_log_cleanup()); diff --git a/rust/src/storage/utils.rs b/rust/src/storage/utils.rs index 80710efd9b..ee45c3ae29 100644 --- a/rust/src/storage/utils.rs +++ b/rust/src/storage/utils.rs @@ -28,7 +28,7 @@ pub async fn copy_table( .with_storage_options(to_options.unwrap_or_default()) .with_allow_http(allow_http) .build_storage()?; - sync_stores(from_store, to_store).await + sync_stores(from_store.object_store(), to_store.object_store()).await } /// Synchronize the contents of two object stores diff --git a/rust/src/table/builder.rs b/rust/src/table/builder.rs index 92fc4851ad..710c1995c0 100644 --- a/rust/src/table/builder.rs +++ b/rust/src/table/builder.rs @@ -11,8 +11,10 @@ use url::Url; use super::DeltaTable; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::logstore::default_logstore::DefaultLogStore; +use crate::logstore::LogStoreRef; use crate::storage::config::StorageOptions; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; 
+use crate::storage::DeltaObjectStore; #[allow(dead_code)] #[derive(Debug, thiserror::Error)] @@ -243,18 +245,19 @@ impl DeltaTableBuilder { } /// Build a delta storage backend for the given config - pub fn build_storage(self) -> DeltaResult { + pub fn build_storage(self) -> DeltaResult { match self.options.storage_backend { - Some((storage, location)) => Ok(Arc::new(DeltaObjectStore::new( - storage, - ensure_table_uri(location.as_str())?, - ))), + Some((storage, location)) => Ok(Arc::new(DefaultLogStore { + storage: Arc::new(DeltaObjectStore::new( + storage, + ensure_table_uri(location.as_str())?, + )), + })), None => { let location = ensure_table_uri(&self.options.table_uri)?; - Ok(Arc::new(DeltaObjectStore::try_new( - location, - self.storage_options(), - )?)) + Ok(Arc::new(DefaultLogStore { + storage: Arc::new(DeltaObjectStore::try_new(location, self.storage_options())?), + })) } } } diff --git a/rust/src/table/mod.rs b/rust/src/table/mod.rs index 4883134fcd..dabaf6e035 100644 --- a/rust/src/table/mod.rs +++ b/rust/src/table/mod.rs @@ -22,6 +22,7 @@ use uuid::Uuid; use self::builder::DeltaTableConfig; use self::state::DeltaTableState; use crate::errors::DeltaTableError; +use crate::logstore::{default_logstore::DefaultLogStore, LogStoreRef}; use crate::partitions::PartitionFilter; use crate::protocol::{self, find_latest_check_point_for_version, get_last_checkpoint, Action}; use crate::protocol::{Add, ProtocolError, Stats}; @@ -244,8 +245,8 @@ pub struct DeltaTable { pub state: DeltaTableState, /// the load options used during load pub config: DeltaTableConfig, - /// object store to access log and data files - pub(crate) storage: ObjectStoreRef, + /// log store + pub(crate) log_store: LogStoreRef, /// file metadata for latest checkpoint last_check_point: Option, /// table versions associated with timestamps @@ -260,7 +261,8 @@ impl Serialize for DeltaTable { let mut seq = serializer.serialize_seq(None)?; seq.serialize_element(&self.state)?; 
seq.serialize_element(&self.config)?; - seq.serialize_element(self.storage.as_ref())?; + // tp::TODO (de)serialize actual log store instead + seq.serialize_element(self.object_store().as_ref())?; seq.serialize_element(&self.last_check_point)?; seq.serialize_element(&self.version_timestamp)?; seq.end() @@ -291,9 +293,13 @@ impl<'de> Deserialize<'de> for DeltaTable { let config = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; - let storage = seq - .next_element()? - .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let storage = Arc::new( + seq.next_element()? + .ok_or_else(|| A::Error::invalid_length(0, &self))?, + ); + let log_store = Arc::new(DefaultLogStore { + storage: Arc::clone(&storage), + }); let last_check_point = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; @@ -304,7 +310,7 @@ impl<'de> Deserialize<'de> for DeltaTable { let table = DeltaTable { state, config, - storage: Arc::new(storage), + log_store, last_check_point, version_timestamp, }; @@ -321,10 +327,10 @@ impl DeltaTable { /// /// NOTE: This is for advanced users. If you don't know why you need to use this method, please /// call one of the `open_table` helper methods instead. - pub fn new(storage: ObjectStoreRef, config: DeltaTableConfig) -> Self { + pub fn new(log_store: LogStoreRef, config: DeltaTableConfig) -> Self { Self { state: DeltaTableState::with_version(-1), - storage, + log_store, config, last_check_point: None, version_timestamp: HashMap::new(), @@ -336,10 +342,10 @@ impl DeltaTable { /// /// NOTE: This is for advanced users. If you don't know why you need to use this method, /// please call one of the `open_table` helper methods instead. 
- pub(crate) fn new_with_state(storage: ObjectStoreRef, state: DeltaTableState) -> Self { + pub(crate) fn new_with_state(log_store: LogStoreRef, state: DeltaTableState) -> Self { Self { state, - storage, + log_store, config: Default::default(), last_check_point: None, version_timestamp: HashMap::new(), @@ -348,18 +354,24 @@ impl DeltaTable { /// get a shared reference to the delta object store pub fn object_store(&self) -> ObjectStoreRef { - self.storage.clone() + self.log_store.object_store() } /// The URI of the underlying data pub fn table_uri(&self) -> String { - self.storage.root_uri() + self.log_store.object_store().root_uri() + } + + /// get a shared reference to the log store + pub fn log_store(&self) -> LogStoreRef { + self.log_store.clone() } /// Return the list of paths of given checkpoint. pub fn get_checkpoint_data_paths(&self, check_point: &CheckPoint) -> Vec { let checkpoint_prefix = format!("{:020}", check_point.version); - let log_path = self.storage.log_path(); + let object_store = self.object_store(); + let log_path = object_store.log_path(); let mut checkpoint_data_paths = Vec::new(); match check_point.parts { @@ -394,7 +406,8 @@ impl DeltaTable { let mut current_delta_log_ver = i64::MAX; // Get file objects from table. 
- let mut stream = self.storage.list(Some(self.storage.log_path())).await?; + let storage = self.object_store(); + let mut stream = storage.list(Some(storage.log_path())).await?; while let Some(obj_meta) = stream.next().await { let obj_meta = obj_meta?; @@ -417,54 +430,7 @@ impl DeltaTable { } /// returns the latest available version of the table pub async fn get_latest_version(&mut self) -> Result { - let version_start = match get_last_checkpoint(&self.storage).await { - Ok(last_check_point) => last_check_point.version, - Err(ProtocolError::CheckpointNotFound) => { - // no checkpoint - -1 - } - Err(e) => { - return Err(DeltaTableError::from(e)); - } - }; - - debug!("latest checkpoint version: {version_start}"); - - let version_start = max(self.version(), version_start); - - lazy_static! { - static ref DELTA_LOG_REGEX: Regex = - Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap(); - } - - // list files to find max version - let version = async { - let mut max_version: i64 = version_start; - let prefix = Some(self.storage.log_path()); - let offset_path = commit_uri_from_version(max_version); - let mut files = self.storage.list_with_offset(prefix, &offset_path).await?; - - while let Some(obj_meta) = files.next().await { - let obj_meta = obj_meta?; - if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) { - let log_version = captures.get(1).unwrap().as_str().parse().unwrap(); - // listing may not be ordered - max_version = max(max_version, log_version); - // also cache timestamp for version, for faster time-travel - self.version_timestamp - .insert(log_version, obj_meta.last_modified.timestamp()); - } - } - - if max_version < 0 { - return Err(DeltaTableError::not_a_table(self.table_uri())); - } - - Ok::(max_version) - } - .await?; - - Ok(version) + self.log_store.get_latest_version(self.version()).await } /// Currently loaded version of the table @@ -485,12 +451,12 @@ impl DeltaTable { current_version: i64, ) -> Result { let 
next_version = current_version + 1; - let commit_uri = commit_uri_from_version(next_version); - let commit_log_bytes = self.storage.get(&commit_uri).await; - let commit_log_bytes = match commit_log_bytes { - Err(ObjectStoreError::NotFound { .. }) => return Ok(PeekCommit::UpToDate), + let commit_log_bytes = match self.log_store.read_commit_entry(next_version).await { + Ok(bytes) => Ok(bytes), + Err(DeltaTableError::ObjectStore { + source: ObjectStoreError::NotFound { .. }, + }) => return Ok(PeekCommit::UpToDate), Err(err) => Err(err), - Ok(result) => result.bytes().await, }?; let actions = Self::get_actions(next_version, commit_log_bytes).await; @@ -524,7 +490,7 @@ impl DeltaTable { /// loading the last checkpoint and incrementally applying each version since. #[cfg(any(feature = "parquet", feature = "parquet2"))] pub async fn update(&mut self) -> Result<(), DeltaTableError> { - match get_last_checkpoint(&self.storage).await { + match get_last_checkpoint(&self.object_store()).await { Ok(last_check_point) => { debug!("update with latest checkpoint {last_check_point:?}"); if Some(last_check_point) == self.last_check_point { @@ -569,13 +535,12 @@ impl DeltaTable { let buf_size = self.config.log_buffer_size; - let store = self.storage.clone(); + let log_store = self.log_store.clone(); let mut log_stream = futures::stream::iter(self.version() + 1..max_version + 1) .map(|version| { - let store = store.clone(); - let loc = commit_uri_from_version(version); + let log_store = log_store.clone(); async move { - let data = store.get(&loc).await?.bytes().await?; + let data = log_store.read_commit_entry(version).await?; let actions = Self::get_actions(version, data).await?; Ok((version, actions)) } @@ -611,7 +576,7 @@ impl DeltaTable { pub async fn load_version(&mut self, version: i64) -> Result<(), DeltaTableError> { // check if version is valid let commit_uri = commit_uri_from_version(version); - match self.storage.head(&commit_uri).await { + match 
self.object_store().head(&commit_uri).await { Ok(_) => {} Err(ObjectStoreError::NotFound { .. }) => { return Err(DeltaTableError::InvalidVersion(version)); @@ -623,7 +588,7 @@ impl DeltaTable { // 1. find latest checkpoint below version #[cfg(any(feature = "parquet", feature = "parquet2"))] - match find_latest_check_point_for_version(&self.storage, version).await? { + match find_latest_check_point_for_version(&self.object_store(), version).await? { Some(check_point) => { self.restore_checkpoint(check_point).await?; } @@ -647,7 +612,10 @@ impl DeltaTable { match self.version_timestamp.get(&version) { Some(ts) => Ok(*ts), None => { - let meta = self.storage.head(&commit_uri_from_version(version)).await?; + let meta = self + .object_store() + .head(&commit_uri_from_version(version)) + .await?; let ts = meta.last_modified.timestamp(); // also cache timestamp for version self.version_timestamp.insert(version, ts); @@ -741,7 +709,7 @@ impl DeltaTable { let files = self.get_files_by_partitions(filters)?; Ok(files .iter() - .map(|fname| self.storage.to_uri(fname)) + .map(|fname| self.object_store().to_uri(fname)) .collect()) } @@ -766,7 +734,7 @@ impl DeltaTable { pub fn get_file_uris(&self) -> impl Iterator + '_ { self.state .file_paths_iter() - .map(|path| self.storage.to_uri(&path)) + .map(|path| self.object_store().to_uri(&path)) } /// Returns statistics for files, in order diff --git a/rust/src/table/state.rs b/rust/src/table/state.rs index e72f726ba8..222bbe3fd1 100644 --- a/rust/src/table/state.rs +++ b/rust/src/table/state.rs @@ -67,7 +67,7 @@ impl DeltaTableState { /// Construct a delta table state object from commit version. pub async fn from_commit(table: &DeltaTable, version: i64) -> Result { let commit_uri = commit_uri_from_version(version); - let commit_log_bytes = match table.storage.get(&commit_uri).await { + let commit_log_bytes = match table.object_store().get(&commit_uri).await { Ok(get) => Ok(get.bytes().await?), Err(ObjectStoreError::NotFound { .. 
}) => Err(ProtocolError::EndOfLog), Err(source) => Err(ProtocolError::ObjectStore { source }), @@ -157,7 +157,7 @@ impl DeltaTableState { let mut new_state = Self::with_version(check_point.version); for f in &checkpoint_data_paths { - let obj = table.storage.get(f).await?.bytes().await?; + let obj = table.object_store().get(f).await?.bytes().await?; new_state.process_checkpoint_bytes(obj, &table.config)?; } diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index 124ec0365b..76b6c61820 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -88,6 +88,7 @@ impl IntegrationContext { _ => DeltaTableBuilder::from_uri(store_uri) .with_allow_http(true) .build_storage()? + .object_store() .storage_backend(), }; diff --git a/rust/src/writer/json.rs b/rust/src/writer/json.rs index f8d6d1a9e3..1ada3b6bd8 100644 --- a/rust/src/writer/json.rs +++ b/rust/src/writer/json.rs @@ -194,7 +194,7 @@ impl JsonWriter { .build(); Ok(Self { - storage, + storage: storage.object_store(), arrow_schema_ref: schema, writer_properties, partition_columns: partition_columns.unwrap_or_default(), @@ -217,7 +217,7 @@ impl JsonWriter { .build(); Ok(Self { - storage: table.storage.clone(), + storage: table.object_store(), arrow_schema_ref, writer_properties, partition_columns, diff --git a/rust/src/writer/mod.rs b/rust/src/writer/mod.rs index 8c5512127f..d1559c80af 100644 --- a/rust/src/writer/mod.rs +++ b/rust/src/writer/mod.rs @@ -145,7 +145,14 @@ pub trait DeltaWriter { partition_by, predicate: None, }; - let version = commit(table.storage.as_ref(), &adds, operation, &table.state, None).await?; + let version = commit( + table.log_store.as_ref(), + &adds, + operation, + &table.state, + None, + ) + .await?; table.update().await?; Ok(version) } diff --git a/rust/src/writer/record_batch.rs b/rust/src/writer/record_batch.rs index a6486ae109..8609c98a79 100644 --- a/rust/src/writer/record_batch.rs +++ b/rust/src/writer/record_batch.rs @@ -55,7 +55,8 @@ impl RecordBatchWriter { ) -> 
Result { let storage = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(storage_options.unwrap_or_default()) - .build_storage()?; + .build_storage()? + .object_store(); // Initialize writer properties for the underlying arrow writer let writer_properties = WriterProperties::builder() @@ -87,7 +88,7 @@ impl RecordBatchWriter { .build(); Ok(Self { - storage: table.storage.clone(), + storage: table.object_store(), arrow_schema_ref, writer_properties, partition_columns, diff --git a/rust/src/writer/test_utils.rs b/rust/src/writer/test_utils.rs index 0e2770759d..bf3a4ac79c 100644 --- a/rust/src/writer/test_utils.rs +++ b/rust/src/writer/test_utils.rs @@ -337,7 +337,7 @@ pub mod datafusion { use std::sync::Arc; pub async fn get_data(table: &DeltaTable) -> Vec { - let table = DeltaTable::new_with_state(table.object_store(), table.state.clone()); + let table = DeltaTable::new_with_state(table.log_store.clone(), table.state.clone()); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(table)).unwrap(); ctx.sql("select * from test") diff --git a/rust/tests/command_optimize.rs b/rust/tests/command_optimize.rs index 153d7f86d5..4a0ca1f087 100644 --- a/rust/tests/command_optimize.rs +++ b/rust/tests/command_optimize.rs @@ -296,7 +296,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { let operation = DeltaOperation::Delete { predicate: None }; commit( - other_dt.object_store().as_ref(), + other_dt.log_store().as_ref(), &vec![Action::remove(remove)], operation, &other_dt.state, @@ -304,9 +304,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { ) .await?; - let maybe_metrics = plan - .execute(dt.object_store(), &dt.state, 1, 20, None) - .await; + let maybe_metrics = plan.execute(dt.log_store(), &dt.state, 1, 20, None).await; assert!(maybe_metrics.is_err()); assert_eq!(dt.version(), version + 1); @@ -355,9 +353,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box> { ) .await?; - let metrics = plan - 
.execute(dt.object_store(), &dt.state, 1, 20, None) - .await?; + let metrics = plan.execute(dt.log_store(), &dt.state, 1, 20, None).await?; assert_eq!(metrics.num_files_added, 1); assert_eq!(metrics.num_files_removed, 2); @@ -397,7 +393,7 @@ async fn test_commit_interval() -> Result<(), Box> { let metrics = plan .execute( - dt.object_store(), + dt.log_store(), &dt.state, 1, 20, diff --git a/rust/tests/command_vacuum.rs b/rust/tests/command_vacuum.rs index 4437e9dc85..5086226cdf 100644 --- a/rust/tests/command_vacuum.rs +++ b/rust/tests/command_vacuum.rs @@ -297,6 +297,6 @@ async fn test_non_managed_files() { async fn is_deleted(context: &mut TestContext, path: &Path) -> bool { let backend = context.get_storage(); - let res = backend.head(path).await; + let res = backend.object_store().head(path).await; matches!(res, Err(ObjectStoreError::NotFound { .. })) } diff --git a/rust/tests/commit_info_format.rs b/rust/tests/commit_info_format.rs index fdb1f89d92..d2a3f25913 100644 --- a/rust/tests/commit_info_format.rs +++ b/rust/tests/commit_info_format.rs @@ -21,7 +21,7 @@ async fn test_operational_parameters() -> Result<(), Box> { }; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, diff --git a/rust/tests/common/mod.rs b/rust/tests/common/mod.rs index 2ba20d0635..4784c892cd 100644 --- a/rust/tests/common/mod.rs +++ b/rust/tests/common/mod.rs @@ -1,10 +1,10 @@ #![allow(dead_code, unused_variables)] use bytes::Bytes; +use deltalake::logstore::LogStore; use deltalake::operations::create::CreateBuilder; use deltalake::operations::transaction::commit; use deltalake::protocol::{self, Add, DeltaOperation, Remove, SaveMode}; -use deltalake::storage::DeltaObjectStore; use deltalake::DeltaTableBuilder; use deltalake::{DeltaTable, Schema}; use object_store::{path::Path, ObjectStore}; @@ -27,7 +27,7 @@ pub mod s3; pub struct TestContext { /// The main table under test pub table: Option, - pub backend: Option>, + pub backend: 
Option>, /// The configuration used to create the backend. pub config: HashMap, /// An object when it is dropped will clean up any temporary resources created for the test @@ -54,7 +54,7 @@ impl TestContext { } } - pub fn get_storage(&mut self) -> Arc { + pub fn get_storage(&mut self) -> Arc { if self.backend.is_none() { self.backend = Some(self.new_storage()) } @@ -62,7 +62,7 @@ impl TestContext { self.backend.as_ref().unwrap().clone() } - fn new_storage(&self) -> Arc { + fn new_storage(&self) -> Arc { let config = self.config.clone(); let uri = config.get("URI").unwrap().to_string(); DeltaTableBuilder::from_uri(uri) @@ -81,9 +81,9 @@ impl TestContext { .iter() .map(|s| s.to_string()) .collect::>(); - let backend = self.new_storage(); + let log_store = self.new_storage(); CreateBuilder::new() - .with_object_store(backend) + .with_log_store(log_store) .with_table_name("delta-rs_test_table") .with_comment("Table created by delta-rs tests") .with_columns(schema.get_fields().clone()) @@ -142,7 +142,7 @@ pub async fn add_file( }; let actions = vec![protocol::Action::add(add)]; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, @@ -175,7 +175,7 @@ pub async fn remove_file( let operation = DeltaOperation::Delete { predicate: None }; let actions = vec![protocol::Action::remove(remove)]; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, diff --git a/rust/tests/fs_common/mod.rs b/rust/tests/fs_common/mod.rs index 3c5ab39e2c..eea157559d 100644 --- a/rust/tests/fs_common/mod.rs +++ b/rust/tests/fs_common/mod.rs @@ -115,7 +115,7 @@ pub async fn commit_actions( operation: DeltaOperation, ) -> i64 { let version = commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, diff --git a/rust/tests/integration_checkpoint.rs b/rust/tests/integration_checkpoint.rs index c4361ac7bf..f9c2404ee7 100644 --- 
a/rust/tests/integration_checkpoint.rs +++ b/rust/tests/integration_checkpoint.rs @@ -60,7 +60,8 @@ async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult { let table_uri = context.root_uri(); let object_store = DeltaTableBuilder::from_uri(table_uri) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); let log_path = |version| { object_store diff --git a/rust/tests/integration_concurrent_writes.rs b/rust/tests/integration_concurrent_writes.rs index f34feac6e0..37bb953098 100644 --- a/rust/tests/integration_concurrent_writes.rs +++ b/rust/tests/integration_concurrent_writes.rs @@ -166,7 +166,7 @@ impl Worker { deletion_vector: None, })]; let version = commit( - self.table.object_store().as_ref(), + self.table.log_store().as_ref(), &actions, operation, &self.table.state, diff --git a/rust/tests/integration_datafusion.rs b/rust/tests/integration_datafusion.rs index 5aafe52e87..0bc43faa4a 100644 --- a/rust/tests/integration_datafusion.rs +++ b/rust/tests/integration_datafusion.rs @@ -210,7 +210,7 @@ mod local { // Trying to execute the write from the input plan without providing Datafusion with a session // state containing the referenced object store in the registry results in an error. 
assert!( - WriteBuilder::new(target_table.object_store(), target_table.state.clone()) + WriteBuilder::new(target_table.log_store(), target_table.state.clone()) .with_input_execution_plan(source_scan.clone()) .await .unwrap_err() @@ -234,11 +234,10 @@ mod local { .register_object_store(source_store_url, Arc::from(source_store)); // Execute write to the target table with the proper state - let target_table = - WriteBuilder::new(target_table.object_store(), target_table.state.clone()) - .with_input_execution_plan(source_scan) - .with_input_session_state(state) - .await?; + let target_table = WriteBuilder::new(target_table.log_store(), target_table.state.clone()) + .with_input_execution_plan(source_scan) + .with_input_session_state(state) + .await?; ctx.register_table("target", Arc::new(target_table))?; // Check results diff --git a/rust/tests/integration_object_store.rs b/rust/tests/integration_object_store.rs index d8b6a0e058..ef4a992293 100644 --- a/rust/tests/integration_object_store.rs +++ b/rust/tests/integration_object_store.rs @@ -81,7 +81,8 @@ async fn read_write_test_onelake(integration: StorageIntegration, path: &Path) - let delta_store = DeltaTableBuilder::from_uri(&context.root_uri()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); //println!("{:#?}",delta_store); @@ -103,7 +104,8 @@ async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> let context = IntegrationContext::new(integration)?; let delta_store = DeltaTableBuilder::from_uri(context.root_uri()) .with_allow_http(true) - .build_storage()?; + .build_storage()? 
+ .object_store(); put_get_delete_list(delta_store.as_ref()).await?; list_with_delimiter(delta_store.as_ref()).await?; @@ -482,7 +484,8 @@ async fn test_object_store_prefixes(integration: StorageIntegration) -> TestResu let rooturi = format!("{}/{}", context.root_uri(), prefix); let delta_store = DeltaTableBuilder::from_uri(&rooturi) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); let contents = Bytes::from("cats"); let path = Path::from("test"); diff --git a/rust/tests/integration_read.rs b/rust/tests/integration_read.rs index 1fbf35b878..f94a516a5c 100644 --- a/rust/tests/integration_read.rs +++ b/rust/tests/integration_read.rs @@ -153,7 +153,8 @@ async fn verify_store(integration: &IntegrationContext, root_path: &str) -> Test let table_uri = format!("{}/{}", integration.root_uri(), root_path); let storage = DeltaTableBuilder::from_uri(table_uri.clone()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); let files = storage.list_with_delimiter(None).await?; assert_eq!( diff --git a/rust/tests/repair_s3_rename_test.rs b/rust/tests/repair_s3_rename_test.rs index fdd78fb955..19e7b429a9 100644 --- a/rust/tests/repair_s3_rename_test.rs +++ b/rust/tests/repair_s3_rename_test.rs @@ -117,6 +117,7 @@ fn create_s3_backend( .with_allow_http(true) .build_storage() .unwrap() + .object_store() .storage_backend(); let delayed_store = DelayedObjectStore {