From 394ae2bfe6cf857dca7b89122462588da944f5c1 Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Mon, 6 Nov 2023 11:34:03 +0100 Subject: [PATCH] feat: default logstore implementation Introduce a `LogStore` abstraction to channel all log store reads and writes through a single place. This is supposed to allow implementations with more sophisticated locking mechanisms that do not rely on atomic rename semantics for the underlying object store. This does not change any functionality - it reorganizes read operations and commits on the delta commit log to be funneled through the respective methods of `LogStore`. The goal is to align the implementation of multi-cluster writes for Delta Lake on S3 with the one provided by the original `delta` library, enabling multi-cluster writes with some writers using Spark / Delta library and other writers using `delta-rs` For an overview of how it's done in delta, please see: 1. Delta [blog post](https://delta.io/blog/2022-05-18-multi-cluster-writes-to-delta-lake-storage-in-s3/) (high-level concept) 2. Associated Databricks [design doc](https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h) (detailed read) 3. [S3DynamoDbLogStore.java](https://github.com/delta-io/delta/blob/master/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java)(content warning: Java code behind this link) This approach requires readers of a delta table to "recover" unfinished commits from writers - as a result, reading and writing is combined in a single interface, which in this PR is modeled after [LogStore.java](https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/LogStore.java). Currently in `delta-rs`, read path for commits is implemented directly in `DeltaTable`, and there's no mechanism to implement storage-specific behavior like interacting with DynamoDb. --- .../src/delta_datafusion/mod.rs | 6 +- crates/deltalake-core/src/lib.rs | 1 + .../src/logstore/default_logstore.rs | 115 ++++++++++++++++ crates/deltalake-core/src/logstore/mod.rs | 53 ++++++++ .../deltalake-core/src/operations/create.rs | 28 ++-- .../deltalake-core/src/operations/delete.rs | 29 ++-- .../src/operations/filesystem_check.rs | 24 ++-- crates/deltalake-core/src/operations/load.rs | 10 +- crates/deltalake-core/src/operations/merge.rs | 22 ++-- crates/deltalake-core/src/operations/mod.rs | 20 +-- .../deltalake-core/src/operations/optimize.rs | 29 ++-- .../deltalake-core/src/operations/restore.rs | 29 ++-- .../src/operations/transaction/mod.rs | 88 +++++++------ .../src/operations/transaction/test_utils.rs | 22 +++- .../deltalake-core/src/operations/update.rs | 30 +++-- .../deltalake-core/src/operations/vacuum.rs | 34 +++-- crates/deltalake-core/src/operations/write.rs | 24 ++-- .../deltalake-core/src/operations/writer.rs | 8 +- .../src/protocol/checkpoints.rs | 11 +- crates/deltalake-core/src/storage/utils.rs | 2 +- crates/deltalake-core/src/table/builder.rs | 23 ++-- crates/deltalake-core/src/table/mod.rs | 124 +++++++----------- crates/deltalake-core/src/table/state.rs | 4 +- crates/deltalake-core/src/test_utils.rs | 1 + crates/deltalake-core/src/writer/json.rs | 4 +- crates/deltalake-core/src/writer/mod.rs | 9 +- .../deltalake-core/src/writer/record_batch.rs | 5 +- .../deltalake-core/src/writer/test_utils.rs | 2 +- .../deltalake-core/tests/command_optimize.rs | 12 +- crates/deltalake-core/tests/command_vacuum.rs | 2 +- .../tests/commit_info_format.rs | 2 +- crates/deltalake-core/tests/common/mod.rs | 16 +-- crates/deltalake-core/tests/fs_common/mod.rs | 2 +- .../tests/integration_checkpoint.rs | 3 +- .../tests/integration_concurrent_writes.rs | 2 +- .../tests/integration_datafusion.rs | 11 +- .../tests/integration_object_store.rs | 9 +- .../deltalake-core/tests/integration_read.rs | 3 +- .../tests/repair_s3_rename_test.rs | 1 + python/src/filesystem.rs | 3 +- python/src/lib.rs | 21 ++- 41 files changed, 521 insertions(+), 323 deletions(-) create mode 100644 crates/deltalake-core/src/logstore/default_logstore.rs create mode 100644 crates/deltalake-core/src/logstore/mod.rs diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 19d7a510ef..645c62c53c 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -1924,7 +1924,8 @@ mod tests { .build(&table.state) .unwrap(); - let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap(); + let storage = table.object_store(); + let provider = DeltaTableProvider::try_new(table.state, storage, config).unwrap(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); @@ -1983,7 +1984,8 @@ mod tests { let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap(); - let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap(); + let storage = table.object_store(); + let provider = DeltaTableProvider::try_new(table.state, storage, config).unwrap(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); diff --git a/crates/deltalake-core/src/lib.rs b/crates/deltalake-core/src/lib.rs index d683b906dd..644da2dcac 100644 --- a/crates/deltalake-core/src/lib.rs +++ b/crates/deltalake-core/src/lib.rs @@ -85,6 +85,7 @@ compile_error!( pub mod data_catalog; pub mod errors; pub mod kernel; +pub mod logstore; pub mod operations; pub mod protocol; pub mod schema; diff --git a/crates/deltalake-core/src/logstore/default_logstore.rs b/crates/deltalake-core/src/logstore/default_logstore.rs new file mode 100644 index 0000000000..d27b3650f8 --- /dev/null +++ b/crates/deltalake-core/src/logstore/default_logstore.rs @@ -0,0 +1,115 @@ +//! Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation +use std::cmp::max; + +use bytes::Bytes; +use futures::StreamExt; +use lazy_static::lazy_static; +use log::debug; +use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; +use regex::Regex; + +use super::LogStore; +use crate::{ + operations::transaction::TransactionError, + protocol::{get_last_checkpoint, ProtocolError}, + storage::{commit_uri_from_version, ObjectStoreRef}, + DeltaResult, DeltaTableError, +}; + +/// Default [`LogStore`] implementation +#[derive(Debug, Clone)] +pub struct DefaultLogStore { + pub(crate) storage: ObjectStoreRef, +} + +#[async_trait::async_trait] +impl LogStore for DefaultLogStore { + async fn read_commit_entry(&self, version: i64) -> DeltaResult { + let commit_uri = commit_uri_from_version(version); + let data = self.storage.get(&commit_uri).await?.bytes().await?; + Ok(data) + + // TODO: return actual actions instead + // let actions = Self::get_actions(next_version, commit_log_bytes).await; + } + + /// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists] + /// if the given `version` already exists. The caller should handle the retry logic itself. + /// This is low-level transaction API. If user does not want to maintain the commit loop then + /// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` + /// with retry logic. + async fn write_commit_entry( + &self, + version: i64, + tmp_commit: &Path, + ) -> Result<(), TransactionError> { + // move temporary commit file to delta log directory + // rely on storage to fail if the file already exists - + self.storage + .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version)) + .await + .map_err(|err| match err { + ObjectStoreError::AlreadyExists { .. } => { + TransactionError::VersionAlreadyExists(version) + } + _ => TransactionError::from(err), + })?; + Ok(()) + } + + async fn get_latest_version(&self, current_version: i64) -> DeltaResult { + let version_start = match get_last_checkpoint(&self.storage).await { + Ok(last_check_point) => last_check_point.version, + Err(ProtocolError::CheckpointNotFound) => { + // no checkpoint + -1 + } + Err(e) => { + return Err(DeltaTableError::from(e)); + } + }; + + debug!("latest checkpoint version: {version_start}"); + + let version_start = max(current_version, version_start); + + lazy_static! { + static ref DELTA_LOG_REGEX: Regex = + Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap(); + } + + // list files to find max version + let version = async { + let mut max_version: i64 = version_start; + let prefix = Some(self.storage.log_path()); + let offset_path = commit_uri_from_version(max_version); + let mut files = self.storage.list_with_offset(prefix, &offset_path).await?; + // let mut files = self.storage.list_with_offset(prefix, &offset_path).await?; + + while let Some(obj_meta) = files.next().await { + let obj_meta = obj_meta?; + if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) { + let log_version = captures.get(1).unwrap().as_str().parse().unwrap(); + // listing may not be ordered + max_version = max(max_version, log_version); + // also cache timestamp for version, for faster time-travel + // TODO: temporarily disabled because `version_timestamp` is not available in the [`LogStore`] + // self.version_timestamp + // .insert(log_version, obj_meta.last_modified.timestamp()); + } + } + + if max_version < 0 { + return Err(DeltaTableError::not_a_table(self.storage.root_uri())); + } + + Ok::(max_version) + } + .await?; + Ok(version) + } + + fn object_store(&self) -> ObjectStoreRef { + self.storage.clone() + } +} diff --git a/crates/deltalake-core/src/logstore/mod.rs b/crates/deltalake-core/src/logstore/mod.rs new file mode 100644 index 0000000000..51d89ecff4 --- /dev/null +++ b/crates/deltalake-core/src/logstore/mod.rs @@ -0,0 +1,53 @@ +//! Delta log store. +use std::sync::Arc; + +use crate::{ + errors::DeltaResult, operations::transaction::TransactionError, storage::ObjectStoreRef, +}; +use bytes::Bytes; +use object_store::path::Path; + +pub mod default_logstore; + +/// Sharable reference to [`LogStore`] +pub type LogStoreRef = Arc; + +/// Trait for critical operations required to read and write commit entries in Delta logs. +/// +/// The correctness is predicated on the atomicity and durability guarantees of +/// the implementation of this interface. Specifically, +/// +/// - Atomic visibility: Any commit created via `write_commit_entry` must become visible atomically. +/// - Mutual exclusion: Only one writer must be able to create a commit for a specific version. +/// - Consistent listing: Once a commit entry for version `v` has been written, any future call to +/// `get_latest_version` must return a version >= `v`, i.e. the underlying file system entry must +/// become visible immediately. +#[async_trait::async_trait] +pub trait LogStore: Sync + Send { + /// Read data for commit entry with the given version. + /// TODO: return the actual commit data, i.e. Vec, instead? + async fn read_commit_entry(&self, version: i64) -> DeltaResult; + + /// Write list of actions as delta commit entry for given version. + /// + /// This operation can be retried with a higher version in case the write + /// fails with `TransactionError::VersionAlreadyExists`. + async fn write_commit_entry( + &self, + version: i64, + tmp_commit: &Path, + ) -> Result<(), TransactionError>; + + /// Find latest version currently stored in the delta log. + async fn get_latest_version(&self, start_version: i64) -> DeltaResult; + + /// Get underlying object store. + fn object_store(&self) -> ObjectStoreRef; +} + +// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders +impl std::fmt::Debug for dyn LogStore + '_ { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.object_store().fmt(f) + } +} diff --git a/crates/deltalake-core/src/operations/create.rs b/crates/deltalake-core/src/operations/create.rs index 1dc9fdf8b2..c8c24a9d00 100644 --- a/crates/deltalake-core/src/operations/create.rs +++ b/crates/deltalake-core/src/operations/create.rs @@ -11,8 +11,8 @@ use super::transaction::commit; use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, DataType, Metadata, Protocol, StructField, StructType}; +use crate::logstore::{LogStore, LogStoreRef}; use crate::protocol::{DeltaOperation, SaveMode}; -use crate::storage::DeltaObjectStore; use crate::table::builder::ensure_table_uri; use crate::table::config::DeltaConfigKey; use crate::table::DeltaTableMetaData; @@ -55,7 +55,7 @@ pub struct CreateBuilder { partition_columns: Option>, storage_options: Option>, actions: Vec, - object_store: Option>, + log_store: Option, configuration: HashMap>, metadata: Option>, } @@ -78,7 +78,7 @@ impl CreateBuilder { partition_columns: None, storage_options: None, actions: Default::default(), - object_store: None, + log_store: None, configuration: Default::default(), metadata: Default::default(), } @@ -198,9 +198,9 @@ impl CreateBuilder { self } - /// Provide a [`DeltaObjectStore`] instance, that points at table location - pub fn with_object_store(mut self, object_store: Arc) -> Self { - self.object_store = Some(object_store); + /// Provide a [`LogStore`] instance, that points at table location + pub fn with_log_store(mut self, log_store: Arc) -> Self { + self.log_store = Some(log_store); self } @@ -219,12 +219,12 @@ impl CreateBuilder { return Err(CreateError::MissingSchema.into()); } - let (storage_url, table) = if let Some(object_store) = self.object_store { + let (storage_url, table) = if let Some(log_store) = self.log_store { ( - ensure_table_uri(object_store.root_uri())? + ensure_table_uri(log_store.object_store().root_uri())? .as_str() .to_string(), - DeltaTable::new(object_store, Default::default()), + DeltaTable::new(log_store, Default::default()), ) } else { let storage_url = ensure_table_uri(self.location.ok_or(CreateError::MissingLocation)?)?; @@ -311,7 +311,7 @@ impl std::future::IntoFuture for CreateBuilder { }; let version = commit( - table.object_store().as_ref(), + table.log_store.as_ref(), &actions, operation, table_state, @@ -443,11 +443,11 @@ mod tests { assert_eq!(table.version(), 0); let first_id = table.get_metadata().unwrap().id.clone(); - let object_store = table.object_store(); + let log_store = table.log_store; // Check an error is raised when a table exists at location let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store.clone()) .with_columns(schema.fields().clone()) .with_save_mode(SaveMode::ErrorIfExists) .await; @@ -455,7 +455,7 @@ mod tests { // Check current table is returned when ignore option is chosen. let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store.clone()) .with_columns(schema.fields().clone()) .with_save_mode(SaveMode::Ignore) .await @@ -464,7 +464,7 @@ mod tests { // Check table is overwritten let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store.clone()) .with_columns(schema.fields().iter().cloned()) .with_save_mode(SaveMode::Overwrite) .await diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 7f8be1f293..b697d0cdbe 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -21,6 +21,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use crate::logstore::LogStoreRef; use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_plan::filter::FilterExec; @@ -40,7 +41,7 @@ use crate::kernel::{Action, Add, Remove}; use crate::operations::transaction::commit; use crate::operations::write::write_execution_plan; use crate::protocol::DeltaOperation; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; +use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -54,7 +55,7 @@ pub struct DeleteBuilder { /// A snapshot of the table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -84,11 +85,11 @@ pub struct DeleteMetrics { impl DeleteBuilder { /// Create a new [`DeleteBuilder`] - pub fn new(object_store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { predicate: None, snapshot, - store: object_store, + log_store, state: None, app_metadata: None, writer_properties: None, @@ -187,7 +188,7 @@ async fn excute_non_empty_expr( async fn execute( predicate: Option, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -197,7 +198,13 @@ async fn execute( let mut metrics = DeleteMetrics::default(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, object_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files( + snapshot, + log_store.object_store().clone(), + &state, + predicate.clone(), + ) + .await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -208,7 +215,7 @@ async fn execute( let write_start = Instant::now(); let add = excute_non_empty_expr( snapshot, - object_store.clone(), + log_store.object_store().clone(), &state, &predicate, &mut metrics, @@ -254,7 +261,7 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -278,7 +285,7 @@ impl std::future::IntoFuture for DeleteBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.store.clone(), session.runtime_env()); + register_store(this.log_store.object_store().clone(), session.runtime_env()); session.state() }); @@ -295,7 +302,7 @@ impl std::future::IntoFuture for DeleteBuilder { let ((actions, version), metrics) = execute( predicate, - this.store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -305,7 +312,7 @@ impl std::future::IntoFuture for DeleteBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/filesystem_check.rs b/crates/deltalake-core/src/operations/filesystem_check.rs index 65716bbfe1..826dd2975e 100644 --- a/crates/deltalake-core/src/operations/filesystem_check.rs +++ b/crates/deltalake-core/src/operations/filesystem_check.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::fmt::Debug; -use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -27,9 +26,9 @@ use url::{ParseError, Url}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Add, Remove}; +use crate::logstore::LogStoreRef; use crate::operations::transaction::commit; use crate::protocol::DeltaOperation; -use crate::storage::DeltaObjectStore; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -40,7 +39,7 @@ pub struct FileSystemCheckBuilder { /// A snapshot of the to-be-checked table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Don't remove actions to the table log. Just determine which files can be removed dry_run: bool, } @@ -56,7 +55,7 @@ pub struct FileSystemCheckMetrics { struct FileSystemCheckPlan { /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Files that no longer exists in undlying ObjectStore but have active add actions pub files_to_remove: Vec, } @@ -74,10 +73,10 @@ fn is_absolute_path(path: &str) -> DeltaResult { impl FileSystemCheckBuilder { /// Create a new [`FileSystemCheckBuilder`] - pub fn new(store: Arc, state: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, state: DeltaTableState) -> Self { FileSystemCheckBuilder { snapshot: state, - store, + log_store, dry_run: false, } } @@ -91,7 +90,7 @@ impl FileSystemCheckBuilder { async fn create_fsck_plan(&self) -> DeltaResult { let mut files_relative: HashMap<&str, &Add> = HashMap::with_capacity(self.snapshot.files().len()); - let store = self.store.clone(); + let log_store = self.log_store.clone(); for active in self.snapshot.files() { if is_absolute_path(&active.path)? { @@ -103,7 +102,8 @@ impl FileSystemCheckBuilder { } } - let mut files = self.store.list(None).await?; + let object_store = log_store.object_store(); + let mut files = object_store.list(None).await?; while let Some(result) = files.next().await { let file = result?; files_relative.remove(file.location.as_ref()); @@ -120,7 +120,7 @@ impl FileSystemCheckBuilder { Ok(FileSystemCheckPlan { files_to_remove, - store, + log_store, }) } } @@ -156,7 +156,7 @@ impl FileSystemCheckPlan { } commit( - self.store.as_ref(), + self.log_store.as_ref(), &actions, DeltaOperation::FileSystemCheck {}, snapshot, @@ -183,7 +183,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { let plan = this.create_fsck_plan().await?; if this.dry_run { return Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), FileSystemCheckMetrics { files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(), dry_run: true, @@ -192,7 +192,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { } let metrics = plan.execute(&this.snapshot).await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/load.rs b/crates/deltalake-core/src/operations/load.rs index 7baa59e3e1..1a4c5c4cc6 100644 --- a/crates/deltalake-core/src/operations/load.rs +++ b/crates/deltalake-core/src/operations/load.rs @@ -7,7 +7,7 @@ use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use futures::future::BoxFuture; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::storage::DeltaObjectStore; +use crate::logstore::LogStoreRef; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -16,17 +16,17 @@ pub struct LoadBuilder { /// A snapshot of the to-be-loaded table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// A sub-selection of columns to be loaded columns: Option>, } impl LoadBuilder { /// Create a new [`LoadBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, columns: None, } } @@ -46,7 +46,7 @@ impl std::future::IntoFuture for LoadBuilder { let this = self; Box::pin(async move { - let table = DeltaTable::new_with_state(this.store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); let schema = table.state.arrow_schema()?; let projection = this .columns diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index 57621cb316..eb28e14f69 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -68,10 +68,10 @@ use super::transaction::commit; use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression}; use crate::delta_datafusion::{register_store, DeltaScanBuilder}; use crate::kernel::{Action, Remove}; +use crate::logstore::LogStoreRef; use crate::operations::datafusion_utils::MetricObserverExec; use crate::operations::write::write_execution_plan; use crate::protocol::{DeltaOperation, MergePredicate}; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableError}; @@ -107,7 +107,7 @@ pub struct MergeBuilder { /// The source data source: DataFrame, /// Delta object store for handling data files - object_store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -122,7 +122,7 @@ pub struct MergeBuilder { impl MergeBuilder { /// Create a new [`MergeBuilder`] pub fn new>( - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: DeltaTableState, predicate: E, source: DataFrame, @@ -132,7 +132,7 @@ impl MergeBuilder { predicate, source, snapshot, - object_store, + log_store, source_alias: None, target_alias: None, state: None, @@ -561,7 +561,7 @@ pub struct MergeMetrics { async fn execute( predicate: Expression, source: DataFrame, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -590,7 +590,7 @@ async fn execute( // predicates also need to be considered when pruning let target = Arc::new( - DeltaScanBuilder::new(snapshot, object_store.clone(), &state) + DeltaScanBuilder::new(snapshot, log_store.object_store(), &state) .with_schema(snapshot.input_schema()?) .build() .await?, @@ -1126,7 +1126,7 @@ async fn execute( state.clone(), projection.clone(), table_partition_cols.clone(), - object_store.clone(), + log_store.object_store().clone(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, @@ -1188,7 +1188,7 @@ async fn execute( not_matched_by_source_predicates: not_match_source_operations, }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -1212,7 +1212,7 @@ impl std::future::IntoFuture for MergeBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.object_store.clone(), session.runtime_env()); + register_store(this.log_store.object_store().clone(), session.runtime_env()); session.state() }); @@ -1220,7 +1220,7 @@ impl std::future::IntoFuture for MergeBuilder { let ((actions, version), metrics) = execute( this.predicate, this.source, - this.object_store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -1236,7 +1236,7 @@ impl std::future::IntoFuture for MergeBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.object_store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/mod.rs b/crates/deltalake-core/src/operations/mod.rs index 35301f067e..d008e5874a 100644 --- a/crates/deltalake-core/src/operations/mod.rs +++ b/crates/deltalake-core/src/operations/mod.rs @@ -107,60 +107,60 @@ impl DeltaOps { /// ``` #[must_use] pub fn create(self) -> CreateBuilder { - CreateBuilder::default().with_object_store(self.0.object_store()) + CreateBuilder::default().with_log_store(self.0.log_store.clone()) } /// Load data from a DeltaTable #[cfg(feature = "datafusion")] #[must_use] pub fn load(self) -> LoadBuilder { - LoadBuilder::new(self.0.object_store(), self.0.state) + LoadBuilder::new(self.0.log_store.clone(), self.0.state) } /// Write data to Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn write(self, batches: impl IntoIterator) -> WriteBuilder { - WriteBuilder::new(self.0.object_store(), self.0.state).with_input_batches(batches) + WriteBuilder::new(self.0.log_store.clone(), self.0.state).with_input_batches(batches) } /// Vacuum stale files from delta table #[must_use] pub fn vacuum(self) -> VacuumBuilder { - VacuumBuilder::new(self.0.object_store(), self.0.state) + VacuumBuilder::new(self.0.log_store.clone(), self.0.state) } /// Audit active files with files present on the filesystem #[must_use] pub fn filesystem_check(self) -> FileSystemCheckBuilder { - FileSystemCheckBuilder::new(self.0.object_store(), self.0.state) + FileSystemCheckBuilder::new(self.0.log_store.clone(), self.0.state) } /// Audit active files with files present on the filesystem #[cfg(all(feature = "arrow", feature = "parquet"))] #[must_use] pub fn optimize<'a>(self) -> OptimizeBuilder<'a> { - OptimizeBuilder::new(self.0.object_store(), self.0.state) + OptimizeBuilder::new(self.0.log_store.clone(), self.0.state) } /// Delete data from Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn delete(self) -> DeleteBuilder { - DeleteBuilder::new(self.0.object_store(), self.0.state) + DeleteBuilder::new(self.0.log_store.clone(), self.0.state) } /// Update data from Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn update(self) -> UpdateBuilder { - UpdateBuilder::new(self.0.object_store(), self.0.state) + UpdateBuilder::new(self.0.log_store.clone(), self.0.state) } /// Restore delta table to a specified version or datetime #[must_use] pub fn restore(self) -> RestoreBuilder { - RestoreBuilder::new(self.0.object_store(), self.0.state) + RestoreBuilder::new(self.0.log_store.clone(), self.0.state) } /// Update data from Delta table @@ -172,7 +172,7 @@ impl DeltaOps { predicate: E, ) -> MergeBuilder { MergeBuilder::new( - self.0.object_store(), + self.0.log_store.clone(), self.0.state, predicate.into(), source, diff --git a/crates/deltalake-core/src/operations/optimize.rs b/crates/deltalake-core/src/operations/optimize.rs index 7feecd1e56..254ff3ce03 100644 --- a/crates/deltalake-core/src/operations/optimize.rs +++ b/crates/deltalake-core/src/operations/optimize.rs @@ -42,6 +42,7 @@ use super::transaction::commit; use super::writer::{PartitionWriter, PartitionWriterConfig}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Remove}; +use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; @@ -155,7 +156,7 @@ pub struct OptimizeBuilder<'a> { /// A snapshot of the to-be-optimized table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: ObjectStoreRef, + log_store: LogStoreRef, /// Filters to select specific table partitions to be optimized filters: &'a [PartitionFilter], /// Desired file size after bin-packing files @@ -177,10 +178,10 @@ pub struct OptimizeBuilder<'a> { impl<'a> OptimizeBuilder<'a> { /// Create a new [`OptimizeBuilder`] - pub fn new(store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, filters: &[], target_size: None, writer_properties: None, @@ -274,14 +275,14 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { )?; let metrics = plan .execute( - this.store.clone(), + this.log_store.clone(), &this.snapshot, this.max_concurrent_tasks, this.max_spill_size, this.min_commit_interval, ) .await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) @@ -584,7 +585,7 @@ impl MergePlan { /// Perform the operations outlined in the plan. pub async fn execute( mut self, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, max_concurrent_tasks: usize, #[allow(unused_variables)] // used behind a feature flag @@ -607,12 +608,13 @@ impl MergePlan { for file in files.iter() { debug!(" file {}", file.location); } - let object_store_ref = object_store.clone(); + let object_store_ref = log_store.clone(); let batch_stream = futures::stream::iter(files.clone()) .then(move |file| { let object_store_ref = object_store_ref.clone(); async move { - let file_reader = ParquetObjectReader::new(object_store_ref, file); + let file_reader = + ParquetObjectReader::new(object_store_ref.object_store(), file); ParquetRecordBatchStreamBuilder::new(file_reader) .await? .build() @@ -625,7 +627,7 @@ impl MergePlan { self.task_parameters.clone(), partition, files, - object_store.clone(), + log_store.object_store().clone(), futures::future::ready(Ok(batch_stream)), )); util::flatten_join_error(rewrite_result) @@ -635,13 +637,15 @@ impl MergePlan { #[cfg(not(feature = "datafusion"))] let exec_context = Arc::new(zorder::ZOrderExecContext::new( zorder_columns, - object_store.clone(), + log_store.object_store().clone(), // If there aren't enough bins to use all threads, then instead // use threads within the bins. This is important for the case where // the table is un-partitioned, in which case the entire table is just // one big bin. bins.len() <= num_cpus::get(), )); + let object_store = log_store.object_store().clone(); + #[cfg(feature = "datafusion")] let exec_context = Arc::new(zorder::ZOrderExecContext::new( zorder_columns, @@ -649,7 +653,6 @@ impl MergePlan { max_spill_size, )?); let task_parameters = self.task_parameters.clone(); - let object_store = object_store.clone(); futures::stream::iter(bins) .map(move |(partition, files)| { let batch_stream = Self::read_zorder(files.clone(), exec_context.clone()); @@ -671,7 +674,7 @@ impl MergePlan { let mut stream = stream.buffer_unordered(max_concurrent_tasks); - let mut table = DeltaTable::new_with_state(object_store.clone(), snapshot.clone()); + let mut table = DeltaTable::new_with_state(log_store.clone(), snapshot.clone()); // Actions buffered so far. These will be flushed either at the end // or when we reach the commit interval. @@ -720,7 +723,7 @@ impl MergePlan { //// TODO: Check for remove actions on optimized partitions. If a //// optimized partition was updated then abort the commit. Requires (#593). commit( - table.object_store().as_ref(), + table.log_store.as_ref(), &actions, self.task_parameters.input_parameters.clone().into(), table.get_state(), diff --git a/crates/deltalake-core/src/operations/restore.rs b/crates/deltalake-core/src/operations/restore.rs index a356b5b312..c391de6f04 100644 --- a/crates/deltalake-core/src/operations/restore.rs +++ b/crates/deltalake-core/src/operations/restore.rs @@ -10,7 +10,8 @@ //! 5) If ignore_missing_files option is false (default value) check availability of AddFile //! in file system. //! 6) Commit Protocol, all RemoveFile and AddFile actions -//! into delta log using `try_commit_transaction` (commit will be failed in case of parallel transaction) +//! into delta log using `LogStore::write_commit_entry` (commit will be failed in case of parallel transaction) +//! TODO: comment is outdated //! 7) If table was modified in parallel then ignore restore and raise exception. //! //! # Example @@ -31,9 +32,9 @@ use object_store::ObjectStore; use serde::Serialize; use crate::kernel::{Action, Add, Protocol, Remove}; -use crate::operations::transaction::{prepare_commit, try_commit_transaction, TransactionError}; +use crate::logstore::LogStoreRef; +use crate::operations::transaction::{prepare_commit, TransactionError}; use crate::protocol::DeltaOperation; -use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError}; @@ -74,7 +75,7 @@ pub struct RestoreBuilder { /// A snapshot of the to-be-restored table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: ObjectStoreRef, + log_store: LogStoreRef, /// Version to restore version_to_restore: Option, /// Datetime to restore @@ -87,10 +88,10 @@ pub struct RestoreBuilder { impl RestoreBuilder { /// Create a new [`RestoreBuilder`] - pub fn new(store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, version_to_restore: None, datetime_to_restore: None, ignore_missing_files: false, @@ -125,7 +126,7 @@ impl RestoreBuilder { } async fn execute( - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: DeltaTableState, version_to_restore: Option, datetime_to_restore: Option>, @@ -138,7 +139,7 @@ async fn execute( { return Err(DeltaTableError::from(RestoreError::InvalidRestoreParameter)); } - let mut table = DeltaTable::new(object_store.clone(), DeltaTableConfig::default()); + let mut table = DeltaTable::new(log_store.clone(), DeltaTableConfig::default()); let version = match datetime_to_restore { Some(datetime) => { table.load_with_datetime(datetime).await?; @@ -195,7 +196,7 @@ async fn execute( .collect(); if !ignore_missing_files { - check_files_available(object_store.as_ref(), &files_to_add).await?; + check_files_available(log_store.object_store().as_ref(), &files_to_add).await?; } let metrics = RestoreMetrics { @@ -238,7 +239,7 @@ async fn execute( actions.extend(files_to_remove.into_iter().map(Action::Remove)); let commit = prepare_commit( - object_store.as_ref(), + log_store.object_store().as_ref(), &DeltaOperation::Restore { version: version_to_restore, datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }), @@ -249,13 +250,13 @@ async fn execute( ) .await?; let commit_version = snapshot.version() + 1; - match try_commit_transaction(object_store.as_ref(), &commit, commit_version).await { + match log_store.write_commit_entry(commit_version, &commit).await { Ok(_) => {} Err(err @ TransactionError::VersionAlreadyExists(_)) => { return Err(err.into()); } Err(err) => { - object_store.delete(&commit).await?; + log_store.object_store().delete(&commit).await?; return Err(err.into()); } } @@ -291,7 +292,7 @@ impl std::future::IntoFuture for RestoreBuilder { Box::pin(async move { let metrics = execute( - this.store.clone(), + this.log_store.clone(), this.snapshot.clone(), this.version_to_restore, this.datetime_to_restore, @@ -299,7 +300,7 @@ impl std::future::IntoFuture for RestoreBuilder { this.protocol_downgrade_allowed, ) .await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/transaction/mod.rs b/crates/deltalake-core/src/operations/transaction/mod.rs index c31c349fd7..8b66bd38fd 100644 --- a/crates/deltalake-core/src/operations/transaction/mod.rs +++ b/crates/deltalake-core/src/operations/transaction/mod.rs @@ -10,8 +10,8 @@ use serde_json::Value; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, CommitInfo}; +use crate::logstore::LogStore; use crate::protocol::DeltaOperation; -use crate::storage::commit_uri_from_version; use crate::table::state::DeltaTableState; mod conflict_checker; @@ -130,6 +130,7 @@ pub(crate) fn get_commit_bytes( /// Low-level transaction API. Creates a temporary commit file. Once created, /// the transaction object could be dropped and the actual commit could be executed /// with `DeltaTable.try_commit_transaction`. +/// TODO: comment is outdated now pub(crate) async fn prepare_commit<'a>( storage: &dyn ObjectStore, operation: &DeltaOperation, @@ -150,42 +151,26 @@ pub(crate) async fn prepare_commit<'a>( Ok(path) } -/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists] -/// if the given `version` already exists. The caller should handle the retry logic itself. -/// This is low-level transaction API. If user does not want to maintain the commit loop then -/// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` -/// with retry logic. -pub(crate) async fn try_commit_transaction( - storage: &dyn ObjectStore, - tmp_commit: &Path, - version: i64, -) -> Result { - // move temporary commit file to delta log directory - // rely on storage to fail if the file already exists - - storage - .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version)) - .await - .map_err(|err| match err { - ObjectStoreError::AlreadyExists { .. } => { - TransactionError::VersionAlreadyExists(version) - } - _ => TransactionError::from(err), - })?; - Ok(version) -} - /// Commit a transaction, with up to 15 retries. This is higher-level transaction API. /// /// Will error early if the a concurrent transaction has already been committed /// and conflicts with this transaction. pub async fn commit( - storage: &dyn ObjectStore, + log_store: &dyn LogStore, actions: &Vec, operation: DeltaOperation, read_snapshot: &DeltaTableState, app_metadata: Option>, ) -> DeltaResult { - commit_with_retries(storage, actions, operation, read_snapshot, app_metadata, 15).await + commit_with_retries( + log_store, + actions, + operation, + read_snapshot, + app_metadata, + 15, + ) + .await } /// Commit a transaction, with up configurable number of retries. This is higher-level transaction API. @@ -193,24 +178,35 @@ pub async fn commit( /// The function will error early if the a concurrent transaction has already been committed /// and conflicts with this transaction. pub async fn commit_with_retries( - storage: &dyn ObjectStore, + log_store: &dyn LogStore, actions: &Vec, operation: DeltaOperation, read_snapshot: &DeltaTableState, app_metadata: Option>, max_retries: usize, ) -> DeltaResult { - let tmp_commit = - prepare_commit(storage, &operation, actions, read_snapshot, app_metadata).await?; + let tmp_commit = prepare_commit( + log_store.object_store().as_ref(), + &operation, + actions, + read_snapshot, + app_metadata, + ) + .await?; let mut attempt_number = 1; while attempt_number <= max_retries { let version = read_snapshot.version() + attempt_number as i64; - match try_commit_transaction(storage, &tmp_commit, version).await { - Ok(version) => return Ok(version), + match log_store.write_commit_entry(version, &tmp_commit).await { + Ok(()) => return Ok(version), Err(TransactionError::VersionAlreadyExists(version)) => { - let summary = WinningCommitSummary::try_new(storage, version - 1, version).await?; + let summary = WinningCommitSummary::try_new( + log_store.object_store().as_ref(), + version - 1, + version, + ) + .await?; let transaction_info = TransactionInfo::try_new( read_snapshot, operation.read_predicate(), @@ -225,13 +221,13 @@ pub async fn commit_with_retries( attempt_number += 1; } Err(err) => { - storage.delete(&tmp_commit).await?; + log_store.object_store().delete(&tmp_commit).await?; return Err(TransactionError::CommitConflict(err).into()); } }; } Err(err) => { - storage.delete(&tmp_commit).await?; + log_store.object_store().delete(&tmp_commit).await?; return Err(err.into()); } } @@ -242,11 +238,17 @@ pub async fn commit_with_retries( #[cfg(all(test, feature = "parquet"))] mod tests { + use std::{collections::HashMap, sync::Arc}; + use self::test_utils::{create_remove_action, init_table_actions}; use super::*; - use crate::DeltaConfigKey; + use crate::{ + logstore::default_logstore::DefaultLogStore, + storage::{commit_uri_from_version, DeltaObjectStore}, + DeltaConfigKey, + }; use object_store::memory::InMemory; - use std::collections::HashMap; + use url::Url; #[test] fn test_commit_uri_from_version() { @@ -290,18 +292,22 @@ mod tests { #[tokio::test] async fn test_try_commit_transaction() { - let store = InMemory::new(); + let store = Arc::new(InMemory::new()); + let url = Url::parse("mem://what/is/this").unwrap(); + let delta_store = DeltaObjectStore::new(store.clone(), url); + let log_store = DefaultLogStore { + storage: Arc::new(delta_store), + }; let tmp_path = Path::from("_delta_log/tmp"); let version_path = Path::from("_delta_log/00000000000000000000.json"); store.put(&tmp_path, bytes::Bytes::new()).await.unwrap(); store.put(&version_path, bytes::Bytes::new()).await.unwrap(); + let res = log_store.write_commit_entry(0, &tmp_path).await; // fails if file version already exists - let res = try_commit_transaction(&store, &tmp_path, 0).await; assert!(res.is_err()); // succeeds for next version - let res = try_commit_transaction(&store, &tmp_path, 1).await.unwrap(); - assert_eq!(res, 1); + log_store.write_commit_entry(1, &tmp_path).await.unwrap(); } } diff --git a/crates/deltalake-core/src/operations/transaction/test_utils.rs b/crates/deltalake-core/src/operations/transaction/test_utils.rs index b52b1a1c7b..56b0894019 100644 --- a/crates/deltalake-core/src/operations/transaction/test_utils.rs +++ b/crates/deltalake-core/src/operations/transaction/test_utils.rs @@ -1,7 +1,7 @@ #![allow(unused)] use std::collections::HashMap; -use super::{prepare_commit, try_commit_transaction}; +use super::prepare_commit; use crate::kernel::{ Action, Add, CommitInfo, DataType, Metadata, PrimitiveType, Protocol, Remove, StructField, StructType, @@ -121,7 +121,7 @@ pub async fn create_initialized_table( partition_cols: &[String], configuration: Option>>, ) -> DeltaTable { - let storage = DeltaTableBuilder::from_uri("memory://") + let log_store = DeltaTableBuilder::from_uri("memory://") .build_storage() .unwrap(); let table_schema = StructType::new(vec![ @@ -161,11 +161,19 @@ pub async fn create_initialized_table( ), }; let actions = init_table_actions(None); - let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, &state, None) - .await - .unwrap(); - try_commit_transaction(storage.as_ref(), &prepared_commit, 0) + let prepared_commit = prepare_commit( + log_store.object_store().as_ref(), + &operation, + &actions, + &state, + None, + ) + .await + .unwrap(); + + log_store + .write_commit_entry(0, &prepared_commit) .await .unwrap(); - DeltaTable::new_with_state(storage, state) + DeltaTable::new_with_state(log_store, state) } diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index 1723d287a2..54479a73de 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -49,8 +49,8 @@ use super::write::write_execution_plan; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; use crate::kernel::{Action, Remove}; +use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableError}; @@ -64,7 +64,7 @@ pub struct UpdateBuilder { /// A snapshot of the table's state snapshot: DeltaTableState, /// Delta object store for handling data files - object_store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -95,12 +95,12 @@ pub struct UpdateMetrics { impl UpdateBuilder { /// Create a new ['UpdateBuilder'] - pub fn new(object_store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { predicate: None, updates: HashMap::new(), snapshot, - object_store, + log_store, state: None, writer_properties: None, app_metadata: None, @@ -164,7 +164,7 @@ impl UpdateBuilder { async fn execute( predicate: Option, updates: HashMap, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -213,7 +213,13 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns.clone(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, object_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files( + snapshot, + log_store.object_store(), + &state, + predicate.clone(), + ) + .await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; if candidates.candidates.is_empty() { @@ -225,7 +231,7 @@ async fn execute( let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches - let scan = DeltaScanBuilder::new(snapshot, object_store.clone(), &state) + let scan = DeltaScanBuilder::new(snapshot, log_store.object_store().clone(), &state) .with_files(&candidates.candidates) .build() .await?; @@ -357,7 +363,7 @@ async fn execute( state.clone(), projection.clone(), table_partition_cols.clone(), - object_store.clone(), + log_store.object_store().clone(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, @@ -407,7 +413,7 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -430,7 +436,7 @@ impl std::future::IntoFuture for UpdateBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.object_store.clone(), session.runtime_env()); + register_store(this.log_store.object_store().clone(), session.runtime_env()); session.state() }); @@ -438,7 +444,7 @@ impl std::future::IntoFuture for UpdateBuilder { let ((actions, version), metrics) = execute( this.predicate, this.updates, - this.object_store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -449,7 +455,7 @@ impl std::future::IntoFuture for UpdateBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.object_store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs index 47f7c1d5c9..efdde55347 100644 --- a/crates/deltalake-core/src/operations/vacuum.rs +++ b/crates/deltalake-core/src/operations/vacuum.rs @@ -37,8 +37,8 @@ use super::transaction::commit; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::Action; +use crate::logstore::{LogStore, LogStoreRef}; use crate::protocol::DeltaOperation; -use crate::storage::DeltaObjectStore; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -82,7 +82,7 @@ pub struct VacuumBuilder { /// A snapshot of the to-be-vacuumed table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Period of stale files allowed. retention_period: Option, /// Validate the retention period is not below the retention period configured in the table @@ -125,10 +125,10 @@ pub struct VacuumEndOperationMetrics { /// Methods to specify various vacuum options and to execute the operation impl VacuumBuilder { /// Create a new [`VacuumBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { VacuumBuilder { snapshot, - store, + log_store, retention_period: None, enforce_retention_duration: true, dry_run: false, @@ -184,8 +184,11 @@ impl VacuumBuilder { let mut files_to_delete = vec![]; let mut file_sizes = vec![]; - let mut all_files = self.store.list(None).await.map_err(DeltaTableError::from)?; - + let object_store = self.log_store.object_store(); + let mut all_files = object_store + .list(None) + .await + .map_err(DeltaTableError::from)?; let partition_columns = &self .snapshot .current_metadata() @@ -227,7 +230,7 @@ impl std::future::IntoFuture for VacuumBuilder { let plan = this.create_vacuum_plan().await?; if this.dry_run { return Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), VacuumMetrics { files_deleted: plan.files_to_delete.iter().map(|f| f.to_string()).collect(), dry_run: true, @@ -235,9 +238,11 @@ impl std::future::IntoFuture for VacuumBuilder { )); } - let metrics = plan.execute(&this.store, &this.snapshot).await?; + let metrics = plan + .execute(this.log_store.as_ref(), &this.snapshot) + .await?; Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), metrics, )) }) @@ -262,7 +267,7 @@ impl VacuumPlan { /// Execute the vacuum plan and delete files from underlying storage pub async fn execute( self, - store: &DeltaObjectStore, + store: &dyn LogStore, snapshot: &DeltaTableState, ) -> Result { if self.files_to_delete.is_empty() { @@ -311,6 +316,7 @@ impl VacuumPlan { .boxed(); let files_deleted = store + .object_store() .delete_stream(locations) .map(|res| match res { Ok(path) => Ok(path.to_string()), @@ -395,7 +401,7 @@ mod tests { async fn vacuum_delta_8_0_table() { let table = open_table("./tests/data/delta-0.8.0").await.unwrap(); - let result = VacuumBuilder::new(table.object_store(), table.state.clone()) + let result = VacuumBuilder::new(table.log_store, table.state.clone()) .with_retention_period(Duration::hours(1)) .with_dry_run(true) .await; @@ -403,7 +409,7 @@ mod tests { assert!(result.is_err()); let table = open_table("./tests/data/delta-0.8.0").await.unwrap(); - let (table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(0)) .with_dry_run(true) .with_enforce_retention_duration(false) @@ -415,7 +421,7 @@ mod tests { vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"] ); - let (table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(169)) .with_dry_run(true) .await @@ -432,7 +438,7 @@ mod tests { .as_secs() / 3600; let empty: Vec = Vec::new(); - let (_table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (_table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(retention_hours as i64)) .with_dry_run(true) .await diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index 45bdaaeff5..46fb579ffc 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -44,8 +44,9 @@ use super::{transaction::commit, CreateBuilder}; use crate::delta_datafusion::DeltaDataChecker; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Add, Remove, StructType}; +use crate::logstore::LogStoreRef; use crate::protocol::{DeltaOperation, SaveMode}; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; +use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::writer::record_batch::divide_by_partition_values; use crate::writer::utils::PartitionPath; @@ -90,7 +91,7 @@ pub struct WriteBuilder { /// A snapshot of the to-be-loaded table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// The input plan input: Option>, /// Datafusion session state relevant for executing the input plan @@ -117,10 +118,10 @@ pub struct WriteBuilder { impl WriteBuilder { /// Create a new [`WriteBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, input: None, state: None, mode: SaveMode::Append, @@ -210,7 +211,8 @@ impl WriteBuilder { } async fn check_preconditions(&self) -> DeltaResult> { - match self.store.is_delta_table_location().await? { + let object_store = self.log_store.object_store(); + match object_store.is_delta_table_location().await? { true => { let min_writer = self.snapshot.min_writer_version(); if min_writer > MAX_SUPPORTED_WRITER_VERSION { @@ -218,7 +220,7 @@ impl WriteBuilder { } else { match self.mode { SaveMode::ErrorIfExists => { - Err(WriteError::AlreadyExists(self.store.root_uri()).into()) + Err(WriteError::AlreadyExists(object_store.root_uri()).into()) } _ => Ok(vec![]), } @@ -236,7 +238,7 @@ impl WriteBuilder { Err(WriteError::MissingData) }?; let mut builder = CreateBuilder::new() - .with_object_store(self.store.clone()) + .with_log_store(self.log_store.clone()) .with_columns(schema.fields().clone()); if let Some(partition_columns) = self.partition_columns.as_ref() { builder = builder.with_partition_columns(partition_columns.clone()) @@ -356,7 +358,7 @@ impl std::future::IntoFuture for WriteBuilder { let schema = batches[0].schema(); let table_schema = this .snapshot - .physical_arrow_schema(this.store.clone()) + .physical_arrow_schema(this.log_store.object_store().clone()) .await .or_else(|_| this.snapshot.arrow_schema()) .unwrap_or(schema.clone()); @@ -418,7 +420,7 @@ impl std::future::IntoFuture for WriteBuilder { state, plan, partition_columns.clone(), - this.store.clone(), + this.log_store.object_store().clone(), this.target_file_size, this.write_batch_size, this.writer_properties, @@ -468,7 +470,7 @@ impl std::future::IntoFuture for WriteBuilder { }; let version = commit( - this.store.as_ref(), + this.log_store.as_ref(), &actions, DeltaOperation::Write { mode: this.mode, @@ -492,7 +494,7 @@ impl std::future::IntoFuture for WriteBuilder { // TODO should we build checkpoints based on config? - Ok(DeltaTable::new_with_state(this.store, this.snapshot)) + Ok(DeltaTable::new_with_state(this.log_store, this.snapshot)) }) } } diff --git a/crates/deltalake-core/src/operations/writer.rs b/crates/deltalake-core/src/operations/writer.rs index 0bba167e33..6d551ecb96 100644 --- a/crates/deltalake-core/src/operations/writer.rs +++ b/crates/deltalake-core/src/operations/writer.rs @@ -406,9 +406,10 @@ mod tests { #[tokio::test] async fn test_write_partition() { - let object_store = DeltaTableBuilder::from_uri("memory://") + let log_store = DeltaTableBuilder::from_uri("memory://") .build_storage() .unwrap(); + let object_store = log_store.object_store(); let batch = get_record_batch(None, false); // write single un-partitioned batch @@ -439,12 +440,13 @@ mod tests { let object_store = DeltaTableBuilder::from_uri("memory://") .build_storage() - .unwrap(); + .unwrap() + .object_store(); let properties = WriterProperties::builder() .set_max_row_group_size(1024) .build(); // configure small target file size and and row group size so we can observe multiple files written - let mut writer = get_writer(object_store.clone(), &batch, Some(properties), Some(10_000)); + let mut writer = get_writer(object_store, &batch, Some(properties), Some(10_000)); writer.write(&batch).await.unwrap(); // check that we have written more then once file, and no more then 1 is below target size diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs index fc23c1d28b..bee45f51bf 100644 --- a/crates/deltalake-core/src/protocol/checkpoints.rs +++ b/crates/deltalake-core/src/protocol/checkpoints.rs @@ -70,7 +70,12 @@ pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000; /// Creates checkpoint at current table version pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError> { - create_checkpoint_for(table.version(), table.get_state(), table.storage.as_ref()).await?; + create_checkpoint_for( + table.version(), + table.get_state(), + table.object_store().as_ref(), + ) + .await?; Ok(()) } @@ -81,7 +86,7 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result DeltaResult { + pub fn build_storage(self) -> DeltaResult { match self.options.storage_backend { - Some((storage, location)) => Ok(Arc::new(DeltaObjectStore::new( - storage, - ensure_table_uri(location.as_str())?, - ))), + Some((storage, location)) => Ok(Arc::new(DefaultLogStore { + storage: Arc::new(DeltaObjectStore::new( + storage, + ensure_table_uri(location.as_str())?, + )), + })), None => { let location = ensure_table_uri(&self.options.table_uri)?; - Ok(Arc::new(DeltaObjectStore::try_new( - location, - self.storage_options(), - )?)) + Ok(Arc::new(DefaultLogStore { + storage: Arc::new(DeltaObjectStore::try_new(location, self.storage_options())?), + })) } } } diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index 2b011ff608..098bcdfdd4 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -26,6 +26,7 @@ use crate::kernel::{ Action, Add, CommitInfo, DataType, Format, Metadata, ReaderFeatures, Remove, StructType, WriterFeatures, }; +use crate::logstore::{default_logstore::DefaultLogStore, LogStoreRef}; use crate::partitions::PartitionFilter; use crate::protocol::{ find_latest_check_point_for_version, get_last_checkpoint, ProtocolError, Stats, @@ -249,8 +250,8 @@ pub struct DeltaTable { pub state: DeltaTableState, /// the load options used during load pub config: DeltaTableConfig, - /// object store to access log and data files - pub(crate) storage: ObjectStoreRef, + /// log store + pub(crate) log_store: LogStoreRef, /// file metadata for latest checkpoint last_check_point: Option, /// table versions associated with timestamps @@ -265,7 +266,8 @@ impl Serialize for DeltaTable { let mut seq = serializer.serialize_seq(None)?; seq.serialize_element(&self.state)?; seq.serialize_element(&self.config)?; - seq.serialize_element(self.storage.as_ref())?; + // tp::TODO (de)serialize actual log store instead + seq.serialize_element(self.object_store().as_ref())?; seq.serialize_element(&self.last_check_point)?; seq.serialize_element(&self.version_timestamp)?; seq.end() @@ -296,9 +298,13 @@ impl<'de> Deserialize<'de> for DeltaTable { let config = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; - let storage = seq - .next_element()? - .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let storage = Arc::new( + seq.next_element()? + .ok_or_else(|| A::Error::invalid_length(0, &self))?, + ); + let log_store = Arc::new(DefaultLogStore { + storage: Arc::clone(&storage), + }); let last_check_point = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; @@ -309,7 +315,7 @@ impl<'de> Deserialize<'de> for DeltaTable { let table = DeltaTable { state, config, - storage: Arc::new(storage), + log_store, last_check_point, version_timestamp, }; @@ -326,10 +332,10 @@ impl DeltaTable { /// /// NOTE: This is for advanced users. If you don't know why you need to use this method, please /// call one of the `open_table` helper methods instead. - pub fn new(storage: ObjectStoreRef, config: DeltaTableConfig) -> Self { + pub fn new(log_store: LogStoreRef, config: DeltaTableConfig) -> Self { Self { state: DeltaTableState::with_version(-1), - storage, + log_store, config, last_check_point: None, version_timestamp: HashMap::new(), @@ -341,10 +347,10 @@ impl DeltaTable { /// /// NOTE: This is for advanced users. If you don't know why you need to use this method, /// please call one of the `open_table` helper methods instead. - pub(crate) fn new_with_state(storage: ObjectStoreRef, state: DeltaTableState) -> Self { + pub(crate) fn new_with_state(log_store: LogStoreRef, state: DeltaTableState) -> Self { Self { state, - storage, + log_store, config: Default::default(), last_check_point: None, version_timestamp: HashMap::new(), @@ -353,18 +359,24 @@ impl DeltaTable { /// get a shared reference to the delta object store pub fn object_store(&self) -> ObjectStoreRef { - self.storage.clone() + self.log_store.object_store() } /// The URI of the underlying data pub fn table_uri(&self) -> String { - self.storage.root_uri() + self.log_store.object_store().root_uri() + } + + /// get a shared reference to the log store + pub fn log_store(&self) -> LogStoreRef { + self.log_store.clone() } /// Return the list of paths of given checkpoint. pub fn get_checkpoint_data_paths(&self, check_point: &CheckPoint) -> Vec { let checkpoint_prefix = format!("{:020}", check_point.version); - let log_path = self.storage.log_path(); + let object_store = self.object_store(); + let log_path = object_store.log_path(); let mut checkpoint_data_paths = Vec::new(); match check_point.parts { @@ -399,7 +411,8 @@ impl DeltaTable { let mut current_delta_log_ver = i64::MAX; // Get file objects from table. - let mut stream = self.storage.list(Some(self.storage.log_path())).await?; + let storage = self.object_store(); + let mut stream = storage.list(Some(storage.log_path())).await?; while let Some(obj_meta) = stream.next().await { let obj_meta = obj_meta?; @@ -422,54 +435,7 @@ impl DeltaTable { } /// returns the latest available version of the table pub async fn get_latest_version(&mut self) -> Result { - let version_start = match get_last_checkpoint(&self.storage).await { - Ok(last_check_point) => last_check_point.version, - Err(ProtocolError::CheckpointNotFound) => { - // no checkpoint - -1 - } - Err(e) => { - return Err(DeltaTableError::from(e)); - } - }; - - debug!("latest checkpoint version: {version_start}"); - - let version_start = max(self.version(), version_start); - - lazy_static! { - static ref DELTA_LOG_REGEX: Regex = - Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap(); - } - - // list files to find max version - let version = async { - let mut max_version: i64 = version_start; - let prefix = Some(self.storage.log_path()); - let offset_path = commit_uri_from_version(max_version); - let mut files = self.storage.list_with_offset(prefix, &offset_path).await?; - - while let Some(obj_meta) = files.next().await { - let obj_meta = obj_meta?; - if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) { - let log_version = captures.get(1).unwrap().as_str().parse().unwrap(); - // listing may not be ordered - max_version = max(max_version, log_version); - // also cache timestamp for version, for faster time-travel - self.version_timestamp - .insert(log_version, obj_meta.last_modified.timestamp()); - } - } - - if max_version < 0 { - return Err(DeltaTableError::not_a_table(self.table_uri())); - } - - Ok::(max_version) - } - .await?; - - Ok(version) + self.log_store.get_latest_version(self.version()).await } /// Currently loaded version of the table @@ -490,12 +456,12 @@ impl DeltaTable { current_version: i64, ) -> Result { let next_version = current_version + 1; - let commit_uri = commit_uri_from_version(next_version); - let commit_log_bytes = self.storage.get(&commit_uri).await; - let commit_log_bytes = match commit_log_bytes { - Err(ObjectStoreError::NotFound { .. }) => return Ok(PeekCommit::UpToDate), + let commit_log_bytes = match self.log_store.read_commit_entry(next_version).await { + Ok(bytes) => Ok(bytes), + Err(DeltaTableError::ObjectStore { + source: ObjectStoreError::NotFound { .. }, + }) => return Ok(PeekCommit::UpToDate), Err(err) => Err(err), - Ok(result) => result.bytes().await, }?; let actions = Self::get_actions(next_version, commit_log_bytes).await; @@ -529,7 +495,7 @@ impl DeltaTable { /// loading the last checkpoint and incrementally applying each version since. #[cfg(any(feature = "parquet", feature = "parquet2"))] pub async fn update(&mut self) -> Result<(), DeltaTableError> { - match get_last_checkpoint(&self.storage).await { + match get_last_checkpoint(&self.object_store()).await { Ok(last_check_point) => { debug!("update with latest checkpoint {last_check_point:?}"); if Some(last_check_point) == self.last_check_point { @@ -574,13 +540,12 @@ impl DeltaTable { let buf_size = self.config.log_buffer_size; - let store = self.storage.clone(); + let log_store = self.log_store.clone(); let mut log_stream = futures::stream::iter(self.version() + 1..max_version + 1) .map(|version| { - let store = store.clone(); - let loc = commit_uri_from_version(version); + let log_store = log_store.clone(); async move { - let data = store.get(&loc).await?.bytes().await?; + let data = log_store.read_commit_entry(version).await?; let actions = Self::get_actions(version, data).await?; Ok((version, actions)) } @@ -616,7 +581,7 @@ impl DeltaTable { pub async fn load_version(&mut self, version: i64) -> Result<(), DeltaTableError> { // check if version is valid let commit_uri = commit_uri_from_version(version); - match self.storage.head(&commit_uri).await { + match self.object_store().head(&commit_uri).await { Ok(_) => {} Err(ObjectStoreError::NotFound { .. }) => { return Err(DeltaTableError::InvalidVersion(version)); @@ -628,7 +593,7 @@ impl DeltaTable { // 1. find latest checkpoint below version #[cfg(any(feature = "parquet", feature = "parquet2"))] - match find_latest_check_point_for_version(&self.storage, version).await? { + match find_latest_check_point_for_version(&self.object_store(), version).await? { Some(check_point) => { self.restore_checkpoint(check_point).await?; } @@ -652,7 +617,10 @@ impl DeltaTable { match self.version_timestamp.get(&version) { Some(ts) => Ok(*ts), None => { - let meta = self.storage.head(&commit_uri_from_version(version)).await?; + let meta = self + .object_store() + .head(&commit_uri_from_version(version)) + .await?; let ts = meta.last_modified.timestamp(); // also cache timestamp for version self.version_timestamp.insert(version, ts); @@ -746,7 +714,7 @@ impl DeltaTable { let files = self.get_files_by_partitions(filters)?; Ok(files .iter() - .map(|fname| self.storage.to_uri(fname)) + .map(|fname| self.object_store().to_uri(fname)) .collect()) } @@ -771,7 +739,7 @@ impl DeltaTable { pub fn get_file_uris(&self) -> impl Iterator + '_ { self.state .file_paths_iter() - .map(|path| self.storage.to_uri(&path)) + .map(|path| self.object_store().to_uri(&path)) } /// Returns statistics for files, in order diff --git a/crates/deltalake-core/src/table/state.rs b/crates/deltalake-core/src/table/state.rs index 26becd0703..8fa51c55fd 100644 --- a/crates/deltalake-core/src/table/state.rs +++ b/crates/deltalake-core/src/table/state.rs @@ -71,7 +71,7 @@ impl DeltaTableState { /// Construct a delta table state object from commit version. pub async fn from_commit(table: &DeltaTable, version: i64) -> Result { let commit_uri = commit_uri_from_version(version); - let commit_log_bytes = match table.storage.get(&commit_uri).await { + let commit_log_bytes = match table.object_store().get(&commit_uri).await { Ok(get) => Ok(get.bytes().await?), Err(ObjectStoreError::NotFound { .. }) => Err(ProtocolError::EndOfLog), Err(source) => Err(ProtocolError::ObjectStore { source }), @@ -161,7 +161,7 @@ impl DeltaTableState { let mut new_state = Self::with_version(check_point.version); for f in &checkpoint_data_paths { - let obj = table.storage.get(f).await?.bytes().await?; + let obj = table.object_store().get(f).await?.bytes().await?; new_state.process_checkpoint_bytes(obj, &table.config)?; } diff --git a/crates/deltalake-core/src/test_utils.rs b/crates/deltalake-core/src/test_utils.rs index 124ec0365b..76b6c61820 100644 --- a/crates/deltalake-core/src/test_utils.rs +++ b/crates/deltalake-core/src/test_utils.rs @@ -88,6 +88,7 @@ impl IntegrationContext { _ => DeltaTableBuilder::from_uri(store_uri) .with_allow_http(true) .build_storage()? + .object_store() .storage_backend(), }; diff --git a/crates/deltalake-core/src/writer/json.rs b/crates/deltalake-core/src/writer/json.rs index 044ffc20e2..0239dee2f9 100644 --- a/crates/deltalake-core/src/writer/json.rs +++ b/crates/deltalake-core/src/writer/json.rs @@ -195,7 +195,7 @@ impl JsonWriter { .build(); Ok(Self { - storage, + storage: storage.object_store(), arrow_schema_ref: schema, writer_properties, partition_columns: partition_columns.unwrap_or_default(), @@ -218,7 +218,7 @@ impl JsonWriter { .build(); Ok(Self { - storage: table.storage.clone(), + storage: table.object_store(), arrow_schema_ref, writer_properties, partition_columns, diff --git a/crates/deltalake-core/src/writer/mod.rs b/crates/deltalake-core/src/writer/mod.rs index 478a0b11f2..3b73fe2ef6 100644 --- a/crates/deltalake-core/src/writer/mod.rs +++ b/crates/deltalake-core/src/writer/mod.rs @@ -146,7 +146,14 @@ pub trait DeltaWriter { partition_by, predicate: None, }; - let version = commit(table.storage.as_ref(), &adds, operation, &table.state, None).await?; + let version = commit( + table.log_store.as_ref(), + &adds, + operation, + &table.state, + None, + ) + .await?; table.update().await?; Ok(version) } diff --git a/crates/deltalake-core/src/writer/record_batch.rs b/crates/deltalake-core/src/writer/record_batch.rs index b673146907..6e23313860 100644 --- a/crates/deltalake-core/src/writer/record_batch.rs +++ b/crates/deltalake-core/src/writer/record_batch.rs @@ -56,7 +56,8 @@ impl RecordBatchWriter { ) -> Result { let storage = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(storage_options.unwrap_or_default()) - .build_storage()?; + .build_storage()? + .object_store(); // Initialize writer properties for the underlying arrow writer let writer_properties = WriterProperties::builder() @@ -89,7 +90,7 @@ impl RecordBatchWriter { .build(); Ok(Self { - storage: table.storage.clone(), + storage: table.object_store(), arrow_schema_ref, writer_properties, partition_columns, diff --git a/crates/deltalake-core/src/writer/test_utils.rs b/crates/deltalake-core/src/writer/test_utils.rs index d67931c096..1daf9e407b 100644 --- a/crates/deltalake-core/src/writer/test_utils.rs +++ b/crates/deltalake-core/src/writer/test_utils.rs @@ -323,7 +323,7 @@ pub mod datafusion { use std::sync::Arc; pub async fn get_data(table: &DeltaTable) -> Vec { - let table = DeltaTable::new_with_state(table.object_store(), table.state.clone()); + let table = DeltaTable::new_with_state(table.log_store.clone(), table.state.clone()); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(table)).unwrap(); ctx.sql("select * from test") diff --git a/crates/deltalake-core/tests/command_optimize.rs b/crates/deltalake-core/tests/command_optimize.rs index a923d0064d..14f9d4c410 100644 --- a/crates/deltalake-core/tests/command_optimize.rs +++ b/crates/deltalake-core/tests/command_optimize.rs @@ -298,7 +298,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { let operation = DeltaOperation::Delete { predicate: None }; commit( - other_dt.object_store().as_ref(), + other_dt.log_store().as_ref(), &vec![Action::Remove(remove)], operation, &other_dt.state, @@ -306,9 +306,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { ) .await?; - let maybe_metrics = plan - .execute(dt.object_store(), &dt.state, 1, 20, None) - .await; + let maybe_metrics = plan.execute(dt.log_store(), &dt.state, 1, 20, None).await; assert!(maybe_metrics.is_err()); assert_eq!(dt.version(), version + 1); @@ -357,9 +355,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box> { ) .await?; - let metrics = plan - .execute(dt.object_store(), &dt.state, 1, 20, None) - .await?; + let metrics = plan.execute(dt.log_store(), &dt.state, 1, 20, None).await?; assert_eq!(metrics.num_files_added, 1); assert_eq!(metrics.num_files_removed, 2); @@ -399,7 +395,7 @@ async fn test_commit_interval() -> Result<(), Box> { let metrics = plan .execute( - dt.object_store(), + dt.log_store(), &dt.state, 1, 20, diff --git a/crates/deltalake-core/tests/command_vacuum.rs b/crates/deltalake-core/tests/command_vacuum.rs index 0007f479d5..51ff3217b3 100644 --- a/crates/deltalake-core/tests/command_vacuum.rs +++ b/crates/deltalake-core/tests/command_vacuum.rs @@ -297,6 +297,6 @@ async fn test_non_managed_files() { async fn is_deleted(context: &mut TestContext, path: &Path) -> bool { let backend = context.get_storage(); - let res = backend.head(path).await; + let res = backend.object_store().head(path).await; matches!(res, Err(ObjectStoreError::NotFound { .. })) } diff --git a/crates/deltalake-core/tests/commit_info_format.rs b/crates/deltalake-core/tests/commit_info_format.rs index de69397e32..a9d05e4c11 100644 --- a/crates/deltalake-core/tests/commit_info_format.rs +++ b/crates/deltalake-core/tests/commit_info_format.rs @@ -22,7 +22,7 @@ async fn test_operational_parameters() -> Result<(), Box> { }; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, diff --git a/crates/deltalake-core/tests/common/mod.rs b/crates/deltalake-core/tests/common/mod.rs index 80df899323..af2e6e1a7f 100644 --- a/crates/deltalake-core/tests/common/mod.rs +++ b/crates/deltalake-core/tests/common/mod.rs @@ -2,10 +2,10 @@ use bytes::Bytes; use deltalake_core::kernel::{Action, Add, Remove, StructType}; +use deltalake_core::logstore::LogStore; use deltalake_core::operations::create::CreateBuilder; use deltalake_core::operations::transaction::commit; use deltalake_core::protocol::{DeltaOperation, SaveMode}; -use deltalake_core::storage::DeltaObjectStore; use deltalake_core::DeltaTable; use deltalake_core::DeltaTableBuilder; use object_store::{path::Path, ObjectStore}; @@ -28,7 +28,7 @@ pub mod s3; pub struct TestContext { /// The main table under test pub table: Option, - pub backend: Option>, + pub backend: Option>, /// The configuration used to create the backend. pub config: HashMap, /// An object when it is dropped will clean up any temporary resources created for the test @@ -55,7 +55,7 @@ impl TestContext { } } - pub fn get_storage(&mut self) -> Arc { + pub fn get_storage(&mut self) -> Arc { if self.backend.is_none() { self.backend = Some(self.new_storage()) } @@ -63,7 +63,7 @@ impl TestContext { self.backend.as_ref().unwrap().clone() } - fn new_storage(&self) -> Arc { + fn new_storage(&self) -> Arc { let config = self.config.clone(); let uri = config.get("URI").unwrap().to_string(); DeltaTableBuilder::from_uri(uri) @@ -82,9 +82,9 @@ impl TestContext { .iter() .map(|s| s.to_string()) .collect::>(); - let backend = self.new_storage(); + let log_store = self.new_storage(); CreateBuilder::new() - .with_object_store(backend) + .with_log_store(log_store) .with_table_name("delta-rs_test_table") .with_comment("Table created by delta-rs tests") .with_columns(schema.fields().clone()) @@ -149,7 +149,7 @@ pub async fn add_file( }; let actions = vec![Action::Add(add)]; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, @@ -187,7 +187,7 @@ pub async fn remove_file( let operation = DeltaOperation::Delete { predicate: None }; let actions = vec![Action::Remove(remove)]; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, diff --git a/crates/deltalake-core/tests/fs_common/mod.rs b/crates/deltalake-core/tests/fs_common/mod.rs index dc9ec2547a..4de3ff1546 100644 --- a/crates/deltalake-core/tests/fs_common/mod.rs +++ b/crates/deltalake-core/tests/fs_common/mod.rs @@ -119,7 +119,7 @@ pub async fn commit_actions( operation: DeltaOperation, ) -> i64 { let version = commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, diff --git a/crates/deltalake-core/tests/integration_checkpoint.rs b/crates/deltalake-core/tests/integration_checkpoint.rs index 9b5b0a73ff..219474765a 100644 --- a/crates/deltalake-core/tests/integration_checkpoint.rs +++ b/crates/deltalake-core/tests/integration_checkpoint.rs @@ -61,7 +61,8 @@ async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult { let table_uri = context.root_uri(); let object_store = DeltaTableBuilder::from_uri(table_uri) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); let log_path = |version| { object_store diff --git a/crates/deltalake-core/tests/integration_concurrent_writes.rs b/crates/deltalake-core/tests/integration_concurrent_writes.rs index bef44d0693..90dba7659a 100644 --- a/crates/deltalake-core/tests/integration_concurrent_writes.rs +++ b/crates/deltalake-core/tests/integration_concurrent_writes.rs @@ -168,7 +168,7 @@ impl Worker { default_row_commit_version: None, })]; let version = commit( - self.table.object_store().as_ref(), + self.table.log_store().as_ref(), &actions, operation, &self.table.state, diff --git a/crates/deltalake-core/tests/integration_datafusion.rs b/crates/deltalake-core/tests/integration_datafusion.rs index 3476de6839..c9293f187e 100644 --- a/crates/deltalake-core/tests/integration_datafusion.rs +++ b/crates/deltalake-core/tests/integration_datafusion.rs @@ -211,7 +211,7 @@ mod local { // Trying to execute the write from the input plan without providing Datafusion with a session // state containing the referenced object store in the registry results in an error. assert!( - WriteBuilder::new(target_table.object_store(), target_table.state.clone()) + WriteBuilder::new(target_table.log_store(), target_table.state.clone()) .with_input_execution_plan(source_scan.clone()) .await .unwrap_err() @@ -235,11 +235,10 @@ mod local { .register_object_store(source_store_url, Arc::from(source_store)); // Execute write to the target table with the proper state - let target_table = - WriteBuilder::new(target_table.object_store(), target_table.state.clone()) - .with_input_execution_plan(source_scan) - .with_input_session_state(state) - .await?; + let target_table = WriteBuilder::new(target_table.log_store(), target_table.state.clone()) + .with_input_execution_plan(source_scan) + .with_input_session_state(state) + .await?; ctx.register_table("target", Arc::new(target_table))?; // Check results diff --git a/crates/deltalake-core/tests/integration_object_store.rs b/crates/deltalake-core/tests/integration_object_store.rs index 3988cbdb6d..9c5720db85 100644 --- a/crates/deltalake-core/tests/integration_object_store.rs +++ b/crates/deltalake-core/tests/integration_object_store.rs @@ -81,7 +81,8 @@ async fn read_write_test_onelake(integration: StorageIntegration, path: &Path) - let delta_store = DeltaTableBuilder::from_uri(&context.root_uri()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); //println!("{:#?}",delta_store); @@ -103,7 +104,8 @@ async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> let context = IntegrationContext::new(integration)?; let delta_store = DeltaTableBuilder::from_uri(context.root_uri()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); put_get_delete_list(delta_store.as_ref()).await?; list_with_delimiter(delta_store.as_ref()).await?; @@ -482,7 +484,8 @@ async fn test_object_store_prefixes(integration: StorageIntegration) -> TestResu let rooturi = format!("{}/{}", context.root_uri(), prefix); let delta_store = DeltaTableBuilder::from_uri(&rooturi) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); let contents = Bytes::from("cats"); let path = Path::from("test"); diff --git a/crates/deltalake-core/tests/integration_read.rs b/crates/deltalake-core/tests/integration_read.rs index 0e17d34397..a15679a09c 100644 --- a/crates/deltalake-core/tests/integration_read.rs +++ b/crates/deltalake-core/tests/integration_read.rs @@ -155,7 +155,8 @@ async fn verify_store(integration: &IntegrationContext, root_path: &str) -> Test let table_uri = format!("{}/{}", integration.root_uri(), root_path); let storage = DeltaTableBuilder::from_uri(table_uri.clone()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); let files = storage.list_with_delimiter(None).await?; assert_eq!( diff --git a/crates/deltalake-core/tests/repair_s3_rename_test.rs b/crates/deltalake-core/tests/repair_s3_rename_test.rs index 3157fab896..25eb9704ee 100644 --- a/crates/deltalake-core/tests/repair_s3_rename_test.rs +++ b/crates/deltalake-core/tests/repair_s3_rename_test.rs @@ -117,6 +117,7 @@ fn create_s3_backend( .with_allow_http(true) .build_storage() .unwrap() + .object_store() .storage_backend(); let delayed_store = DelayedObjectStore { diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 1f558b82fe..a8bfb6668a 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -51,7 +51,8 @@ impl DeltaFileSystemHandler { let storage = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(options.clone().unwrap_or_default()) .build_storage() - .map_err(PythonError::from)?; + .map_err(PythonError::from)? + .object_store(); Ok(Self { inner: storage, rt: Arc::new(rt()?), diff --git a/python/src/lib.rs b/python/src/lib.rs index 923a06d159..a58d1f9aa6 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -274,7 +274,7 @@ impl RawDeltaTable { retention_hours: Option, enforce_retention_duration: bool, ) -> PyResult> { - let mut cmd = VacuumBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = VacuumBuilder::new(self._table.log_store(), self._table.state.clone()) .with_enforce_retention_duration(enforce_retention_duration) .with_dry_run(dry_run); if let Some(retention_period) = retention_hours { @@ -296,7 +296,7 @@ impl RawDeltaTable { writer_properties: Option>, safe_cast: bool, ) -> PyResult { - let mut cmd = UpdateBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = UpdateBuilder::new(self._table.log_store(), self._table.state.clone()) .with_safe_cast(safe_cast); if let Some(writer_props) = writer_properties { @@ -349,7 +349,7 @@ impl RawDeltaTable { max_concurrent_tasks: Option, min_commit_interval: Option, ) -> PyResult { - let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = OptimizeBuilder::new(self._table.log_store(), self._table.state.clone()) .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)); if let Some(size) = target_size { cmd = cmd.with_target_size(size); @@ -379,7 +379,7 @@ impl RawDeltaTable { max_spill_size: usize, min_commit_interval: Option, ) -> PyResult { - let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = OptimizeBuilder::new(self._table.log_store(), self._table.state.clone()) .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)) .with_max_spill_size(max_spill_size) .with_type(OptimizeType::ZOrder(z_order_columns)); @@ -446,7 +446,7 @@ impl RawDeltaTable { let source_df = ctx.read_table(table_provider).unwrap(); let mut cmd = MergeBuilder::new( - self._table.object_store(), + self._table.log_store(), self._table.state.clone(), predicate, source_df, @@ -608,7 +608,7 @@ impl RawDeltaTable { ignore_missing_files: bool, protocol_downgrade_allowed: bool, ) -> PyResult { - let mut cmd = RestoreBuilder::new(self._table.object_store(), self._table.state.clone()); + let mut cmd = RestoreBuilder::new(self._table.log_store(), self._table.state.clone()); if let Some(val) = target { if let Ok(version) = val.extract::() { cmd = cmd.with_version_to_restore(version) @@ -822,7 +822,7 @@ impl RawDeltaTable { partition_by: Some(partition_by), predicate: None, }; - let store = self._table.object_store(); + let store = self._table.log_store(); rt()? .block_on(commit( @@ -866,7 +866,7 @@ impl RawDeltaTable { /// Run the delete command on the delta table: delete records following a predicate and return the delete metrics. #[pyo3(signature = (predicate = None))] pub fn delete(&mut self, predicate: Option) -> PyResult { - let mut cmd = DeleteBuilder::new(self._table.object_store(), self._table.state.clone()); + let mut cmd = DeleteBuilder::new(self._table.log_store(), self._table.state.clone()); if let Some(predicate) = predicate { cmd = cmd.with_predicate(predicate); } @@ -881,9 +881,8 @@ impl RawDeltaTable { /// have been deleted or are malformed #[pyo3(signature = (dry_run = true))] pub fn repair(&mut self, dry_run: bool) -> PyResult { - let cmd = - FileSystemCheckBuilder::new(self._table.object_store(), self._table.state.clone()) - .with_dry_run(dry_run); + let cmd = FileSystemCheckBuilder::new(self._table.log_store(), self._table.state.clone()) + .with_dry_run(dry_run); let (table, metrics) = rt()? .block_on(cmd.into_future())