From 947067865a43af624a327db6c5cd557c2c471b46 Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Thu, 9 Nov 2023 08:31:34 +0100 Subject: [PATCH] feat: default logstore implementation (#1742) # Description Introduce a `LogStore` abstraction to channel all log store reads and writes through a single place. This is supposed to allow implementations with more sophisticated locking mechanisms that do not rely on atomic rename semantics for the underlying object store. This does not change any functionality - it reorganizes read operations and commits on the delta commit log to be funneled through the respective methods of `LogStore`. ## Rationale The goal is to align the implementation of multi-cluster writes for Delta Lake on S3 with the one provided by the original `delta` library, enabling multi-cluster writes with some writers using Spark / Delta library and other writers using `delta-rs` For an overview of how it's done in delta, please see: 1. Delta [blog post](https://delta.io/blog/2022-05-18-multi-cluster-writes-to-delta-lake-storage-in-s3/) (high-level concept) 2. Associated Databricks [design doc](https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h) (detailed read) 3. [S3DynamoDbLogStore.java](https://github.com/delta-io/delta/blob/master/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java)(content warning: Java code behind this link) This approach requires readers of a delta table to "recover" unfinished commits from writers - as a result, reading and writing is combined in a single interface, which in this PR is modeled after [LogStore.java](https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/LogStore.java). Currently in `delta-rs`, read path for commits is implemented directly in `DeltaTable`, and there's no mechanism to implement storage-specific behavior like interacting with DynamoDb. --------- Co-authored-by: Robert Pack <42610831+roeap@users.noreply.github.com> --- crates/deltalake-core/Cargo.toml | 2 +- .../src/delta_datafusion/mod.rs | 49 ++- crates/deltalake-core/src/lib.rs | 1 + .../src/logstore/default_logstore.rs | 89 +++++ crates/deltalake-core/src/logstore/mod.rs | 325 ++++++++++++++++ .../deltalake-core/src/operations/create.rs | 33 +- .../deltalake-core/src/operations/delete.rs | 28 +- .../src/operations/filesystem_check.rs | 24 +- crates/deltalake-core/src/operations/load.rs | 10 +- crates/deltalake-core/src/operations/merge.rs | 22 +- crates/deltalake-core/src/operations/mod.rs | 25 +- .../deltalake-core/src/operations/optimize.rs | 26 +- .../deltalake-core/src/operations/restore.rs | 29 +- .../src/operations/transaction/mod.rs | 90 +++-- .../src/operations/transaction/test_utils.rs | 22 +- .../deltalake-core/src/operations/update.rs | 24 +- .../deltalake-core/src/operations/vacuum.rs | 34 +- crates/deltalake-core/src/operations/write.rs | 23 +- .../deltalake-core/src/operations/writer.rs | 8 +- .../src/protocol/checkpoints.rs | 28 +- crates/deltalake-core/src/protocol/mod.rs | 12 +- crates/deltalake-core/src/storage/config.rs | 15 +- crates/deltalake-core/src/storage/mod.rs | 358 +----------------- crates/deltalake-core/src/storage/utils.rs | 2 +- crates/deltalake-core/src/table/builder.rs | 25 +- crates/deltalake-core/src/table/mod.rs | 120 +++--- crates/deltalake-core/src/table/state.rs | 4 +- crates/deltalake-core/src/test_utils.rs | 2 +- crates/deltalake-core/src/writer/json.rs | 8 +- crates/deltalake-core/src/writer/mod.rs | 9 +- .../deltalake-core/src/writer/record_batch.rs | 9 +- .../deltalake-core/src/writer/test_utils.rs | 2 +- .../deltalake-core/tests/command_optimize.rs | 12 +- crates/deltalake-core/tests/command_vacuum.rs | 2 +- .../tests/commit_info_format.rs | 2 +- crates/deltalake-core/tests/common/mod.rs | 16 +- crates/deltalake-core/tests/fs_common/mod.rs | 11 +- .../tests/integration_checkpoint.rs | 15 +- .../tests/integration_concurrent_writes.rs | 2 +- .../tests/integration_datafusion.rs | 17 +- .../tests/integration_object_store.rs | 9 +- .../deltalake-core/tests/integration_read.rs | 3 +- .../tests/repair_s3_rename_test.rs | 2 +- python/src/filesystem.rs | 3 +- python/src/lib.rs | 21 +- 45 files changed, 826 insertions(+), 747 deletions(-) create mode 100644 crates/deltalake-core/src/logstore/default_logstore.rs create mode 100644 crates/deltalake-core/src/logstore/mod.rs diff --git a/crates/deltalake-core/Cargo.toml b/crates/deltalake-core/Cargo.toml index b3a6178203..9fa259fa39 100644 --- a/crates/deltalake-core/Cargo.toml +++ b/crates/deltalake-core/Cargo.toml @@ -117,7 +117,7 @@ sqlparser = { version = "0.38", optional = true } fs_extra = { version = "1.3.0", optional = true } tempdir = { version = "0", optional = true } -dynamodb_lock = { version = "0.6.0", default-features = false, optional = true } +dynamodb_lock = { version = "0.6", default-features = false, optional = true } [dev-dependencies] dotenvy = "0" diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 0147c250f9..38bf135739 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -71,8 +71,8 @@ use url::Url; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType}; +use crate::logstore::LogStoreRef; use crate::protocol::{self}; -use crate::storage::ObjectStoreRef; use crate::table::builder::ensure_table_uri; use crate::table::state::DeltaTableState; use crate::{open_table, open_table_with_storage_options, DeltaTable}; @@ -357,10 +357,10 @@ impl PruningStatistics for DeltaTable { // each delta table must register a specific object store, since paths are internally // handled relative to the table root. -pub(crate) fn register_store(store: ObjectStoreRef, env: Arc) { +pub(crate) fn register_store(store: LogStoreRef, env: Arc) { let object_store_url = store.object_store_url(); let url: &Url = object_store_url.as_ref(); - env.register_object_store(url, store); + env.register_object_store(url, store.object_store()); } pub(crate) fn logical_schema( @@ -467,7 +467,7 @@ pub struct DeltaScanConfig { #[derive(Debug)] pub(crate) struct DeltaScanBuilder<'a> { snapshot: &'a DeltaTableState, - object_store: ObjectStoreRef, + log_store: LogStoreRef, filter: Option, state: &'a SessionState, projection: Option<&'a Vec>, @@ -480,12 +480,12 @@ pub(crate) struct DeltaScanBuilder<'a> { impl<'a> DeltaScanBuilder<'a> { pub fn new( snapshot: &'a DeltaTableState, - object_store: ObjectStoreRef, + log_store: LogStoreRef, state: &'a SessionState, ) -> Self { DeltaScanBuilder { snapshot, - object_store, + log_store, filter: None, state, files: None, @@ -532,7 +532,7 @@ impl<'a> DeltaScanBuilder<'a> { Some(schema) => schema, None => { self.snapshot - .physical_arrow_schema(self.object_store.clone()) + .physical_arrow_schema(self.log_store.object_store()) .await? } }; @@ -632,7 +632,7 @@ impl<'a> DeltaScanBuilder<'a> { .create_physical_plan( self.state, FileScanConfig { - object_store_url: self.object_store.object_store_url(), + object_store_url: self.log_store.object_store_url(), file_schema, file_groups: file_groups.into_values().collect(), statistics: self.snapshot.datafusion_table_statistics(), @@ -647,9 +647,7 @@ impl<'a> DeltaScanBuilder<'a> { .await?; Ok(DeltaScan { - table_uri: ensure_table_uri(self.object_store.root_uri())? - .as_str() - .into(), + table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(), parquet_scan: scan, config, logical_schema, @@ -686,10 +684,10 @@ impl TableProvider for DeltaTable { filters: &[Expr], limit: Option, ) -> DataFusionResult> { - register_store(self.object_store(), session.runtime_env().clone()); + register_store(self.log_store(), session.runtime_env().clone()); let filter_expr = conjunction(filters.iter().cloned()); - let scan = DeltaScanBuilder::new(&self.state, self.object_store(), session) + let scan = DeltaScanBuilder::new(&self.state, self.log_store(), session) .with_projection(projection) .with_limit(limit) .with_filter(filter_expr) @@ -714,7 +712,7 @@ impl TableProvider for DeltaTable { /// A Delta table provider that enables additional metadata columns to be included during the scan pub struct DeltaTableProvider { snapshot: DeltaTableState, - store: ObjectStoreRef, + log_store: LogStoreRef, config: DeltaScanConfig, schema: Arc, } @@ -723,13 +721,13 @@ impl DeltaTableProvider { /// Build a DeltaTableProvider pub fn try_new( snapshot: DeltaTableState, - store: ObjectStoreRef, + log_store: LogStoreRef, config: DeltaScanConfig, ) -> DeltaResult { Ok(DeltaTableProvider { schema: logical_schema(&snapshot, &config)?, snapshot, - store, + log_store, config, }) } @@ -764,10 +762,10 @@ impl TableProvider for DeltaTableProvider { filters: &[Expr], limit: Option, ) -> DataFusionResult> { - register_store(self.store.clone(), session.runtime_env().clone()); + register_store(self.log_store.clone(), session.runtime_env().clone()); let filter_expr = conjunction(filters.iter().cloned()); - let scan = DeltaScanBuilder::new(&self.snapshot, self.store.clone(), session) + let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session) .with_projection(projection) .with_limit(limit) .with_filter(filter_expr) @@ -1462,7 +1460,7 @@ fn join_batches_with_add_actions( /// Determine which files contain a record that statisfies the predicate pub(crate) async fn find_files_scan<'a>( snapshot: &DeltaTableState, - store: ObjectStoreRef, + log_store: LogStoreRef, state: &SessionState, expression: Expr, ) -> DeltaResult> { @@ -1489,7 +1487,7 @@ pub(crate) async fn find_files_scan<'a>( // Add path column used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?); - let scan = DeltaScanBuilder::new(snapshot, store.clone(), state) + let scan = DeltaScanBuilder::new(snapshot, log_store, state) .with_filter(Some(expression.clone())) .with_projection(Some(&used_columns)) .with_scan_config(scan_config) @@ -1580,7 +1578,7 @@ pub(crate) async fn scan_memory_table( /// Finds files in a snapshot that match the provided predicate. pub async fn find_files<'a>( snapshot: &DeltaTableState, - object_store: ObjectStoreRef, + log_store: LogStoreRef, state: &SessionState, predicate: Option, ) -> DeltaResult { @@ -1608,8 +1606,7 @@ pub async fn find_files<'a>( }) } else { let candidates = - find_files_scan(snapshot, object_store.clone(), state, predicate.to_owned()) - .await?; + find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?; Ok(FindFiles { candidates, @@ -1924,7 +1921,8 @@ mod tests { .build(&table.state) .unwrap(); - let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap(); + let log_store = table.log_store(); + let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); @@ -1983,7 +1981,8 @@ mod tests { let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap(); - let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap(); + let log_store = table.log_store(); + let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); diff --git a/crates/deltalake-core/src/lib.rs b/crates/deltalake-core/src/lib.rs index d683b906dd..644da2dcac 100644 --- a/crates/deltalake-core/src/lib.rs +++ b/crates/deltalake-core/src/lib.rs @@ -85,6 +85,7 @@ compile_error!( pub mod data_catalog; pub mod errors; pub mod kernel; +pub mod logstore; pub mod operations; pub mod protocol; pub mod schema; diff --git a/crates/deltalake-core/src/logstore/default_logstore.rs b/crates/deltalake-core/src/logstore/default_logstore.rs new file mode 100644 index 0000000000..715d810535 --- /dev/null +++ b/crates/deltalake-core/src/logstore/default_logstore.rs @@ -0,0 +1,89 @@ +//! Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation + +use std::sync::Arc; + +use bytes::Bytes; +#[cfg(feature = "datafusion")] +use datafusion::execution::object_store::ObjectStoreUrl; +use object_store::{path::Path, ObjectStore}; +use url::Url; + +use super::{LogStore, LogStoreConfig}; +use crate::{ + operations::transaction::TransactionError, + storage::{ + config::{self, StorageOptions}, + ObjectStoreRef, + }, + DeltaResult, +}; + +/// Default [`LogStore`] implementation +#[derive(Debug, Clone)] +pub struct DefaultLogStore { + pub(crate) storage: Arc, + config: LogStoreConfig, +} + +impl DefaultLogStore { + /// Create a new instance of [`DefaultLogStore`] + /// + /// # Arguments + /// + /// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located). + /// * `location` - A url corresponding to the storage location of `storage`. + pub fn new(storage: ObjectStoreRef, config: LogStoreConfig) -> Self { + Self { storage, config } + } + + /// Create log store + pub fn try_new(location: Url, options: impl Into + Clone) -> DeltaResult { + let mut options = options.into(); + let storage = config::configure_store(&location, &mut options)?; + Ok(Self { + storage: Arc::new(storage), + config: LogStoreConfig { location, options }, + }) + } +} + +#[async_trait::async_trait] +impl LogStore for DefaultLogStore { + async fn read_commit_entry(&self, version: i64) -> DeltaResult { + super::read_commit_entry(self.storage.as_ref(), version).await + } + + /// Tries to commit a prepared commit file. Returns [`TransactionError`] + /// if the given `version` already exists. The caller should handle the retry logic itself. + /// This is low-level transaction API. If user does not want to maintain the commit loop then + /// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` + /// with retry logic. + async fn write_commit_entry( + &self, + version: i64, + tmp_commit: &Path, + ) -> Result<(), TransactionError> { + super::write_commit_entry(self.storage.as_ref(), version, tmp_commit).await + } + + async fn get_latest_version(&self, current_version: i64) -> DeltaResult { + super::get_latest_version(self, current_version).await + } + + fn object_store(&self) -> Arc { + self.storage.clone() + } + + fn to_uri(&self, location: &Path) -> String { + super::to_uri(&self.config.location, location) + } + + #[cfg(feature = "datafusion")] + fn object_store_url(&self) -> ObjectStoreUrl { + super::object_store_url(&self.config.location) + } + + fn config(&self) -> &LogStoreConfig { + &self.config + } +} diff --git a/crates/deltalake-core/src/logstore/mod.rs b/crates/deltalake-core/src/logstore/mod.rs new file mode 100644 index 0000000000..7f1009b1de --- /dev/null +++ b/crates/deltalake-core/src/logstore/mod.rs @@ -0,0 +1,325 @@ +//! Delta log store. +use futures::StreamExt; +use lazy_static::lazy_static; +use regex::Regex; +use serde::{ + de::{Error, SeqAccess, Visitor}, + ser::SerializeSeq, + Deserialize, Serialize, +}; +use std::{cmp::max, collections::HashMap, sync::Arc}; +use url::Url; + +use crate::{ + errors::DeltaResult, + operations::transaction::TransactionError, + protocol::{get_last_checkpoint, ProtocolError}, + storage::{commit_uri_from_version, config::StorageOptions}, + DeltaTableError, +}; +use bytes::Bytes; +use log::debug; +use object_store::{ + path::Path, Error as ObjectStoreError, ObjectStore, Result as ObjectStoreResult, +}; + +#[cfg(feature = "datafusion")] +use datafusion::datasource::object_store::ObjectStoreUrl; + +pub mod default_logstore; + +/// Sharable reference to [`LogStore`] +pub type LogStoreRef = Arc; + +lazy_static! { + static ref DELTA_LOG_PATH: Path = Path::from("_delta_log"); +} + +/// Configuration parameters for a log store +#[derive(Debug, Clone)] +pub struct LogStoreConfig { + /// url corresponding to the storage location. + pub location: Url, + /// Options used for configuring backend storage + pub options: StorageOptions, +} + +/// Trait for critical operations required to read and write commit entries in Delta logs. +/// +/// The correctness is predicated on the atomicity and durability guarantees of +/// the implementation of this interface. Specifically, +/// +/// - Atomic visibility: Any commit created via `write_commit_entry` must become visible atomically. +/// - Mutual exclusion: Only one writer must be able to create a commit for a specific version. +/// - Consistent listing: Once a commit entry for version `v` has been written, any future call to +/// `get_latest_version` must return a version >= `v`, i.e. the underlying file system entry must +/// become visible immediately. +#[async_trait::async_trait] +pub trait LogStore: Sync + Send { + /// Read data for commit entry with the given version. + async fn read_commit_entry(&self, version: i64) -> DeltaResult; + + /// Write list of actions as delta commit entry for given version. + /// + /// This operation can be retried with a higher version in case the write + /// fails with [`TransactionError::VersionAlreadyExists`]. + async fn write_commit_entry( + &self, + version: i64, + tmp_commit: &Path, + ) -> Result<(), TransactionError>; + + /// Find latest version currently stored in the delta log. + async fn get_latest_version(&self, start_version: i64) -> DeltaResult; + + /// Get underlying object store. + fn object_store(&self) -> Arc; + + /// [Path] to Delta log + fn to_uri(&self, location: &Path) -> String; + + /// Get fully qualified uri for table root + fn root_uri(&self) -> String { + self.to_uri(&Path::from("")) + } + + /// [Path] to Delta log + fn log_path(&self) -> &Path { + &DELTA_LOG_PATH + } + + /// Check if the location is a delta table location + async fn is_delta_table_location(&self) -> ObjectStoreResult { + // TODO We should really be using HEAD here, but this fails in windows tests + let object_store = self.object_store(); + let mut stream = object_store.list(Some(self.log_path())).await?; + if let Some(res) = stream.next().await { + match res { + Ok(_) => Ok(true), + Err(ObjectStoreError::NotFound { .. }) => Ok(false), + Err(err) => Err(err), + } + } else { + Ok(false) + } + } + + #[cfg(feature = "datafusion")] + /// Generate a unique enough url to identify the store in datafusion. + /// The DF object store registry only cares about the scheme and the host of the url for + /// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique + /// host we convert the location from this `LogStore` to a valid name, combining the + /// original scheme, host and path with invalid characters replaced. + fn object_store_url(&self) -> ObjectStoreUrl; + + /// Get configuration representing configured log store. + fn config(&self) -> &LogStoreConfig; +} + +// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders +impl std::fmt::Debug for dyn LogStore + '_ { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "LogStore({})", self.root_uri()) + } +} + +impl Serialize for LogStoreConfig { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(None)?; + seq.serialize_element(&self.location.to_string())?; + seq.serialize_element(&self.options.0)?; + seq.end() + } +} + +impl<'de> Deserialize<'de> for LogStoreConfig { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct LogStoreConfigVisitor {} + + impl<'de> Visitor<'de> for LogStoreConfigVisitor { + type Value = LogStoreConfig; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("struct LogStoreConfig") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: SeqAccess<'de>, + { + let location_str: String = seq + .next_element()? + .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let options: HashMap = seq + .next_element()? + .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let location = Url::parse(&location_str).unwrap(); + Ok(LogStoreConfig { + location, + options: options.into(), + }) + } + } + + deserializer.deserialize_seq(LogStoreConfigVisitor {}) + } +} + +lazy_static! { + static ref DELTA_LOG_REGEX: Regex = Regex::new(r"(\d{20})\.(json|checkpoint).*$").unwrap(); +} + +fn to_uri(root: &Url, location: &Path) -> String { + match root.scheme() { + "file" => { + #[cfg(windows)] + let uri = format!( + "{}/{}", + root.as_ref().trim_end_matches('/'), + location.as_ref() + ) + .replace("file:///", ""); + #[cfg(unix)] + let uri = format!( + "{}/{}", + root.as_ref().trim_end_matches('/'), + location.as_ref() + ) + .replace("file://", ""); + uri + } + _ => { + if location.as_ref().is_empty() || location.as_ref() == "/" { + root.as_ref().to_string() + } else { + format!("{}/{}", root.as_ref(), location.as_ref()) + } + } + } +} + +#[cfg(feature = "datafusion")] +fn object_store_url(location: &Url) -> ObjectStoreUrl { + // we are certain, that the URL can be parsed, since + // we make sure when we are parsing the table uri + + use object_store::path::DELIMITER; + ObjectStoreUrl::parse(format!( + "delta-rs://{}-{}{}", + location.scheme(), + location.host_str().unwrap_or("-"), + location.path().replace(DELIMITER, "-").replace(':', "-") + )) + .expect("Invalid object store url.") +} + +/// Extract version from a file name in the delta log +pub fn extract_version_from_filename(name: &str) -> Option { + DELTA_LOG_REGEX + .captures(name) + .map(|captures| captures.get(1).unwrap().as_str().parse().unwrap()) +} + +async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> DeltaResult { + let version_start = match get_last_checkpoint(log_store).await { + Ok(last_check_point) => last_check_point.version, + Err(ProtocolError::CheckpointNotFound) => { + // no checkpoint + -1 + } + Err(e) => { + return Err(DeltaTableError::from(e)); + } + }; + + debug!("latest checkpoint version: {version_start}"); + + let version_start = max(current_version, version_start); + + // list files to find max version + let version = async { + let mut max_version: i64 = version_start; + let prefix = Some(log_store.log_path()); + let offset_path = commit_uri_from_version(max_version); + let object_store = log_store.object_store(); + let mut files = object_store.list_with_offset(prefix, &offset_path).await?; + + while let Some(obj_meta) = files.next().await { + let obj_meta = obj_meta?; + if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) { + max_version = max(max_version, log_version); + // also cache timestamp for version, for faster time-travel + // TODO: temporarily disabled because `version_timestamp` is not available in the [`LogStore`] + // self.version_timestamp + // .insert(log_version, obj_meta.last_modified.timestamp()); + } + } + + if max_version < 0 { + return Err(DeltaTableError::not_a_table(log_store.root_uri())); + } + + Ok::(max_version) + } + .await?; + Ok(version) +} + +async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResult { + let commit_uri = commit_uri_from_version(version); + let data = storage.get(&commit_uri).await?.bytes().await?; + Ok(data) +} + +async fn write_commit_entry( + storage: &dyn ObjectStore, + version: i64, + tmp_commit: &Path, +) -> Result<(), TransactionError> { + // move temporary commit file to delta log directory + // rely on storage to fail if the file already exists - + storage + .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version)) + .await + .map_err(|err| -> TransactionError { + match err { + ObjectStoreError::AlreadyExists { .. } => { + TransactionError::VersionAlreadyExists(version) + } + _ => TransactionError::from(err), + } + })?; + Ok(()) +} + +#[cfg(feature = "datafusion")] +#[cfg(test)] +mod tests { + use url::Url; + + #[tokio::test] + async fn test_unique_object_store_url() { + for (location_1, location_2) in [ + // Same scheme, no host, different path + ("file:///path/to/table_1", "file:///path/to/table_2"), + // Different scheme/host, same path + ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"), + // Same scheme, different host, same path + ("s3://bucket_1/table_1", "s3://bucket_2/table_1"), + ] { + let url_1 = Url::parse(location_1).unwrap(); + let url_2 = Url::parse(location_2).unwrap(); + + assert_ne!( + super::object_store_url(&url_1).as_str(), + super::object_store_url(&url_2).as_str(), + ); + } + } +} diff --git a/crates/deltalake-core/src/operations/create.rs b/crates/deltalake-core/src/operations/create.rs index 1dc9fdf8b2..71398faf97 100644 --- a/crates/deltalake-core/src/operations/create.rs +++ b/crates/deltalake-core/src/operations/create.rs @@ -11,8 +11,8 @@ use super::transaction::commit; use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, DataType, Metadata, Protocol, StructField, StructType}; +use crate::logstore::{LogStore, LogStoreRef}; use crate::protocol::{DeltaOperation, SaveMode}; -use crate::storage::DeltaObjectStore; use crate::table::builder::ensure_table_uri; use crate::table::config::DeltaConfigKey; use crate::table::DeltaTableMetaData; @@ -55,7 +55,7 @@ pub struct CreateBuilder { partition_columns: Option>, storage_options: Option>, actions: Vec, - object_store: Option>, + log_store: Option, configuration: HashMap>, metadata: Option>, } @@ -78,7 +78,7 @@ impl CreateBuilder { partition_columns: None, storage_options: None, actions: Default::default(), - object_store: None, + log_store: None, configuration: Default::default(), metadata: Default::default(), } @@ -198,9 +198,9 @@ impl CreateBuilder { self } - /// Provide a [`DeltaObjectStore`] instance, that points at table location - pub fn with_object_store(mut self, object_store: Arc) -> Self { - self.object_store = Some(object_store); + /// Provide a [`LogStore`] instance, that points at table location + pub fn with_log_store(mut self, log_store: Arc) -> Self { + self.log_store = Some(log_store); self } @@ -219,12 +219,10 @@ impl CreateBuilder { return Err(CreateError::MissingSchema.into()); } - let (storage_url, table) = if let Some(object_store) = self.object_store { + let (storage_url, table) = if let Some(log_store) = self.log_store { ( - ensure_table_uri(object_store.root_uri())? - .as_str() - .to_string(), - DeltaTable::new(object_store, Default::default()), + ensure_table_uri(log_store.root_uri())?.as_str().to_string(), + DeltaTable::new(log_store, Default::default()), ) } else { let storage_url = ensure_table_uri(self.location.ok_or(CreateError::MissingLocation)?)?; @@ -293,7 +291,8 @@ impl std::future::IntoFuture for CreateBuilder { Box::pin(async move { let mode = this.mode.clone(); let (mut table, actions, operation) = this.into_table_and_actions()?; - let table_state = if table.object_store().is_delta_table_location().await? { + let log_store = table.log_store(); + let table_state = if log_store.is_delta_table_location().await? { match mode { SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()), SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()), @@ -311,7 +310,7 @@ impl std::future::IntoFuture for CreateBuilder { }; let version = commit( - table.object_store().as_ref(), + table.log_store.as_ref(), &actions, operation, table_state, @@ -443,11 +442,11 @@ mod tests { assert_eq!(table.version(), 0); let first_id = table.get_metadata().unwrap().id.clone(); - let object_store = table.object_store(); + let log_store = table.log_store; // Check an error is raised when a table exists at location let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store.clone()) .with_columns(schema.fields().clone()) .with_save_mode(SaveMode::ErrorIfExists) .await; @@ -455,7 +454,7 @@ mod tests { // Check current table is returned when ignore option is chosen. let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store.clone()) .with_columns(schema.fields().clone()) .with_save_mode(SaveMode::Ignore) .await @@ -464,7 +463,7 @@ mod tests { // Check table is overwritten let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store) .with_columns(schema.fields().iter().cloned()) .with_save_mode(SaveMode::Overwrite) .await diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 98e4bd1ebe..bd361c9707 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -21,6 +21,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use crate::logstore::LogStoreRef; use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_plan::filter::FilterExec; @@ -40,7 +41,6 @@ use crate::kernel::{Action, Add, Remove}; use crate::operations::transaction::commit; use crate::operations::write::write_execution_plan; use crate::protocol::DeltaOperation; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -54,7 +54,7 @@ pub struct DeleteBuilder { /// A snapshot of the table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -84,11 +84,11 @@ pub struct DeleteMetrics { impl DeleteBuilder { /// Create a new [`DeleteBuilder`] - pub fn new(object_store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { predicate: None, snapshot, - store: object_store, + log_store, state: None, app_metadata: None, writer_properties: None, @@ -125,7 +125,7 @@ impl DeleteBuilder { async fn excute_non_empty_expr( snapshot: &DeltaTableState, - object_store: ObjectStoreRef, + log_store: LogStoreRef, state: &SessionState, expression: &Expr, metrics: &mut DeleteMetrics, @@ -144,7 +144,7 @@ async fn excute_non_empty_expr( .partition_columns .clone(); - let scan = DeltaScanBuilder::new(snapshot, object_store.clone(), state) + let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state) .with_files(rewrite) .build() .await?; @@ -167,7 +167,7 @@ async fn excute_non_empty_expr( state.clone(), filter.clone(), table_partition_cols.clone(), - object_store.clone(), + log_store.object_store(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, @@ -187,7 +187,7 @@ async fn excute_non_empty_expr( async fn execute( predicate: Option, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -197,7 +197,7 @@ async fn execute( let mut metrics = DeleteMetrics::default(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, object_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -208,7 +208,7 @@ async fn execute( let write_start = Instant::now(); let add = excute_non_empty_expr( snapshot, - object_store.clone(), + log_store.clone(), &state, &predicate, &mut metrics, @@ -254,7 +254,7 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -278,7 +278,7 @@ impl std::future::IntoFuture for DeleteBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.store.clone(), session.runtime_env()); + register_store(this.log_store.clone(), session.runtime_env()); session.state() }); @@ -295,7 +295,7 @@ impl std::future::IntoFuture for DeleteBuilder { let ((actions, version), metrics) = execute( predicate, - this.store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -305,7 +305,7 @@ impl std::future::IntoFuture for DeleteBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/filesystem_check.rs b/crates/deltalake-core/src/operations/filesystem_check.rs index 31b261c4cb..b79f22b1f4 100644 --- a/crates/deltalake-core/src/operations/filesystem_check.rs +++ b/crates/deltalake-core/src/operations/filesystem_check.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::fmt::Debug; -use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -27,9 +26,9 @@ use url::{ParseError, Url}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Add, Remove}; +use crate::logstore::LogStoreRef; use crate::operations::transaction::commit; use crate::protocol::DeltaOperation; -use crate::storage::DeltaObjectStore; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -40,7 +39,7 @@ pub struct FileSystemCheckBuilder { /// A snapshot of the to-be-checked table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Don't remove actions to the table log. Just determine which files can be removed dry_run: bool, } @@ -56,7 +55,7 @@ pub struct FileSystemCheckMetrics { struct FileSystemCheckPlan { /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Files that no longer exists in undlying ObjectStore but have active add actions pub files_to_remove: Vec, } @@ -74,10 +73,10 @@ fn is_absolute_path(path: &str) -> DeltaResult { impl FileSystemCheckBuilder { /// Create a new [`FileSystemCheckBuilder`] - pub fn new(store: Arc, state: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, state: DeltaTableState) -> Self { FileSystemCheckBuilder { snapshot: state, - store, + log_store, dry_run: false, } } @@ -91,7 +90,7 @@ impl FileSystemCheckBuilder { async fn create_fsck_plan(&self) -> DeltaResult { let mut files_relative: HashMap<&str, &Add> = HashMap::with_capacity(self.snapshot.files().len()); - let store = self.store.clone(); + let log_store = self.log_store.clone(); for active in self.snapshot.files() { if is_absolute_path(&active.path)? { @@ -103,7 +102,8 @@ impl FileSystemCheckBuilder { } } - let mut files = self.store.list(None).await?; + let object_store = log_store.object_store(); + let mut files = object_store.list(None).await?; while let Some(result) = files.next().await { let file = result?; files_relative.remove(file.location.as_ref()); @@ -120,7 +120,7 @@ impl FileSystemCheckBuilder { Ok(FileSystemCheckPlan { files_to_remove, - store, + log_store, }) } } @@ -156,7 +156,7 @@ impl FileSystemCheckPlan { } commit( - self.store.as_ref(), + self.log_store.as_ref(), &actions, DeltaOperation::FileSystemCheck {}, snapshot, @@ -183,7 +183,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { let plan = this.create_fsck_plan().await?; if this.dry_run { return Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), FileSystemCheckMetrics { files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(), dry_run: true, @@ -192,7 +192,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { } let metrics = plan.execute(&this.snapshot).await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/load.rs b/crates/deltalake-core/src/operations/load.rs index 7baa59e3e1..1a4c5c4cc6 100644 --- a/crates/deltalake-core/src/operations/load.rs +++ b/crates/deltalake-core/src/operations/load.rs @@ -7,7 +7,7 @@ use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use futures::future::BoxFuture; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::storage::DeltaObjectStore; +use crate::logstore::LogStoreRef; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -16,17 +16,17 @@ pub struct LoadBuilder { /// A snapshot of the to-be-loaded table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// A sub-selection of columns to be loaded columns: Option>, } impl LoadBuilder { /// Create a new [`LoadBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, columns: None, } } @@ -46,7 +46,7 @@ impl std::future::IntoFuture for LoadBuilder { let this = self; Box::pin(async move { - let table = DeltaTable::new_with_state(this.store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); let schema = table.state.arrow_schema()?; let projection = this .columns diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index 57621cb316..d38ddf0efb 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -68,10 +68,10 @@ use super::transaction::commit; use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression}; use crate::delta_datafusion::{register_store, DeltaScanBuilder}; use crate::kernel::{Action, Remove}; +use crate::logstore::LogStoreRef; use crate::operations::datafusion_utils::MetricObserverExec; use crate::operations::write::write_execution_plan; use crate::protocol::{DeltaOperation, MergePredicate}; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableError}; @@ -107,7 +107,7 @@ pub struct MergeBuilder { /// The source data source: DataFrame, /// Delta object store for handling data files - object_store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -122,7 +122,7 @@ pub struct MergeBuilder { impl MergeBuilder { /// Create a new [`MergeBuilder`] pub fn new>( - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: DeltaTableState, predicate: E, source: DataFrame, @@ -132,7 +132,7 @@ impl MergeBuilder { predicate, source, snapshot, - object_store, + log_store, source_alias: None, target_alias: None, state: None, @@ -561,7 +561,7 @@ pub struct MergeMetrics { async fn execute( predicate: Expression, source: DataFrame, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -590,7 +590,7 @@ async fn execute( // predicates also need to be considered when pruning let target = Arc::new( - DeltaScanBuilder::new(snapshot, object_store.clone(), &state) + DeltaScanBuilder::new(snapshot, log_store.clone(), &state) .with_schema(snapshot.input_schema()?) .build() .await?, @@ -1126,7 +1126,7 @@ async fn execute( state.clone(), projection.clone(), table_partition_cols.clone(), - object_store.clone(), + log_store.object_store().clone(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, @@ -1188,7 +1188,7 @@ async fn execute( not_matched_by_source_predicates: not_match_source_operations, }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -1212,7 +1212,7 @@ impl std::future::IntoFuture for MergeBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.object_store.clone(), session.runtime_env()); + register_store(this.log_store.clone(), session.runtime_env()); session.state() }); @@ -1220,7 +1220,7 @@ impl std::future::IntoFuture for MergeBuilder { let ((actions, version), metrics) = execute( this.predicate, this.source, - this.object_store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -1236,7 +1236,7 @@ impl std::future::IntoFuture for MergeBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.object_store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/mod.rs b/crates/deltalake-core/src/operations/mod.rs index 35301f067e..0406272a5b 100644 --- a/crates/deltalake-core/src/operations/mod.rs +++ b/crates/deltalake-core/src/operations/mod.rs @@ -107,60 +107,60 @@ impl DeltaOps { /// ``` #[must_use] pub fn create(self) -> CreateBuilder { - CreateBuilder::default().with_object_store(self.0.object_store()) + CreateBuilder::default().with_log_store(self.0.log_store) } /// Load data from a DeltaTable #[cfg(feature = "datafusion")] #[must_use] pub fn load(self) -> LoadBuilder { - LoadBuilder::new(self.0.object_store(), self.0.state) + LoadBuilder::new(self.0.log_store, self.0.state) } /// Write data to Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn write(self, batches: impl IntoIterator) -> WriteBuilder { - WriteBuilder::new(self.0.object_store(), self.0.state).with_input_batches(batches) + WriteBuilder::new(self.0.log_store, self.0.state).with_input_batches(batches) } /// Vacuum stale files from delta table #[must_use] pub fn vacuum(self) -> VacuumBuilder { - VacuumBuilder::new(self.0.object_store(), self.0.state) + VacuumBuilder::new(self.0.log_store, self.0.state) } /// Audit active files with files present on the filesystem #[must_use] pub fn filesystem_check(self) -> FileSystemCheckBuilder { - FileSystemCheckBuilder::new(self.0.object_store(), self.0.state) + FileSystemCheckBuilder::new(self.0.log_store, self.0.state) } /// Audit active files with files present on the filesystem #[cfg(all(feature = "arrow", feature = "parquet"))] #[must_use] pub fn optimize<'a>(self) -> OptimizeBuilder<'a> { - OptimizeBuilder::new(self.0.object_store(), self.0.state) + OptimizeBuilder::new(self.0.log_store, self.0.state) } /// Delete data from Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn delete(self) -> DeleteBuilder { - DeleteBuilder::new(self.0.object_store(), self.0.state) + DeleteBuilder::new(self.0.log_store, self.0.state) } /// Update data from Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn update(self) -> UpdateBuilder { - UpdateBuilder::new(self.0.object_store(), self.0.state) + UpdateBuilder::new(self.0.log_store, self.0.state) } /// Restore delta table to a specified version or datetime #[must_use] pub fn restore(self) -> RestoreBuilder { - RestoreBuilder::new(self.0.object_store(), self.0.state) + RestoreBuilder::new(self.0.log_store, self.0.state) } /// Update data from Delta table @@ -171,12 +171,7 @@ impl DeltaOps { source: datafusion::prelude::DataFrame, predicate: E, ) -> MergeBuilder { - MergeBuilder::new( - self.0.object_store(), - self.0.state, - predicate.into(), - source, - ) + MergeBuilder::new(self.0.log_store, self.0.state, predicate.into(), source) } } diff --git a/crates/deltalake-core/src/operations/optimize.rs b/crates/deltalake-core/src/operations/optimize.rs index 7feecd1e56..0467d43a8b 100644 --- a/crates/deltalake-core/src/operations/optimize.rs +++ b/crates/deltalake-core/src/operations/optimize.rs @@ -42,6 +42,7 @@ use super::transaction::commit; use super::writer::{PartitionWriter, PartitionWriterConfig}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Remove}; +use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; @@ -155,7 +156,7 @@ pub struct OptimizeBuilder<'a> { /// A snapshot of the to-be-optimized table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: ObjectStoreRef, + log_store: LogStoreRef, /// Filters to select specific table partitions to be optimized filters: &'a [PartitionFilter], /// Desired file size after bin-packing files @@ -177,10 +178,10 @@ pub struct OptimizeBuilder<'a> { impl<'a> OptimizeBuilder<'a> { /// Create a new [`OptimizeBuilder`] - pub fn new(store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, filters: &[], target_size: None, writer_properties: None, @@ -274,14 +275,14 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { )?; let metrics = plan .execute( - this.store.clone(), + this.log_store.clone(), &this.snapshot, this.max_concurrent_tasks, this.max_spill_size, this.min_commit_interval, ) .await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) @@ -584,7 +585,7 @@ impl MergePlan { /// Perform the operations outlined in the plan. pub async fn execute( mut self, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, max_concurrent_tasks: usize, #[allow(unused_variables)] // used behind a feature flag @@ -607,7 +608,7 @@ impl MergePlan { for file in files.iter() { debug!(" file {}", file.location); } - let object_store_ref = object_store.clone(); + let object_store_ref = log_store.object_store().clone(); let batch_stream = futures::stream::iter(files.clone()) .then(move |file| { let object_store_ref = object_store_ref.clone(); @@ -625,7 +626,7 @@ impl MergePlan { self.task_parameters.clone(), partition, files, - object_store.clone(), + log_store.object_store().clone(), futures::future::ready(Ok(batch_stream)), )); util::flatten_join_error(rewrite_result) @@ -635,13 +636,15 @@ impl MergePlan { #[cfg(not(feature = "datafusion"))] let exec_context = Arc::new(zorder::ZOrderExecContext::new( zorder_columns, - object_store.clone(), + log_store.object_store().clone(), // If there aren't enough bins to use all threads, then instead // use threads within the bins. This is important for the case where // the table is un-partitioned, in which case the entire table is just // one big bin. bins.len() <= num_cpus::get(), )); + let object_store = log_store.object_store().clone(); + #[cfg(feature = "datafusion")] let exec_context = Arc::new(zorder::ZOrderExecContext::new( zorder_columns, @@ -649,7 +652,6 @@ impl MergePlan { max_spill_size, )?); let task_parameters = self.task_parameters.clone(); - let object_store = object_store.clone(); futures::stream::iter(bins) .map(move |(partition, files)| { let batch_stream = Self::read_zorder(files.clone(), exec_context.clone()); @@ -671,7 +673,7 @@ impl MergePlan { let mut stream = stream.buffer_unordered(max_concurrent_tasks); - let mut table = DeltaTable::new_with_state(object_store.clone(), snapshot.clone()); + let mut table = DeltaTable::new_with_state(log_store.clone(), snapshot.clone()); // Actions buffered so far. These will be flushed either at the end // or when we reach the commit interval. @@ -720,7 +722,7 @@ impl MergePlan { //// TODO: Check for remove actions on optimized partitions. If a //// optimized partition was updated then abort the commit. Requires (#593). commit( - table.object_store().as_ref(), + table.log_store.as_ref(), &actions, self.task_parameters.input_parameters.clone().into(), table.get_state(), diff --git a/crates/deltalake-core/src/operations/restore.rs b/crates/deltalake-core/src/operations/restore.rs index a356b5b312..c391de6f04 100644 --- a/crates/deltalake-core/src/operations/restore.rs +++ b/crates/deltalake-core/src/operations/restore.rs @@ -10,7 +10,8 @@ //! 5) If ignore_missing_files option is false (default value) check availability of AddFile //! in file system. //! 6) Commit Protocol, all RemoveFile and AddFile actions -//! into delta log using `try_commit_transaction` (commit will be failed in case of parallel transaction) +//! into delta log using `LogStore::write_commit_entry` (commit will be failed in case of parallel transaction) +//! TODO: comment is outdated //! 7) If table was modified in parallel then ignore restore and raise exception. //! //! # Example @@ -31,9 +32,9 @@ use object_store::ObjectStore; use serde::Serialize; use crate::kernel::{Action, Add, Protocol, Remove}; -use crate::operations::transaction::{prepare_commit, try_commit_transaction, TransactionError}; +use crate::logstore::LogStoreRef; +use crate::operations::transaction::{prepare_commit, TransactionError}; use crate::protocol::DeltaOperation; -use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError}; @@ -74,7 +75,7 @@ pub struct RestoreBuilder { /// A snapshot of the to-be-restored table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: ObjectStoreRef, + log_store: LogStoreRef, /// Version to restore version_to_restore: Option, /// Datetime to restore @@ -87,10 +88,10 @@ pub struct RestoreBuilder { impl RestoreBuilder { /// Create a new [`RestoreBuilder`] - pub fn new(store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, version_to_restore: None, datetime_to_restore: None, ignore_missing_files: false, @@ -125,7 +126,7 @@ impl RestoreBuilder { } async fn execute( - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: DeltaTableState, version_to_restore: Option, datetime_to_restore: Option>, @@ -138,7 +139,7 @@ async fn execute( { return Err(DeltaTableError::from(RestoreError::InvalidRestoreParameter)); } - let mut table = DeltaTable::new(object_store.clone(), DeltaTableConfig::default()); + let mut table = DeltaTable::new(log_store.clone(), DeltaTableConfig::default()); let version = match datetime_to_restore { Some(datetime) => { table.load_with_datetime(datetime).await?; @@ -195,7 +196,7 @@ async fn execute( .collect(); if !ignore_missing_files { - check_files_available(object_store.as_ref(), &files_to_add).await?; + check_files_available(log_store.object_store().as_ref(), &files_to_add).await?; } let metrics = RestoreMetrics { @@ -238,7 +239,7 @@ async fn execute( actions.extend(files_to_remove.into_iter().map(Action::Remove)); let commit = prepare_commit( - object_store.as_ref(), + log_store.object_store().as_ref(), &DeltaOperation::Restore { version: version_to_restore, datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }), @@ -249,13 +250,13 @@ async fn execute( ) .await?; let commit_version = snapshot.version() + 1; - match try_commit_transaction(object_store.as_ref(), &commit, commit_version).await { + match log_store.write_commit_entry(commit_version, &commit).await { Ok(_) => {} Err(err @ TransactionError::VersionAlreadyExists(_)) => { return Err(err.into()); } Err(err) => { - object_store.delete(&commit).await?; + log_store.object_store().delete(&commit).await?; return Err(err.into()); } } @@ -291,7 +292,7 @@ impl std::future::IntoFuture for RestoreBuilder { Box::pin(async move { let metrics = execute( - this.store.clone(), + this.log_store.clone(), this.snapshot.clone(), this.version_to_restore, this.datetime_to_restore, @@ -299,7 +300,7 @@ impl std::future::IntoFuture for RestoreBuilder { this.protocol_downgrade_allowed, ) .await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/transaction/mod.rs b/crates/deltalake-core/src/operations/transaction/mod.rs index c31c349fd7..e5e808d2d5 100644 --- a/crates/deltalake-core/src/operations/transaction/mod.rs +++ b/crates/deltalake-core/src/operations/transaction/mod.rs @@ -10,8 +10,8 @@ use serde_json::Value; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, CommitInfo}; +use crate::logstore::LogStore; use crate::protocol::DeltaOperation; -use crate::storage::commit_uri_from_version; use crate::table::state::DeltaTableState; mod conflict_checker; @@ -130,6 +130,7 @@ pub(crate) fn get_commit_bytes( /// Low-level transaction API. Creates a temporary commit file. Once created, /// the transaction object could be dropped and the actual commit could be executed /// with `DeltaTable.try_commit_transaction`. +/// TODO: comment is outdated now pub(crate) async fn prepare_commit<'a>( storage: &dyn ObjectStore, operation: &DeltaOperation, @@ -150,42 +151,26 @@ pub(crate) async fn prepare_commit<'a>( Ok(path) } -/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists] -/// if the given `version` already exists. The caller should handle the retry logic itself. -/// This is low-level transaction API. If user does not want to maintain the commit loop then -/// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` -/// with retry logic. -pub(crate) async fn try_commit_transaction( - storage: &dyn ObjectStore, - tmp_commit: &Path, - version: i64, -) -> Result { - // move temporary commit file to delta log directory - // rely on storage to fail if the file already exists - - storage - .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version)) - .await - .map_err(|err| match err { - ObjectStoreError::AlreadyExists { .. } => { - TransactionError::VersionAlreadyExists(version) - } - _ => TransactionError::from(err), - })?; - Ok(version) -} - /// Commit a transaction, with up to 15 retries. This is higher-level transaction API. /// /// Will error early if the a concurrent transaction has already been committed /// and conflicts with this transaction. pub async fn commit( - storage: &dyn ObjectStore, + log_store: &dyn LogStore, actions: &Vec, operation: DeltaOperation, read_snapshot: &DeltaTableState, app_metadata: Option>, ) -> DeltaResult { - commit_with_retries(storage, actions, operation, read_snapshot, app_metadata, 15).await + commit_with_retries( + log_store, + actions, + operation, + read_snapshot, + app_metadata, + 15, + ) + .await } /// Commit a transaction, with up configurable number of retries. This is higher-level transaction API. @@ -193,24 +178,35 @@ pub async fn commit( /// The function will error early if the a concurrent transaction has already been committed /// and conflicts with this transaction. pub async fn commit_with_retries( - storage: &dyn ObjectStore, + log_store: &dyn LogStore, actions: &Vec, operation: DeltaOperation, read_snapshot: &DeltaTableState, app_metadata: Option>, max_retries: usize, ) -> DeltaResult { - let tmp_commit = - prepare_commit(storage, &operation, actions, read_snapshot, app_metadata).await?; + let tmp_commit = prepare_commit( + log_store.object_store().as_ref(), + &operation, + actions, + read_snapshot, + app_metadata, + ) + .await?; let mut attempt_number = 1; while attempt_number <= max_retries { let version = read_snapshot.version() + attempt_number as i64; - match try_commit_transaction(storage, &tmp_commit, version).await { - Ok(version) => return Ok(version), + match log_store.write_commit_entry(version, &tmp_commit).await { + Ok(()) => return Ok(version), Err(TransactionError::VersionAlreadyExists(version)) => { - let summary = WinningCommitSummary::try_new(storage, version - 1, version).await?; + let summary = WinningCommitSummary::try_new( + log_store.object_store().as_ref(), + version - 1, + version, + ) + .await?; let transaction_info = TransactionInfo::try_new( read_snapshot, operation.read_predicate(), @@ -225,13 +221,13 @@ pub async fn commit_with_retries( attempt_number += 1; } Err(err) => { - storage.delete(&tmp_commit).await?; + log_store.object_store().delete(&tmp_commit).await?; return Err(TransactionError::CommitConflict(err).into()); } }; } Err(err) => { - storage.delete(&tmp_commit).await?; + log_store.object_store().delete(&tmp_commit).await?; return Err(err.into()); } } @@ -242,11 +238,16 @@ pub async fn commit_with_retries( #[cfg(all(test, feature = "parquet"))] mod tests { + use std::{collections::HashMap, sync::Arc}; + use self::test_utils::{create_remove_action, init_table_actions}; use super::*; - use crate::DeltaConfigKey; + use crate::{ + logstore::default_logstore::DefaultLogStore, storage::commit_uri_from_version, + DeltaConfigKey, + }; use object_store::memory::InMemory; - use std::collections::HashMap; + use url::Url; #[test] fn test_commit_uri_from_version() { @@ -290,18 +291,25 @@ mod tests { #[tokio::test] async fn test_try_commit_transaction() { - let store = InMemory::new(); + let store = Arc::new(InMemory::new()); + let url = Url::parse("mem://what/is/this").unwrap(); + let log_store = DefaultLogStore::new( + store.clone(), + crate::logstore::LogStoreConfig { + location: url, + options: HashMap::new().into(), + }, + ); let tmp_path = Path::from("_delta_log/tmp"); let version_path = Path::from("_delta_log/00000000000000000000.json"); store.put(&tmp_path, bytes::Bytes::new()).await.unwrap(); store.put(&version_path, bytes::Bytes::new()).await.unwrap(); + let res = log_store.write_commit_entry(0, &tmp_path).await; // fails if file version already exists - let res = try_commit_transaction(&store, &tmp_path, 0).await; assert!(res.is_err()); // succeeds for next version - let res = try_commit_transaction(&store, &tmp_path, 1).await.unwrap(); - assert_eq!(res, 1); + log_store.write_commit_entry(1, &tmp_path).await.unwrap(); } } diff --git a/crates/deltalake-core/src/operations/transaction/test_utils.rs b/crates/deltalake-core/src/operations/transaction/test_utils.rs index b52b1a1c7b..56b0894019 100644 --- a/crates/deltalake-core/src/operations/transaction/test_utils.rs +++ b/crates/deltalake-core/src/operations/transaction/test_utils.rs @@ -1,7 +1,7 @@ #![allow(unused)] use std::collections::HashMap; -use super::{prepare_commit, try_commit_transaction}; +use super::prepare_commit; use crate::kernel::{ Action, Add, CommitInfo, DataType, Metadata, PrimitiveType, Protocol, Remove, StructField, StructType, @@ -121,7 +121,7 @@ pub async fn create_initialized_table( partition_cols: &[String], configuration: Option>>, ) -> DeltaTable { - let storage = DeltaTableBuilder::from_uri("memory://") + let log_store = DeltaTableBuilder::from_uri("memory://") .build_storage() .unwrap(); let table_schema = StructType::new(vec![ @@ -161,11 +161,19 @@ pub async fn create_initialized_table( ), }; let actions = init_table_actions(None); - let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, &state, None) - .await - .unwrap(); - try_commit_transaction(storage.as_ref(), &prepared_commit, 0) + let prepared_commit = prepare_commit( + log_store.object_store().as_ref(), + &operation, + &actions, + &state, + None, + ) + .await + .unwrap(); + + log_store + .write_commit_entry(0, &prepared_commit) .await .unwrap(); - DeltaTable::new_with_state(storage, state) + DeltaTable::new_with_state(log_store, state) } diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index bc458bdd53..559f28868d 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -49,8 +49,8 @@ use super::write::write_execution_plan; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; use crate::kernel::{Action, Remove}; +use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableError}; @@ -64,7 +64,7 @@ pub struct UpdateBuilder { /// A snapshot of the table's state snapshot: DeltaTableState, /// Delta object store for handling data files - object_store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -95,12 +95,12 @@ pub struct UpdateMetrics { impl UpdateBuilder { /// Create a new ['UpdateBuilder'] - pub fn new(object_store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { predicate: None, updates: HashMap::new(), snapshot, - object_store, + log_store, state: None, writer_properties: None, app_metadata: None, @@ -164,7 +164,7 @@ impl UpdateBuilder { async fn execute( predicate: Option, updates: HashMap, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -213,7 +213,7 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns.clone(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, object_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; if candidates.candidates.is_empty() { @@ -225,7 +225,7 @@ async fn execute( let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches - let scan = DeltaScanBuilder::new(snapshot, object_store.clone(), &state) + let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state) .with_files(&candidates.candidates) .build() .await?; @@ -357,7 +357,7 @@ async fn execute( state.clone(), projection.clone(), table_partition_cols.clone(), - object_store.clone(), + log_store.object_store().clone(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, @@ -407,7 +407,7 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -430,7 +430,7 @@ impl std::future::IntoFuture for UpdateBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.object_store.clone(), session.runtime_env()); + register_store(this.log_store.clone(), session.runtime_env()); session.state() }); @@ -438,7 +438,7 @@ impl std::future::IntoFuture for UpdateBuilder { let ((actions, version), metrics) = execute( this.predicate, this.updates, - this.object_store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -449,7 +449,7 @@ impl std::future::IntoFuture for UpdateBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.object_store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs index 47f7c1d5c9..efdde55347 100644 --- a/crates/deltalake-core/src/operations/vacuum.rs +++ b/crates/deltalake-core/src/operations/vacuum.rs @@ -37,8 +37,8 @@ use super::transaction::commit; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::Action; +use crate::logstore::{LogStore, LogStoreRef}; use crate::protocol::DeltaOperation; -use crate::storage::DeltaObjectStore; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -82,7 +82,7 @@ pub struct VacuumBuilder { /// A snapshot of the to-be-vacuumed table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Period of stale files allowed. retention_period: Option, /// Validate the retention period is not below the retention period configured in the table @@ -125,10 +125,10 @@ pub struct VacuumEndOperationMetrics { /// Methods to specify various vacuum options and to execute the operation impl VacuumBuilder { /// Create a new [`VacuumBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { VacuumBuilder { snapshot, - store, + log_store, retention_period: None, enforce_retention_duration: true, dry_run: false, @@ -184,8 +184,11 @@ impl VacuumBuilder { let mut files_to_delete = vec![]; let mut file_sizes = vec![]; - let mut all_files = self.store.list(None).await.map_err(DeltaTableError::from)?; - + let object_store = self.log_store.object_store(); + let mut all_files = object_store + .list(None) + .await + .map_err(DeltaTableError::from)?; let partition_columns = &self .snapshot .current_metadata() @@ -227,7 +230,7 @@ impl std::future::IntoFuture for VacuumBuilder { let plan = this.create_vacuum_plan().await?; if this.dry_run { return Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), VacuumMetrics { files_deleted: plan.files_to_delete.iter().map(|f| f.to_string()).collect(), dry_run: true, @@ -235,9 +238,11 @@ impl std::future::IntoFuture for VacuumBuilder { )); } - let metrics = plan.execute(&this.store, &this.snapshot).await?; + let metrics = plan + .execute(this.log_store.as_ref(), &this.snapshot) + .await?; Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), metrics, )) }) @@ -262,7 +267,7 @@ impl VacuumPlan { /// Execute the vacuum plan and delete files from underlying storage pub async fn execute( self, - store: &DeltaObjectStore, + store: &dyn LogStore, snapshot: &DeltaTableState, ) -> Result { if self.files_to_delete.is_empty() { @@ -311,6 +316,7 @@ impl VacuumPlan { .boxed(); let files_deleted = store + .object_store() .delete_stream(locations) .map(|res| match res { Ok(path) => Ok(path.to_string()), @@ -395,7 +401,7 @@ mod tests { async fn vacuum_delta_8_0_table() { let table = open_table("./tests/data/delta-0.8.0").await.unwrap(); - let result = VacuumBuilder::new(table.object_store(), table.state.clone()) + let result = VacuumBuilder::new(table.log_store, table.state.clone()) .with_retention_period(Duration::hours(1)) .with_dry_run(true) .await; @@ -403,7 +409,7 @@ mod tests { assert!(result.is_err()); let table = open_table("./tests/data/delta-0.8.0").await.unwrap(); - let (table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(0)) .with_dry_run(true) .with_enforce_retention_duration(false) @@ -415,7 +421,7 @@ mod tests { vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"] ); - let (table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(169)) .with_dry_run(true) .await @@ -432,7 +438,7 @@ mod tests { .as_secs() / 3600; let empty: Vec = Vec::new(); - let (_table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (_table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(retention_hours as i64)) .with_dry_run(true) .await diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index 45bdaaeff5..dec4b7ced7 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -44,8 +44,9 @@ use super::{transaction::commit, CreateBuilder}; use crate::delta_datafusion::DeltaDataChecker; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Add, Remove, StructType}; +use crate::logstore::LogStoreRef; use crate::protocol::{DeltaOperation, SaveMode}; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; +use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::writer::record_batch::divide_by_partition_values; use crate::writer::utils::PartitionPath; @@ -90,7 +91,7 @@ pub struct WriteBuilder { /// A snapshot of the to-be-loaded table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// The input plan input: Option>, /// Datafusion session state relevant for executing the input plan @@ -117,10 +118,10 @@ pub struct WriteBuilder { impl WriteBuilder { /// Create a new [`WriteBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, input: None, state: None, mode: SaveMode::Append, @@ -210,7 +211,7 @@ impl WriteBuilder { } async fn check_preconditions(&self) -> DeltaResult> { - match self.store.is_delta_table_location().await? { + match self.log_store.is_delta_table_location().await? { true => { let min_writer = self.snapshot.min_writer_version(); if min_writer > MAX_SUPPORTED_WRITER_VERSION { @@ -218,7 +219,7 @@ impl WriteBuilder { } else { match self.mode { SaveMode::ErrorIfExists => { - Err(WriteError::AlreadyExists(self.store.root_uri()).into()) + Err(WriteError::AlreadyExists(self.log_store.root_uri()).into()) } _ => Ok(vec![]), } @@ -236,7 +237,7 @@ impl WriteBuilder { Err(WriteError::MissingData) }?; let mut builder = CreateBuilder::new() - .with_object_store(self.store.clone()) + .with_log_store(self.log_store.clone()) .with_columns(schema.fields().clone()); if let Some(partition_columns) = self.partition_columns.as_ref() { builder = builder.with_partition_columns(partition_columns.clone()) @@ -356,7 +357,7 @@ impl std::future::IntoFuture for WriteBuilder { let schema = batches[0].schema(); let table_schema = this .snapshot - .physical_arrow_schema(this.store.clone()) + .physical_arrow_schema(this.log_store.object_store().clone()) .await .or_else(|_| this.snapshot.arrow_schema()) .unwrap_or(schema.clone()); @@ -418,7 +419,7 @@ impl std::future::IntoFuture for WriteBuilder { state, plan, partition_columns.clone(), - this.store.clone(), + this.log_store.object_store().clone(), this.target_file_size, this.write_batch_size, this.writer_properties, @@ -468,7 +469,7 @@ impl std::future::IntoFuture for WriteBuilder { }; let version = commit( - this.store.as_ref(), + this.log_store.as_ref(), &actions, DeltaOperation::Write { mode: this.mode, @@ -492,7 +493,7 @@ impl std::future::IntoFuture for WriteBuilder { // TODO should we build checkpoints based on config? - Ok(DeltaTable::new_with_state(this.store, this.snapshot)) + Ok(DeltaTable::new_with_state(this.log_store, this.snapshot)) }) } } diff --git a/crates/deltalake-core/src/operations/writer.rs b/crates/deltalake-core/src/operations/writer.rs index 0bba167e33..6d551ecb96 100644 --- a/crates/deltalake-core/src/operations/writer.rs +++ b/crates/deltalake-core/src/operations/writer.rs @@ -406,9 +406,10 @@ mod tests { #[tokio::test] async fn test_write_partition() { - let object_store = DeltaTableBuilder::from_uri("memory://") + let log_store = DeltaTableBuilder::from_uri("memory://") .build_storage() .unwrap(); + let object_store = log_store.object_store(); let batch = get_record_batch(None, false); // write single un-partitioned batch @@ -439,12 +440,13 @@ mod tests { let object_store = DeltaTableBuilder::from_uri("memory://") .build_storage() - .unwrap(); + .unwrap() + .object_store(); let properties = WriterProperties::builder() .set_max_row_group_size(1024) .build(); // configure small target file size and and row group size so we can observe multiple files written - let mut writer = get_writer(object_store.clone(), &batch, Some(properties), Some(10_000)); + let mut writer = get_writer(object_store, &batch, Some(properties), Some(10_000)); writer.write(&batch).await.unwrap(); // check that we have written more then once file, and no more then 1 is below target size diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs index fc23c1d28b..f48fbfbd76 100644 --- a/crates/deltalake-core/src/protocol/checkpoints.rs +++ b/crates/deltalake-core/src/protocol/checkpoints.rs @@ -23,7 +23,7 @@ use crate::kernel::{ Action, Add as AddAction, DataType, Metadata, PrimitiveType, Protocol, StructField, StructType, Txn, }; -use crate::storage::DeltaObjectStore; +use crate::logstore::LogStore; use crate::table::state::DeltaTableState; use crate::table::{CheckPoint, CheckPointBuilder}; use crate::{open_table_with_version, DeltaTable}; @@ -70,7 +70,7 @@ pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000; /// Creates checkpoint at current table version pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError> { - create_checkpoint_for(table.version(), table.get_state(), table.storage.as_ref()).await?; + create_checkpoint_for(table.version(), table.get_state(), table.log_store.as_ref()).await?; Ok(()) } @@ -81,7 +81,7 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result Result<(), ProtocolError> { // TODO: checkpoints _can_ be multi-part... haven't actually found a good reference for // an appropriate split point yet though so only writing a single part currently. // See https://github.com/delta-io/delta-rs/issues/288 - let last_checkpoint_path = storage.log_path().child("_last_checkpoint"); + let last_checkpoint_path = log_store.log_path().child("_last_checkpoint"); debug!("Writing parquet bytes to checkpoint buffer."); let (checkpoint, parquet_bytes) = parquet_bytes_from_state(state)?; let file_name = format!("{version:020}.checkpoint.parquet"); - let checkpoint_path = storage.log_path().child(file_name); + let checkpoint_path = log_store.log_path().child(file_name); + let object_store = log_store.object_store(); debug!("Writing checkpoint to {:?}.", checkpoint_path); - storage.put(&checkpoint_path, parquet_bytes).await?; + object_store.put(&checkpoint_path, parquet_bytes).await?; let last_checkpoint_content: Value = serde_json::to_value(checkpoint)?; let last_checkpoint_content = bytes::Bytes::from(serde_json::to_vec(&last_checkpoint_content)?); debug!("Writing _last_checkpoint to {:?}.", last_checkpoint_path); - storage + object_store .put(&last_checkpoint_path, last_checkpoint_content) .await?; @@ -146,7 +147,7 @@ pub async fn create_checkpoint_for( /// and less than the specified version. pub async fn cleanup_expired_logs_for( until_version: i64, - storage: &DeltaObjectStore, + log_store: &dyn LogStore, cutoff_timestamp: i64, ) -> Result { lazy_static! { @@ -157,10 +158,11 @@ pub async fn cleanup_expired_logs_for( // Feed a stream of candidate deletion files directly into the delete_stream // function to try to improve the speed of cleanup and reduce the need for // intermediate memory. - let deleted = storage + let object_store = log_store.object_store(); + let deleted = object_store .delete_stream( - storage - .list(Some(storage.log_path())) + object_store + .list(Some(log_store.log_path())) .await? // This predicate function will filter out any locations that don't // match the given timestamp range diff --git a/crates/deltalake-core/src/protocol/mod.rs b/crates/deltalake-core/src/protocol/mod.rs index 47e24cd959..8a5cd9f858 100644 --- a/crates/deltalake-core/src/protocol/mod.rs +++ b/crates/deltalake-core/src/protocol/mod.rs @@ -26,7 +26,7 @@ use std::mem::take; use crate::errors::DeltaResult; use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove}; -use crate::storage::ObjectStoreRef; +use crate::logstore::LogStore; use crate::table::CheckPoint; use crate::table::DeltaTableMetaData; @@ -601,14 +601,15 @@ pub enum OutputMode { } pub(crate) async fn get_last_checkpoint( - object_store: &ObjectStoreRef, + log_store: &dyn LogStore, ) -> Result { let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]); debug!("loading checkpoint from {last_checkpoint_path}"); + let object_store = log_store.object_store(); match object_store.get(&last_checkpoint_path).await { Ok(data) => Ok(serde_json::from_slice(&data.bytes().await?)?), Err(ObjectStoreError::NotFound { .. }) => { - match find_latest_check_point_for_version(object_store, i64::MAX).await { + match find_latest_check_point_for_version(log_store, i64::MAX).await { Ok(Some(cp)) => Ok(cp), _ => Err(ProtocolError::CheckpointNotFound), } @@ -618,7 +619,7 @@ pub(crate) async fn get_last_checkpoint( } pub(crate) async fn find_latest_check_point_for_version( - object_store: &ObjectStoreRef, + log_store: &dyn LogStore, version: i64, ) -> Result, ProtocolError> { lazy_static! { @@ -629,7 +630,8 @@ pub(crate) async fn find_latest_check_point_for_version( } let mut cp: Option = None; - let mut stream = object_store.list(Some(object_store.log_path())).await?; + let object_store = log_store.object_store(); + let mut stream = object_store.list(Some(log_store.log_path())).await?; while let Some(obj_meta) = stream.next().await { // Exit early if any objects can't be listed. diff --git a/crates/deltalake-core/src/storage/config.rs b/crates/deltalake-core/src/storage/config.rs index 1cba57b579..1ddf7d9c8a 100644 --- a/crates/deltalake-core/src/storage/config.rs +++ b/crates/deltalake-core/src/storage/config.rs @@ -12,6 +12,8 @@ use url::Url; use super::file::FileStorageBackend; use super::utils::str_is_truthy; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::logstore::default_logstore::DefaultLogStore; +use crate::logstore::LogStoreRef; #[cfg(any(feature = "s3", feature = "s3-native-tls"))] use super::s3::{S3StorageBackend, S3StorageOptions}; @@ -225,7 +227,18 @@ impl From> for StorageOptions { } } -pub(crate) fn configure_store( +/// Configure a [`LogStoreRef`] for the given url and configuration +pub fn configure_log_store( + url: Url, + options: impl Into + Clone, +) -> DeltaResult { + let mut options = options.into(); + let (_scheme, _prefix) = ObjectStoreScheme::parse(&url, &mut options)?; + Ok(Arc::new(DefaultLogStore::try_new(url, options)?)) +} + +/// Configure an instance of an [`ObjectStore`] for the given url and configuration +pub fn configure_store( url: &Url, options: &mut StorageOptions, ) -> DeltaResult> { diff --git a/crates/deltalake-core/src/storage/mod.rs b/crates/deltalake-core/src/storage/mod.rs index c7309531ea..b571905f8b 100644 --- a/crates/deltalake-core/src/storage/mod.rs +++ b/crates/deltalake-core/src/storage/mod.rs @@ -1,22 +1,8 @@ //! Object storage backend abstraction layer for Delta Table transaction logs and data -use std::collections::HashMap; -use std::fmt; -use std::ops::Range; use std::sync::Arc; -use bytes::Bytes; -use futures::{stream::BoxStream, StreamExt}; use lazy_static::lazy_static; -use object_store::GetOptions; -use serde::de::{Error, SeqAccess, Visitor}; -use serde::ser::SerializeSeq; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use tokio::io::AsyncWrite; -use url::Url; - -use self::config::StorageOptions; -use crate::errors::DeltaResult; pub mod config; pub mod file; @@ -25,9 +11,6 @@ pub mod utils; #[cfg(any(feature = "s3", feature = "s3-native-tls"))] pub mod s3; -#[cfg(feature = "datafusion")] -use datafusion::datasource::object_store::ObjectStoreUrl; - pub use object_store::path::{Path, DELIMITER}; pub use object_store::{ DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta, @@ -45,342 +28,5 @@ pub(crate) fn commit_uri_from_version(version: i64) -> Path { DELTA_LOG_PATH.child(version.as_str()) } -/// Sharable reference to [`DeltaObjectStore`] -pub type ObjectStoreRef = Arc; - -/// Object Store implementation for DeltaTable. -/// -/// The [DeltaObjectStore] implements the [object_store::ObjectStore] trait to facilitate -/// interoperability with the larger rust / arrow ecosystem. Specifically it can directly -/// be registered as store within datafusion. -/// -/// The table root is treated as the root of the object store. -/// All [Path] are reported relative to the table root. -#[derive(Debug, Clone)] -pub struct DeltaObjectStore { - storage: Arc, - location: Url, - options: StorageOptions, -} - -impl std::fmt::Display for DeltaObjectStore { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "DeltaObjectStore({})", self.location.as_ref()) - } -} - -impl DeltaObjectStore { - /// Create a new instance of [`DeltaObjectStore`] - /// - /// # Arguments - /// - /// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located). - /// * `location` - A url corresponding to the storage location of `storage`. - pub fn new(storage: Arc, location: Url) -> Self { - Self { - storage, - location, - options: HashMap::new().into(), - } - } - - /// Try creating a new instance of [`DeltaObjectStore`] - /// - /// # Arguments - /// - /// * `location` - A url pointing to the root of the delta table. - /// * `options` - Options passed to underlying builders. See [`with_storage_options`](crate::table::builder::DeltaTableBuilder::with_storage_options) - pub fn try_new(location: Url, options: impl Into + Clone) -> DeltaResult { - let mut options = options.into(); - let storage = config::configure_store(&location, &mut options)?; - Ok(Self { - storage, - location, - options, - }) - } - - /// Get a reference to the underlying storage backend - pub fn storage_backend(&self) -> Arc { - self.storage.clone() - } - - /// Storage options used to initialize storage backend - pub fn storage_options(&self) -> &StorageOptions { - &self.options - } - - /// Get fully qualified uri for table root - pub fn root_uri(&self) -> String { - self.to_uri(&Path::from("")) - } - - #[cfg(feature = "datafusion")] - /// Generate a unique enough url to identify the store in datafusion. - /// The DF object store registry only cares about the scheme and the host of the url for - /// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique - /// host we convert the location from this `DeltaObjectStore` to a valid name, combining the - /// original scheme, host and path with invalid characters replaced. - pub fn object_store_url(&self) -> ObjectStoreUrl { - // we are certain, that the URL can be parsed, since - // we make sure when we are parsing the table uri - ObjectStoreUrl::parse(format!( - "delta-rs://{}-{}{}", - self.location.scheme(), - self.location.host_str().unwrap_or("-"), - self.location - .path() - .replace(DELIMITER, "-") - .replace(':', "-") - )) - .expect("Invalid object store url.") - } - - /// [Path] to Delta log - pub fn log_path(&self) -> &Path { - &DELTA_LOG_PATH - } - - /// [Path] to Delta log - pub fn to_uri(&self, location: &Path) -> String { - match self.location.scheme() { - "file" => { - #[cfg(windows)] - let uri = format!( - "{}/{}", - self.location.as_ref().trim_end_matches('/'), - location.as_ref() - ) - .replace("file:///", ""); - #[cfg(unix)] - let uri = format!( - "{}/{}", - self.location.as_ref().trim_end_matches('/'), - location.as_ref() - ) - .replace("file://", ""); - uri - } - _ => { - if location.as_ref().is_empty() || location.as_ref() == "/" { - self.location.as_ref().to_string() - } else { - format!("{}/{}", self.location.as_ref(), location.as_ref()) - } - } - } - } - - /// Deletes object by `paths`. - pub async fn delete_batch(&self, paths: &[Path]) -> ObjectStoreResult<()> { - for path in paths { - match self.delete(path).await { - Ok(_) => continue, - Err(ObjectStoreError::NotFound { .. }) => continue, - Err(e) => return Err(e), - } - } - Ok(()) - } - - /// Check if the location is a delta table location - pub async fn is_delta_table_location(&self) -> ObjectStoreResult { - // TODO We should really be using HEAD here, but this fails in windows tests - let mut stream = self.list(Some(self.log_path())).await?; - if let Some(res) = stream.next().await { - match res { - Ok(_) => Ok(true), - Err(ObjectStoreError::NotFound { .. }) => Ok(false), - Err(err) => Err(err), - } - } else { - Ok(false) - } - } -} - -#[async_trait::async_trait] -impl ObjectStore for DeltaObjectStore { - /// Save the provided bytes to the specified location. - async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<()> { - self.storage.put(location, bytes).await - } - - /// Return the bytes that are stored at the specified location. - async fn get(&self, location: &Path) -> ObjectStoreResult { - self.storage.get(location).await - } - - /// Perform a get request with options - /// - /// Note: options.range will be ignored if [`object_store::GetResultPayload::File`] - async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { - self.storage.get_opts(location, options).await - } - - /// Return the bytes that are stored at the specified location - /// in the given byte range - async fn get_range(&self, location: &Path, range: Range) -> ObjectStoreResult { - self.storage.get_range(location, range).await - } - - /// Return the metadata for the specified location - async fn head(&self, location: &Path) -> ObjectStoreResult { - self.storage.head(location).await - } - - /// Delete the object at the specified location. - async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { - self.storage.delete(location).await - } - - /// List all the objects with the given prefix. - /// - /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of - /// `foo/bar_baz/x`. - async fn list( - &self, - prefix: Option<&Path>, - ) -> ObjectStoreResult>> { - self.storage.list(prefix).await - } - - /// List all the objects with the given prefix and a location greater than `offset` - /// - /// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce - /// the number of network requests required - async fn list_with_offset( - &self, - prefix: Option<&Path>, - offset: &Path, - ) -> ObjectStoreResult>> { - self.storage.list_with_offset(prefix, offset).await - } - - /// List objects with the given prefix and an implementation specific - /// delimiter. Returns common prefixes (directories) in addition to object - /// metadata. - /// - /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of - /// `foo/bar_baz/x`. - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult { - self.storage.list_with_delimiter(prefix).await - } - - /// Copy an object from one path to another in the same object store. - /// - /// If there exists an object at the destination, it will be overwritten. - async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - self.storage.copy(from, to).await - } - - /// Copy an object from one path to another, only if destination is empty. - /// - /// Will return an error if the destination already has an object. - async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - self.storage.copy_if_not_exists(from, to).await - } - - /// Move an object from one path to another in the same object store. - /// - /// Will return an error if the destination already has an object. - async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - self.storage.rename_if_not_exists(from, to).await - } - - async fn put_multipart( - &self, - location: &Path, - ) -> ObjectStoreResult<(MultipartId, Box)> { - self.storage.put_multipart(location).await - } - - async fn abort_multipart( - &self, - location: &Path, - multipart_id: &MultipartId, - ) -> ObjectStoreResult<()> { - self.storage.abort_multipart(location, multipart_id).await - } -} - -impl Serialize for DeltaObjectStore { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut seq = serializer.serialize_seq(None)?; - seq.serialize_element(&self.location.to_string())?; - seq.serialize_element(&self.options.0)?; - seq.end() - } -} - -impl<'de> Deserialize<'de> for DeltaObjectStore { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - struct DeltaObjectStoreVisitor {} - - impl<'de> Visitor<'de> for DeltaObjectStoreVisitor { - type Value = DeltaObjectStore; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> fmt::Result { - formatter.write_str("struct DeltaObjectStore") - } - - fn visit_seq(self, mut seq: A) -> Result - where - A: SeqAccess<'de>, - { - let location_str: String = seq - .next_element()? - .ok_or_else(|| A::Error::invalid_length(0, &self))?; - let options: HashMap = seq - .next_element()? - .ok_or_else(|| A::Error::invalid_length(0, &self))?; - let location = Url::parse(&location_str).unwrap(); - let table = DeltaObjectStore::try_new(location, options) - .map_err(|_| A::Error::custom("Failed deserializing DeltaObjectStore"))?; - Ok(table) - } - } - - deserializer.deserialize_seq(DeltaObjectStoreVisitor {}) - } -} - -#[cfg(feature = "datafusion")] -#[cfg(test)] -mod tests { - use crate::storage::DeltaObjectStore; - use object_store::memory::InMemory; - use std::sync::Arc; - use url::Url; - - #[tokio::test] - async fn test_unique_object_store_url() { - // Just a dummy store to be passed for initialization - let inner_store = Arc::from(InMemory::new()); - - for (location_1, location_2) in [ - // Same scheme, no host, different path - ("file:///path/to/table_1", "file:///path/to/table_2"), - // Different scheme/host, same path - ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"), - // Same scheme, different host, same path - ("s3://bucket_1/table_1", "s3://bucket_2/table_1"), - ] { - let url_1 = Url::parse(location_1).unwrap(); - let url_2 = Url::parse(location_2).unwrap(); - let store_1 = DeltaObjectStore::new(inner_store.clone(), url_1); - let store_2 = DeltaObjectStore::new(inner_store.clone(), url_2); - - assert_ne!( - store_1.object_store_url().as_str(), - store_2.object_store_url().as_str(), - ); - } - } -} +/// Sharable reference to [`ObjectStore`] +pub type ObjectStoreRef = Arc; diff --git a/crates/deltalake-core/src/storage/utils.rs b/crates/deltalake-core/src/storage/utils.rs index 7e516c7217..768664b97b 100644 --- a/crates/deltalake-core/src/storage/utils.rs +++ b/crates/deltalake-core/src/storage/utils.rs @@ -28,7 +28,7 @@ pub async fn copy_table( .with_storage_options(to_options.unwrap_or_default()) .with_allow_http(allow_http) .build_storage()?; - sync_stores(from_store, to_store).await + sync_stores(from_store.object_store(), to_store.object_store()).await } /// Synchronize the contents of two object stores diff --git a/crates/deltalake-core/src/table/builder.rs b/crates/deltalake-core/src/table/builder.rs index 92fc4851ad..2a4f8aca41 100644 --- a/crates/deltalake-core/src/table/builder.rs +++ b/crates/deltalake-core/src/table/builder.rs @@ -11,8 +11,9 @@ use url::Url; use super::DeltaTable; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::storage::config::StorageOptions; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; +use crate::logstore::default_logstore::DefaultLogStore; +use crate::logstore::{LogStoreConfig, LogStoreRef}; +use crate::storage::config::{self, StorageOptions}; #[allow(dead_code)] #[derive(Debug, thiserror::Error)] @@ -243,18 +244,24 @@ impl DeltaTableBuilder { } /// Build a delta storage backend for the given config - pub fn build_storage(self) -> DeltaResult { + pub fn build_storage(self) -> DeltaResult { match self.options.storage_backend { - Some((storage, location)) => Ok(Arc::new(DeltaObjectStore::new( - storage, - ensure_table_uri(location.as_str())?, - ))), + Some((storage, location)) => { + let location = ensure_table_uri(location.as_str())?; + Ok(Arc::new(DefaultLogStore::new( + Arc::new(storage), + LogStoreConfig { + location, + options: HashMap::new().into(), + }, + ))) + } None => { let location = ensure_table_uri(&self.options.table_uri)?; - Ok(Arc::new(DeltaObjectStore::try_new( + Ok(config::configure_log_store( location, self.storage_options(), - )?)) + )?) } } } diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index 2b011ff608..cd0f1808f5 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -5,7 +5,6 @@ use std::convert::TryFrom; use std::fmt; use std::fmt::Formatter; use std::io::{BufRead, BufReader, Cursor}; -use std::sync::Arc; use std::{cmp::max, cmp::Ordering, collections::HashSet}; use chrono::{DateTime, Utc}; @@ -26,10 +25,13 @@ use crate::kernel::{ Action, Add, CommitInfo, DataType, Format, Metadata, ReaderFeatures, Remove, StructType, WriterFeatures, }; +use crate::logstore::LogStoreConfig; +use crate::logstore::LogStoreRef; use crate::partitions::PartitionFilter; use crate::protocol::{ find_latest_check_point_for_version, get_last_checkpoint, ProtocolError, Stats, }; +use crate::storage::config::configure_log_store; use crate::storage::{commit_uri_from_version, ObjectStoreRef}; pub mod builder; @@ -249,8 +251,8 @@ pub struct DeltaTable { pub state: DeltaTableState, /// the load options used during load pub config: DeltaTableConfig, - /// object store to access log and data files - pub(crate) storage: ObjectStoreRef, + /// log store + pub(crate) log_store: LogStoreRef, /// file metadata for latest checkpoint last_check_point: Option, /// table versions associated with timestamps @@ -265,7 +267,7 @@ impl Serialize for DeltaTable { let mut seq = serializer.serialize_seq(None)?; seq.serialize_element(&self.state)?; seq.serialize_element(&self.config)?; - seq.serialize_element(self.storage.as_ref())?; + seq.serialize_element(self.log_store.config())?; seq.serialize_element(&self.last_check_point)?; seq.serialize_element(&self.version_timestamp)?; seq.end() @@ -296,9 +298,12 @@ impl<'de> Deserialize<'de> for DeltaTable { let config = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; - let storage = seq + let storage_config: LogStoreConfig = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let log_store = + configure_log_store(storage_config.location, storage_config.options) + .map_err(|_| A::Error::custom("Failed deserializing LogStore"))?; let last_check_point = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; @@ -309,7 +314,7 @@ impl<'de> Deserialize<'de> for DeltaTable { let table = DeltaTable { state, config, - storage: Arc::new(storage), + log_store, last_check_point, version_timestamp, }; @@ -326,10 +331,10 @@ impl DeltaTable { /// /// NOTE: This is for advanced users. If you don't know why you need to use this method, please /// call one of the `open_table` helper methods instead. - pub fn new(storage: ObjectStoreRef, config: DeltaTableConfig) -> Self { + pub fn new(log_store: LogStoreRef, config: DeltaTableConfig) -> Self { Self { state: DeltaTableState::with_version(-1), - storage, + log_store, config, last_check_point: None, version_timestamp: HashMap::new(), @@ -341,10 +346,10 @@ impl DeltaTable { /// /// NOTE: This is for advanced users. If you don't know why you need to use this method, /// please call one of the `open_table` helper methods instead. - pub(crate) fn new_with_state(storage: ObjectStoreRef, state: DeltaTableState) -> Self { + pub(crate) fn new_with_state(log_store: LogStoreRef, state: DeltaTableState) -> Self { Self { state, - storage, + log_store, config: Default::default(), last_check_point: None, version_timestamp: HashMap::new(), @@ -353,18 +358,23 @@ impl DeltaTable { /// get a shared reference to the delta object store pub fn object_store(&self) -> ObjectStoreRef { - self.storage.clone() + self.log_store.object_store() } /// The URI of the underlying data pub fn table_uri(&self) -> String { - self.storage.root_uri() + self.log_store.root_uri() + } + + /// get a shared reference to the log store + pub fn log_store(&self) -> LogStoreRef { + self.log_store.clone() } /// Return the list of paths of given checkpoint. pub fn get_checkpoint_data_paths(&self, check_point: &CheckPoint) -> Vec { let checkpoint_prefix = format!("{:020}", check_point.version); - let log_path = self.storage.log_path(); + let log_path = self.log_store.log_path(); let mut checkpoint_data_paths = Vec::new(); match check_point.parts { @@ -399,7 +409,8 @@ impl DeltaTable { let mut current_delta_log_ver = i64::MAX; // Get file objects from table. - let mut stream = self.storage.list(Some(self.storage.log_path())).await?; + let storage = self.object_store(); + let mut stream = storage.list(Some(self.log_store.log_path())).await?; while let Some(obj_meta) = stream.next().await { let obj_meta = obj_meta?; @@ -422,54 +433,7 @@ impl DeltaTable { } /// returns the latest available version of the table pub async fn get_latest_version(&mut self) -> Result { - let version_start = match get_last_checkpoint(&self.storage).await { - Ok(last_check_point) => last_check_point.version, - Err(ProtocolError::CheckpointNotFound) => { - // no checkpoint - -1 - } - Err(e) => { - return Err(DeltaTableError::from(e)); - } - }; - - debug!("latest checkpoint version: {version_start}"); - - let version_start = max(self.version(), version_start); - - lazy_static! { - static ref DELTA_LOG_REGEX: Regex = - Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap(); - } - - // list files to find max version - let version = async { - let mut max_version: i64 = version_start; - let prefix = Some(self.storage.log_path()); - let offset_path = commit_uri_from_version(max_version); - let mut files = self.storage.list_with_offset(prefix, &offset_path).await?; - - while let Some(obj_meta) = files.next().await { - let obj_meta = obj_meta?; - if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) { - let log_version = captures.get(1).unwrap().as_str().parse().unwrap(); - // listing may not be ordered - max_version = max(max_version, log_version); - // also cache timestamp for version, for faster time-travel - self.version_timestamp - .insert(log_version, obj_meta.last_modified.timestamp()); - } - } - - if max_version < 0 { - return Err(DeltaTableError::not_a_table(self.table_uri())); - } - - Ok::(max_version) - } - .await?; - - Ok(version) + self.log_store.get_latest_version(self.version()).await } /// Currently loaded version of the table @@ -490,12 +454,12 @@ impl DeltaTable { current_version: i64, ) -> Result { let next_version = current_version + 1; - let commit_uri = commit_uri_from_version(next_version); - let commit_log_bytes = self.storage.get(&commit_uri).await; - let commit_log_bytes = match commit_log_bytes { - Err(ObjectStoreError::NotFound { .. }) => return Ok(PeekCommit::UpToDate), + let commit_log_bytes = match self.log_store.read_commit_entry(next_version).await { + Ok(bytes) => Ok(bytes), + Err(DeltaTableError::ObjectStore { + source: ObjectStoreError::NotFound { .. }, + }) => return Ok(PeekCommit::UpToDate), Err(err) => Err(err), - Ok(result) => result.bytes().await, }?; let actions = Self::get_actions(next_version, commit_log_bytes).await; @@ -529,7 +493,7 @@ impl DeltaTable { /// loading the last checkpoint and incrementally applying each version since. #[cfg(any(feature = "parquet", feature = "parquet2"))] pub async fn update(&mut self) -> Result<(), DeltaTableError> { - match get_last_checkpoint(&self.storage).await { + match get_last_checkpoint(self.log_store.as_ref()).await { Ok(last_check_point) => { debug!("update with latest checkpoint {last_check_point:?}"); if Some(last_check_point) == self.last_check_point { @@ -574,13 +538,12 @@ impl DeltaTable { let buf_size = self.config.log_buffer_size; - let store = self.storage.clone(); + let log_store = self.log_store.clone(); let mut log_stream = futures::stream::iter(self.version() + 1..max_version + 1) .map(|version| { - let store = store.clone(); - let loc = commit_uri_from_version(version); + let log_store = log_store.clone(); async move { - let data = store.get(&loc).await?.bytes().await?; + let data = log_store.read_commit_entry(version).await?; let actions = Self::get_actions(version, data).await?; Ok((version, actions)) } @@ -616,7 +579,7 @@ impl DeltaTable { pub async fn load_version(&mut self, version: i64) -> Result<(), DeltaTableError> { // check if version is valid let commit_uri = commit_uri_from_version(version); - match self.storage.head(&commit_uri).await { + match self.object_store().head(&commit_uri).await { Ok(_) => {} Err(ObjectStoreError::NotFound { .. }) => { return Err(DeltaTableError::InvalidVersion(version)); @@ -628,7 +591,7 @@ impl DeltaTable { // 1. find latest checkpoint below version #[cfg(any(feature = "parquet", feature = "parquet2"))] - match find_latest_check_point_for_version(&self.storage, version).await? { + match find_latest_check_point_for_version(self.log_store.as_ref(), version).await? { Some(check_point) => { self.restore_checkpoint(check_point).await?; } @@ -652,7 +615,10 @@ impl DeltaTable { match self.version_timestamp.get(&version) { Some(ts) => Ok(*ts), None => { - let meta = self.storage.head(&commit_uri_from_version(version)).await?; + let meta = self + .object_store() + .head(&commit_uri_from_version(version)) + .await?; let ts = meta.last_modified.timestamp(); // also cache timestamp for version self.version_timestamp.insert(version, ts); @@ -746,7 +712,7 @@ impl DeltaTable { let files = self.get_files_by_partitions(filters)?; Ok(files .iter() - .map(|fname| self.storage.to_uri(fname)) + .map(|fname| self.log_store.to_uri(fname)) .collect()) } @@ -771,7 +737,7 @@ impl DeltaTable { pub fn get_file_uris(&self) -> impl Iterator + '_ { self.state .file_paths_iter() - .map(|path| self.storage.to_uri(&path)) + .map(|path| self.log_store.to_uri(&path)) } /// Returns statistics for files, in order diff --git a/crates/deltalake-core/src/table/state.rs b/crates/deltalake-core/src/table/state.rs index 26becd0703..8fa51c55fd 100644 --- a/crates/deltalake-core/src/table/state.rs +++ b/crates/deltalake-core/src/table/state.rs @@ -71,7 +71,7 @@ impl DeltaTableState { /// Construct a delta table state object from commit version. pub async fn from_commit(table: &DeltaTable, version: i64) -> Result { let commit_uri = commit_uri_from_version(version); - let commit_log_bytes = match table.storage.get(&commit_uri).await { + let commit_log_bytes = match table.object_store().get(&commit_uri).await { Ok(get) => Ok(get.bytes().await?), Err(ObjectStoreError::NotFound { .. }) => Err(ProtocolError::EndOfLog), Err(source) => Err(ProtocolError::ObjectStore { source }), @@ -161,7 +161,7 @@ impl DeltaTableState { let mut new_state = Self::with_version(check_point.version); for f in &checkpoint_data_paths { - let obj = table.storage.get(f).await?.bytes().await?; + let obj = table.object_store().get(f).await?.bytes().await?; new_state.process_checkpoint_bytes(obj, &table.config)?; } diff --git a/crates/deltalake-core/src/test_utils.rs b/crates/deltalake-core/src/test_utils.rs index 124ec0365b..ce121106c4 100644 --- a/crates/deltalake-core/src/test_utils.rs +++ b/crates/deltalake-core/src/test_utils.rs @@ -88,7 +88,7 @@ impl IntegrationContext { _ => DeltaTableBuilder::from_uri(store_uri) .with_allow_http(true) .build_storage()? - .storage_backend(), + .object_store(), }; Ok(Self { diff --git a/crates/deltalake-core/src/writer/json.rs b/crates/deltalake-core/src/writer/json.rs index 044ffc20e2..7fec11fad2 100644 --- a/crates/deltalake-core/src/writer/json.rs +++ b/crates/deltalake-core/src/writer/json.rs @@ -26,14 +26,14 @@ use crate::errors::DeltaTableError; use crate::kernel::{Add, StructType}; use crate::table::builder::DeltaTableBuilder; use crate::table::DeltaTableMetaData; +use crate::writer::utils::ShareableBuffer; use crate::DeltaTable; -use crate::{storage::DeltaObjectStore, writer::utils::ShareableBuffer}; type BadValue = (Value, ParquetError); /// Writes messages to a delta lake table. pub struct JsonWriter { - storage: Arc, + storage: Arc, arrow_schema_ref: Arc, writer_properties: WriterProperties, partition_columns: Vec, @@ -195,7 +195,7 @@ impl JsonWriter { .build(); Ok(Self { - storage, + storage: storage.object_store(), arrow_schema_ref: schema, writer_properties, partition_columns: partition_columns.unwrap_or_default(), @@ -218,7 +218,7 @@ impl JsonWriter { .build(); Ok(Self { - storage: table.storage.clone(), + storage: table.object_store(), arrow_schema_ref, writer_properties, partition_columns, diff --git a/crates/deltalake-core/src/writer/mod.rs b/crates/deltalake-core/src/writer/mod.rs index 478a0b11f2..3b73fe2ef6 100644 --- a/crates/deltalake-core/src/writer/mod.rs +++ b/crates/deltalake-core/src/writer/mod.rs @@ -146,7 +146,14 @@ pub trait DeltaWriter { partition_by, predicate: None, }; - let version = commit(table.storage.as_ref(), &adds, operation, &table.state, None).await?; + let version = commit( + table.log_store.as_ref(), + &adds, + operation, + &table.state, + None, + ) + .await?; table.update().await?; Ok(version) } diff --git a/crates/deltalake-core/src/writer/record_batch.rs b/crates/deltalake-core/src/writer/record_batch.rs index b673146907..49b5dfebc9 100644 --- a/crates/deltalake-core/src/writer/record_batch.rs +++ b/crates/deltalake-core/src/writer/record_batch.rs @@ -29,11 +29,11 @@ use crate::errors::DeltaTableError; use crate::kernel::{Add, StructType}; use crate::table::builder::DeltaTableBuilder; use crate::table::DeltaTableMetaData; -use crate::{storage::DeltaObjectStore, DeltaTable}; +use crate::DeltaTable; /// Writes messages to a delta lake table. pub struct RecordBatchWriter { - storage: Arc, + storage: Arc, arrow_schema_ref: Arc, writer_properties: WriterProperties, partition_columns: Vec, @@ -56,7 +56,8 @@ impl RecordBatchWriter { ) -> Result { let storage = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(storage_options.unwrap_or_default()) - .build_storage()?; + .build_storage()? + .object_store(); // Initialize writer properties for the underlying arrow writer let writer_properties = WriterProperties::builder() @@ -89,7 +90,7 @@ impl RecordBatchWriter { .build(); Ok(Self { - storage: table.storage.clone(), + storage: table.object_store(), arrow_schema_ref, writer_properties, partition_columns, diff --git a/crates/deltalake-core/src/writer/test_utils.rs b/crates/deltalake-core/src/writer/test_utils.rs index d67931c096..1daf9e407b 100644 --- a/crates/deltalake-core/src/writer/test_utils.rs +++ b/crates/deltalake-core/src/writer/test_utils.rs @@ -323,7 +323,7 @@ pub mod datafusion { use std::sync::Arc; pub async fn get_data(table: &DeltaTable) -> Vec { - let table = DeltaTable::new_with_state(table.object_store(), table.state.clone()); + let table = DeltaTable::new_with_state(table.log_store.clone(), table.state.clone()); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(table)).unwrap(); ctx.sql("select * from test") diff --git a/crates/deltalake-core/tests/command_optimize.rs b/crates/deltalake-core/tests/command_optimize.rs index a923d0064d..14f9d4c410 100644 --- a/crates/deltalake-core/tests/command_optimize.rs +++ b/crates/deltalake-core/tests/command_optimize.rs @@ -298,7 +298,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { let operation = DeltaOperation::Delete { predicate: None }; commit( - other_dt.object_store().as_ref(), + other_dt.log_store().as_ref(), &vec![Action::Remove(remove)], operation, &other_dt.state, @@ -306,9 +306,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { ) .await?; - let maybe_metrics = plan - .execute(dt.object_store(), &dt.state, 1, 20, None) - .await; + let maybe_metrics = plan.execute(dt.log_store(), &dt.state, 1, 20, None).await; assert!(maybe_metrics.is_err()); assert_eq!(dt.version(), version + 1); @@ -357,9 +355,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box> { ) .await?; - let metrics = plan - .execute(dt.object_store(), &dt.state, 1, 20, None) - .await?; + let metrics = plan.execute(dt.log_store(), &dt.state, 1, 20, None).await?; assert_eq!(metrics.num_files_added, 1); assert_eq!(metrics.num_files_removed, 2); @@ -399,7 +395,7 @@ async fn test_commit_interval() -> Result<(), Box> { let metrics = plan .execute( - dt.object_store(), + dt.log_store(), &dt.state, 1, 20, diff --git a/crates/deltalake-core/tests/command_vacuum.rs b/crates/deltalake-core/tests/command_vacuum.rs index 0007f479d5..51ff3217b3 100644 --- a/crates/deltalake-core/tests/command_vacuum.rs +++ b/crates/deltalake-core/tests/command_vacuum.rs @@ -297,6 +297,6 @@ async fn test_non_managed_files() { async fn is_deleted(context: &mut TestContext, path: &Path) -> bool { let backend = context.get_storage(); - let res = backend.head(path).await; + let res = backend.object_store().head(path).await; matches!(res, Err(ObjectStoreError::NotFound { .. })) } diff --git a/crates/deltalake-core/tests/commit_info_format.rs b/crates/deltalake-core/tests/commit_info_format.rs index de69397e32..a9d05e4c11 100644 --- a/crates/deltalake-core/tests/commit_info_format.rs +++ b/crates/deltalake-core/tests/commit_info_format.rs @@ -22,7 +22,7 @@ async fn test_operational_parameters() -> Result<(), Box> { }; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, diff --git a/crates/deltalake-core/tests/common/mod.rs b/crates/deltalake-core/tests/common/mod.rs index 80df899323..af2e6e1a7f 100644 --- a/crates/deltalake-core/tests/common/mod.rs +++ b/crates/deltalake-core/tests/common/mod.rs @@ -2,10 +2,10 @@ use bytes::Bytes; use deltalake_core::kernel::{Action, Add, Remove, StructType}; +use deltalake_core::logstore::LogStore; use deltalake_core::operations::create::CreateBuilder; use deltalake_core::operations::transaction::commit; use deltalake_core::protocol::{DeltaOperation, SaveMode}; -use deltalake_core::storage::DeltaObjectStore; use deltalake_core::DeltaTable; use deltalake_core::DeltaTableBuilder; use object_store::{path::Path, ObjectStore}; @@ -28,7 +28,7 @@ pub mod s3; pub struct TestContext { /// The main table under test pub table: Option, - pub backend: Option>, + pub backend: Option>, /// The configuration used to create the backend. pub config: HashMap, /// An object when it is dropped will clean up any temporary resources created for the test @@ -55,7 +55,7 @@ impl TestContext { } } - pub fn get_storage(&mut self) -> Arc { + pub fn get_storage(&mut self) -> Arc { if self.backend.is_none() { self.backend = Some(self.new_storage()) } @@ -63,7 +63,7 @@ impl TestContext { self.backend.as_ref().unwrap().clone() } - fn new_storage(&self) -> Arc { + fn new_storage(&self) -> Arc { let config = self.config.clone(); let uri = config.get("URI").unwrap().to_string(); DeltaTableBuilder::from_uri(uri) @@ -82,9 +82,9 @@ impl TestContext { .iter() .map(|s| s.to_string()) .collect::>(); - let backend = self.new_storage(); + let log_store = self.new_storage(); CreateBuilder::new() - .with_object_store(backend) + .with_log_store(log_store) .with_table_name("delta-rs_test_table") .with_comment("Table created by delta-rs tests") .with_columns(schema.fields().clone()) @@ -149,7 +149,7 @@ pub async fn add_file( }; let actions = vec![Action::Add(add)]; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, @@ -187,7 +187,7 @@ pub async fn remove_file( let operation = DeltaOperation::Delete { predicate: None }; let actions = vec![Action::Remove(remove)]; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, diff --git a/crates/deltalake-core/tests/fs_common/mod.rs b/crates/deltalake-core/tests/fs_common/mod.rs index dc9ec2547a..73593f26b1 100644 --- a/crates/deltalake-core/tests/fs_common/mod.rs +++ b/crates/deltalake-core/tests/fs_common/mod.rs @@ -5,7 +5,8 @@ use deltalake_core::kernel::{ use deltalake_core::operations::create::CreateBuilder; use deltalake_core::operations::transaction::commit; use deltalake_core::protocol::{DeltaOperation, SaveMode}; -use deltalake_core::storage::{DeltaObjectStore, GetResult, ObjectStoreResult}; +use deltalake_core::storage::config::configure_store; +use deltalake_core::storage::{GetResult, ObjectStoreResult}; use deltalake_core::DeltaTable; use object_store::path::Path as StorePath; use object_store::ObjectStore; @@ -13,6 +14,7 @@ use serde_json::Value; use std::collections::HashMap; use std::fs; use std::path::Path; +use std::sync::Arc; use url::Url; use uuid::Uuid; @@ -119,7 +121,7 @@ pub async fn commit_actions( operation: DeltaOperation, ) -> i64 { let version = commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, @@ -133,7 +135,7 @@ pub async fn commit_actions( #[derive(Debug)] pub struct SlowStore { - inner: DeltaObjectStore, + inner: Arc, } impl std::fmt::Display for SlowStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -147,8 +149,9 @@ impl SlowStore { location: Url, options: impl Into + Clone, ) -> deltalake_core::DeltaResult { + let mut options = options.into(); Ok(Self { - inner: DeltaObjectStore::try_new(location, options).unwrap(), + inner: configure_store(&location, &mut options).unwrap(), }) } } diff --git a/crates/deltalake-core/tests/integration_checkpoint.rs b/crates/deltalake-core/tests/integration_checkpoint.rs index 9b5b0a73ff..56d253eb85 100644 --- a/crates/deltalake-core/tests/integration_checkpoint.rs +++ b/crates/deltalake-core/tests/integration_checkpoint.rs @@ -59,15 +59,12 @@ async fn cleanup_metadata_hdfs_test() -> TestResult { // test to run longer but reliable async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult { let table_uri = context.root_uri(); - let object_store = DeltaTableBuilder::from_uri(table_uri) + let log_store = DeltaTableBuilder::from_uri(table_uri) .with_allow_http(true) .build_storage()?; + let object_store = log_store.object_store(); - let log_path = |version| { - object_store - .log_path() - .child(format!("{:020}.json", version)) - }; + let log_path = |version| log_store.log_path().child(format!("{:020}.json", version)); // we don't need to actually populate files with content as cleanup works only with file's metadata object_store @@ -98,7 +95,7 @@ async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult { assert!(retention_timestamp > v1time.timestamp_millis()); assert!(retention_timestamp < v2time.timestamp_millis()); - let removed = cleanup_expired_logs_for(3, object_store.as_ref(), retention_timestamp).await?; + let removed = cleanup_expired_logs_for(3, log_store.as_ref(), retention_timestamp).await?; assert_eq!(removed, 2); assert!(object_store.head(&log_path(0)).await.is_err()); @@ -142,7 +139,7 @@ async fn test_issue_1420_cleanup_expired_logs_for() -> DeltaResult<()> { // Should delete v1 but not v2 or v2.checkpoint.parquet cleanup_expired_logs_for( table.version(), - table.object_store().as_ref(), + table.log_store().as_ref(), ts.timestamp_millis(), ) .await?; @@ -184,7 +181,7 @@ async fn test_issue_1420_cleanup_expired_logs_for() -> DeltaResult<()> { cleanup_expired_logs_for( table.version(), - table.object_store().as_ref(), + table.log_store().as_ref(), ts.timestamp_millis(), ) .await?; diff --git a/crates/deltalake-core/tests/integration_concurrent_writes.rs b/crates/deltalake-core/tests/integration_concurrent_writes.rs index bef44d0693..90dba7659a 100644 --- a/crates/deltalake-core/tests/integration_concurrent_writes.rs +++ b/crates/deltalake-core/tests/integration_concurrent_writes.rs @@ -168,7 +168,7 @@ impl Worker { default_row_commit_version: None, })]; let version = commit( - self.table.object_store().as_ref(), + self.table.log_store().as_ref(), &actions, operation, &self.table.state, diff --git a/crates/deltalake-core/tests/integration_datafusion.rs b/crates/deltalake-core/tests/integration_datafusion.rs index 3476de6839..7a9c38463f 100644 --- a/crates/deltalake-core/tests/integration_datafusion.rs +++ b/crates/deltalake-core/tests/integration_datafusion.rs @@ -34,11 +34,11 @@ use deltalake_core::delta_datafusion::{DeltaPhysicalCodec, DeltaScan}; use deltalake_core::kernel::{DataType, MapType, PrimitiveType, StructField, StructType}; use deltalake_core::operations::create::CreateBuilder; use deltalake_core::protocol::SaveMode; -use deltalake_core::storage::DeltaObjectStore; use deltalake_core::writer::{DeltaWriter, RecordBatchWriter}; use deltalake_core::{ open_table, operations::{write::WriteBuilder, DeltaOps}, + storage::config::configure_log_store, DeltaTable, DeltaTableError, }; use std::error::Error; @@ -211,7 +211,7 @@ mod local { // Trying to execute the write from the input plan without providing Datafusion with a session // state containing the referenced object store in the registry results in an error. assert!( - WriteBuilder::new(target_table.object_store(), target_table.state.clone()) + WriteBuilder::new(target_table.log_store(), target_table.state.clone()) .with_input_execution_plan(source_scan.clone()) .await .unwrap_err() @@ -227,19 +227,18 @@ mod local { .table_uri .clone(); let source_location = Url::parse(&source_uri).unwrap(); - let source_store = DeltaObjectStore::try_new(source_location, HashMap::new()).unwrap(); + let source_store = configure_log_store(source_location, HashMap::new()).unwrap(); let object_store_url = source_store.object_store_url(); let source_store_url: &Url = object_store_url.as_ref(); state .runtime_env() - .register_object_store(source_store_url, Arc::from(source_store)); + .register_object_store(source_store_url, source_store.object_store()); // Execute write to the target table with the proper state - let target_table = - WriteBuilder::new(target_table.object_store(), target_table.state.clone()) - .with_input_execution_plan(source_scan) - .with_input_session_state(state) - .await?; + let target_table = WriteBuilder::new(target_table.log_store(), target_table.state.clone()) + .with_input_execution_plan(source_scan) + .with_input_session_state(state) + .await?; ctx.register_table("target", Arc::new(target_table))?; // Check results diff --git a/crates/deltalake-core/tests/integration_object_store.rs b/crates/deltalake-core/tests/integration_object_store.rs index 3988cbdb6d..9c5720db85 100644 --- a/crates/deltalake-core/tests/integration_object_store.rs +++ b/crates/deltalake-core/tests/integration_object_store.rs @@ -81,7 +81,8 @@ async fn read_write_test_onelake(integration: StorageIntegration, path: &Path) - let delta_store = DeltaTableBuilder::from_uri(&context.root_uri()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); //println!("{:#?}",delta_store); @@ -103,7 +104,8 @@ async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> let context = IntegrationContext::new(integration)?; let delta_store = DeltaTableBuilder::from_uri(context.root_uri()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); put_get_delete_list(delta_store.as_ref()).await?; list_with_delimiter(delta_store.as_ref()).await?; @@ -482,7 +484,8 @@ async fn test_object_store_prefixes(integration: StorageIntegration) -> TestResu let rooturi = format!("{}/{}", context.root_uri(), prefix); let delta_store = DeltaTableBuilder::from_uri(&rooturi) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); let contents = Bytes::from("cats"); let path = Path::from("test"); diff --git a/crates/deltalake-core/tests/integration_read.rs b/crates/deltalake-core/tests/integration_read.rs index 0e17d34397..a15679a09c 100644 --- a/crates/deltalake-core/tests/integration_read.rs +++ b/crates/deltalake-core/tests/integration_read.rs @@ -155,7 +155,8 @@ async fn verify_store(integration: &IntegrationContext, root_path: &str) -> Test let table_uri = format!("{}/{}", integration.root_uri(), root_path); let storage = DeltaTableBuilder::from_uri(table_uri.clone()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); let files = storage.list_with_delimiter(None).await?; assert_eq!( diff --git a/crates/deltalake-core/tests/repair_s3_rename_test.rs b/crates/deltalake-core/tests/repair_s3_rename_test.rs index 3157fab896..ecfba0fbbe 100644 --- a/crates/deltalake-core/tests/repair_s3_rename_test.rs +++ b/crates/deltalake-core/tests/repair_s3_rename_test.rs @@ -117,7 +117,7 @@ fn create_s3_backend( .with_allow_http(true) .build_storage() .unwrap() - .storage_backend(); + .object_store(); let delayed_store = DelayedObjectStore { inner: store, diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 1f558b82fe..a8bfb6668a 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -51,7 +51,8 @@ impl DeltaFileSystemHandler { let storage = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(options.clone().unwrap_or_default()) .build_storage() - .map_err(PythonError::from)?; + .map_err(PythonError::from)? + .object_store(); Ok(Self { inner: storage, rt: Arc::new(rt()?), diff --git a/python/src/lib.rs b/python/src/lib.rs index 923a06d159..a58d1f9aa6 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -274,7 +274,7 @@ impl RawDeltaTable { retention_hours: Option, enforce_retention_duration: bool, ) -> PyResult> { - let mut cmd = VacuumBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = VacuumBuilder::new(self._table.log_store(), self._table.state.clone()) .with_enforce_retention_duration(enforce_retention_duration) .with_dry_run(dry_run); if let Some(retention_period) = retention_hours { @@ -296,7 +296,7 @@ impl RawDeltaTable { writer_properties: Option>, safe_cast: bool, ) -> PyResult { - let mut cmd = UpdateBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = UpdateBuilder::new(self._table.log_store(), self._table.state.clone()) .with_safe_cast(safe_cast); if let Some(writer_props) = writer_properties { @@ -349,7 +349,7 @@ impl RawDeltaTable { max_concurrent_tasks: Option, min_commit_interval: Option, ) -> PyResult { - let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = OptimizeBuilder::new(self._table.log_store(), self._table.state.clone()) .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)); if let Some(size) = target_size { cmd = cmd.with_target_size(size); @@ -379,7 +379,7 @@ impl RawDeltaTable { max_spill_size: usize, min_commit_interval: Option, ) -> PyResult { - let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = OptimizeBuilder::new(self._table.log_store(), self._table.state.clone()) .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)) .with_max_spill_size(max_spill_size) .with_type(OptimizeType::ZOrder(z_order_columns)); @@ -446,7 +446,7 @@ impl RawDeltaTable { let source_df = ctx.read_table(table_provider).unwrap(); let mut cmd = MergeBuilder::new( - self._table.object_store(), + self._table.log_store(), self._table.state.clone(), predicate, source_df, @@ -608,7 +608,7 @@ impl RawDeltaTable { ignore_missing_files: bool, protocol_downgrade_allowed: bool, ) -> PyResult { - let mut cmd = RestoreBuilder::new(self._table.object_store(), self._table.state.clone()); + let mut cmd = RestoreBuilder::new(self._table.log_store(), self._table.state.clone()); if let Some(val) = target { if let Ok(version) = val.extract::() { cmd = cmd.with_version_to_restore(version) @@ -822,7 +822,7 @@ impl RawDeltaTable { partition_by: Some(partition_by), predicate: None, }; - let store = self._table.object_store(); + let store = self._table.log_store(); rt()? .block_on(commit( @@ -866,7 +866,7 @@ impl RawDeltaTable { /// Run the delete command on the delta table: delete records following a predicate and return the delete metrics. #[pyo3(signature = (predicate = None))] pub fn delete(&mut self, predicate: Option) -> PyResult { - let mut cmd = DeleteBuilder::new(self._table.object_store(), self._table.state.clone()); + let mut cmd = DeleteBuilder::new(self._table.log_store(), self._table.state.clone()); if let Some(predicate) = predicate { cmd = cmd.with_predicate(predicate); } @@ -881,9 +881,8 @@ impl RawDeltaTable { /// have been deleted or are malformed #[pyo3(signature = (dry_run = true))] pub fn repair(&mut self, dry_run: bool) -> PyResult { - let cmd = - FileSystemCheckBuilder::new(self._table.object_store(), self._table.state.clone()) - .with_dry_run(dry_run); + let cmd = FileSystemCheckBuilder::new(self._table.log_store(), self._table.state.clone()) + .with_dry_run(dry_run); let (table, metrics) = rt()? .block_on(cmd.into_future())