diff --git a/crates/deltalake-core/Cargo.toml b/crates/deltalake-core/Cargo.toml index b3a6178203..9fa259fa39 100644 --- a/crates/deltalake-core/Cargo.toml +++ b/crates/deltalake-core/Cargo.toml @@ -117,7 +117,7 @@ sqlparser = { version = "0.38", optional = true } fs_extra = { version = "1.3.0", optional = true } tempdir = { version = "0", optional = true } -dynamodb_lock = { version = "0.6.0", default-features = false, optional = true } +dynamodb_lock = { version = "0.6", default-features = false, optional = true } [dev-dependencies] dotenvy = "0" diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 0147c250f9..38bf135739 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -71,8 +71,8 @@ use url::Url; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType}; +use crate::logstore::LogStoreRef; use crate::protocol::{self}; -use crate::storage::ObjectStoreRef; use crate::table::builder::ensure_table_uri; use crate::table::state::DeltaTableState; use crate::{open_table, open_table_with_storage_options, DeltaTable}; @@ -357,10 +357,10 @@ impl PruningStatistics for DeltaTable { // each delta table must register a specific object store, since paths are internally // handled relative to the table root. -pub(crate) fn register_store(store: ObjectStoreRef, env: Arc) { +pub(crate) fn register_store(store: LogStoreRef, env: Arc) { let object_store_url = store.object_store_url(); let url: &Url = object_store_url.as_ref(); - env.register_object_store(url, store); + env.register_object_store(url, store.object_store()); } pub(crate) fn logical_schema( @@ -467,7 +467,7 @@ pub struct DeltaScanConfig { #[derive(Debug)] pub(crate) struct DeltaScanBuilder<'a> { snapshot: &'a DeltaTableState, - object_store: ObjectStoreRef, + log_store: LogStoreRef, filter: Option, state: &'a SessionState, projection: Option<&'a Vec>, @@ -480,12 +480,12 @@ pub(crate) struct DeltaScanBuilder<'a> { impl<'a> DeltaScanBuilder<'a> { pub fn new( snapshot: &'a DeltaTableState, - object_store: ObjectStoreRef, + log_store: LogStoreRef, state: &'a SessionState, ) -> Self { DeltaScanBuilder { snapshot, - object_store, + log_store, filter: None, state, files: None, @@ -532,7 +532,7 @@ impl<'a> DeltaScanBuilder<'a> { Some(schema) => schema, None => { self.snapshot - .physical_arrow_schema(self.object_store.clone()) + .physical_arrow_schema(self.log_store.object_store()) .await? } }; @@ -632,7 +632,7 @@ impl<'a> DeltaScanBuilder<'a> { .create_physical_plan( self.state, FileScanConfig { - object_store_url: self.object_store.object_store_url(), + object_store_url: self.log_store.object_store_url(), file_schema, file_groups: file_groups.into_values().collect(), statistics: self.snapshot.datafusion_table_statistics(), @@ -647,9 +647,7 @@ impl<'a> DeltaScanBuilder<'a> { .await?; Ok(DeltaScan { - table_uri: ensure_table_uri(self.object_store.root_uri())? - .as_str() - .into(), + table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(), parquet_scan: scan, config, logical_schema, @@ -686,10 +684,10 @@ impl TableProvider for DeltaTable { filters: &[Expr], limit: Option, ) -> DataFusionResult> { - register_store(self.object_store(), session.runtime_env().clone()); + register_store(self.log_store(), session.runtime_env().clone()); let filter_expr = conjunction(filters.iter().cloned()); - let scan = DeltaScanBuilder::new(&self.state, self.object_store(), session) + let scan = DeltaScanBuilder::new(&self.state, self.log_store(), session) .with_projection(projection) .with_limit(limit) .with_filter(filter_expr) @@ -714,7 +712,7 @@ impl TableProvider for DeltaTable { /// A Delta table provider that enables additional metadata columns to be included during the scan pub struct DeltaTableProvider { snapshot: DeltaTableState, - store: ObjectStoreRef, + log_store: LogStoreRef, config: DeltaScanConfig, schema: Arc, } @@ -723,13 +721,13 @@ impl DeltaTableProvider { /// Build a DeltaTableProvider pub fn try_new( snapshot: DeltaTableState, - store: ObjectStoreRef, + log_store: LogStoreRef, config: DeltaScanConfig, ) -> DeltaResult { Ok(DeltaTableProvider { schema: logical_schema(&snapshot, &config)?, snapshot, - store, + log_store, config, }) } @@ -764,10 +762,10 @@ impl TableProvider for DeltaTableProvider { filters: &[Expr], limit: Option, ) -> DataFusionResult> { - register_store(self.store.clone(), session.runtime_env().clone()); + register_store(self.log_store.clone(), session.runtime_env().clone()); let filter_expr = conjunction(filters.iter().cloned()); - let scan = DeltaScanBuilder::new(&self.snapshot, self.store.clone(), session) + let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session) .with_projection(projection) .with_limit(limit) .with_filter(filter_expr) @@ -1462,7 +1460,7 @@ fn join_batches_with_add_actions( /// Determine which files contain a record that statisfies the predicate pub(crate) async fn find_files_scan<'a>( snapshot: &DeltaTableState, - store: ObjectStoreRef, + log_store: LogStoreRef, state: &SessionState, expression: Expr, ) -> DeltaResult> { @@ -1489,7 +1487,7 @@ pub(crate) async fn find_files_scan<'a>( // Add path column used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?); - let scan = DeltaScanBuilder::new(snapshot, store.clone(), state) + let scan = DeltaScanBuilder::new(snapshot, log_store, state) .with_filter(Some(expression.clone())) .with_projection(Some(&used_columns)) .with_scan_config(scan_config) @@ -1580,7 +1578,7 @@ pub(crate) async fn scan_memory_table( /// Finds files in a snapshot that match the provided predicate. pub async fn find_files<'a>( snapshot: &DeltaTableState, - object_store: ObjectStoreRef, + log_store: LogStoreRef, state: &SessionState, predicate: Option, ) -> DeltaResult { @@ -1608,8 +1606,7 @@ pub async fn find_files<'a>( }) } else { let candidates = - find_files_scan(snapshot, object_store.clone(), state, predicate.to_owned()) - .await?; + find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?; Ok(FindFiles { candidates, @@ -1924,7 +1921,8 @@ mod tests { .build(&table.state) .unwrap(); - let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap(); + let log_store = table.log_store(); + let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); @@ -1983,7 +1981,8 @@ mod tests { let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap(); - let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap(); + let log_store = table.log_store(); + let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); diff --git a/crates/deltalake-core/src/lib.rs b/crates/deltalake-core/src/lib.rs index d683b906dd..644da2dcac 100644 --- a/crates/deltalake-core/src/lib.rs +++ b/crates/deltalake-core/src/lib.rs @@ -85,6 +85,7 @@ compile_error!( pub mod data_catalog; pub mod errors; pub mod kernel; +pub mod logstore; pub mod operations; pub mod protocol; pub mod schema; diff --git a/crates/deltalake-core/src/logstore/default_logstore.rs b/crates/deltalake-core/src/logstore/default_logstore.rs new file mode 100644 index 0000000000..715d810535 --- /dev/null +++ b/crates/deltalake-core/src/logstore/default_logstore.rs @@ -0,0 +1,89 @@ +//! Default implementation of [`LogStore`] for storage backends with atomic put-if-absent operation + +use std::sync::Arc; + +use bytes::Bytes; +#[cfg(feature = "datafusion")] +use datafusion::execution::object_store::ObjectStoreUrl; +use object_store::{path::Path, ObjectStore}; +use url::Url; + +use super::{LogStore, LogStoreConfig}; +use crate::{ + operations::transaction::TransactionError, + storage::{ + config::{self, StorageOptions}, + ObjectStoreRef, + }, + DeltaResult, +}; + +/// Default [`LogStore`] implementation +#[derive(Debug, Clone)] +pub struct DefaultLogStore { + pub(crate) storage: Arc, + config: LogStoreConfig, +} + +impl DefaultLogStore { + /// Create a new instance of [`DefaultLogStore`] + /// + /// # Arguments + /// + /// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located). + /// * `location` - A url corresponding to the storage location of `storage`. + pub fn new(storage: ObjectStoreRef, config: LogStoreConfig) -> Self { + Self { storage, config } + } + + /// Create log store + pub fn try_new(location: Url, options: impl Into + Clone) -> DeltaResult { + let mut options = options.into(); + let storage = config::configure_store(&location, &mut options)?; + Ok(Self { + storage: Arc::new(storage), + config: LogStoreConfig { location, options }, + }) + } +} + +#[async_trait::async_trait] +impl LogStore for DefaultLogStore { + async fn read_commit_entry(&self, version: i64) -> DeltaResult { + super::read_commit_entry(self.storage.as_ref(), version).await + } + + /// Tries to commit a prepared commit file. Returns [`TransactionError`] + /// if the given `version` already exists. The caller should handle the retry logic itself. + /// This is low-level transaction API. If user does not want to maintain the commit loop then + /// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` + /// with retry logic. + async fn write_commit_entry( + &self, + version: i64, + tmp_commit: &Path, + ) -> Result<(), TransactionError> { + super::write_commit_entry(self.storage.as_ref(), version, tmp_commit).await + } + + async fn get_latest_version(&self, current_version: i64) -> DeltaResult { + super::get_latest_version(self, current_version).await + } + + fn object_store(&self) -> Arc { + self.storage.clone() + } + + fn to_uri(&self, location: &Path) -> String { + super::to_uri(&self.config.location, location) + } + + #[cfg(feature = "datafusion")] + fn object_store_url(&self) -> ObjectStoreUrl { + super::object_store_url(&self.config.location) + } + + fn config(&self) -> &LogStoreConfig { + &self.config + } +} diff --git a/crates/deltalake-core/src/logstore/mod.rs b/crates/deltalake-core/src/logstore/mod.rs new file mode 100644 index 0000000000..7f1009b1de --- /dev/null +++ b/crates/deltalake-core/src/logstore/mod.rs @@ -0,0 +1,325 @@ +//! Delta log store. +use futures::StreamExt; +use lazy_static::lazy_static; +use regex::Regex; +use serde::{ + de::{Error, SeqAccess, Visitor}, + ser::SerializeSeq, + Deserialize, Serialize, +}; +use std::{cmp::max, collections::HashMap, sync::Arc}; +use url::Url; + +use crate::{ + errors::DeltaResult, + operations::transaction::TransactionError, + protocol::{get_last_checkpoint, ProtocolError}, + storage::{commit_uri_from_version, config::StorageOptions}, + DeltaTableError, +}; +use bytes::Bytes; +use log::debug; +use object_store::{ + path::Path, Error as ObjectStoreError, ObjectStore, Result as ObjectStoreResult, +}; + +#[cfg(feature = "datafusion")] +use datafusion::datasource::object_store::ObjectStoreUrl; + +pub mod default_logstore; + +/// Sharable reference to [`LogStore`] +pub type LogStoreRef = Arc; + +lazy_static! { + static ref DELTA_LOG_PATH: Path = Path::from("_delta_log"); +} + +/// Configuration parameters for a log store +#[derive(Debug, Clone)] +pub struct LogStoreConfig { + /// url corresponding to the storage location. + pub location: Url, + /// Options used for configuring backend storage + pub options: StorageOptions, +} + +/// Trait for critical operations required to read and write commit entries in Delta logs. +/// +/// The correctness is predicated on the atomicity and durability guarantees of +/// the implementation of this interface. Specifically, +/// +/// - Atomic visibility: Any commit created via `write_commit_entry` must become visible atomically. +/// - Mutual exclusion: Only one writer must be able to create a commit for a specific version. +/// - Consistent listing: Once a commit entry for version `v` has been written, any future call to +/// `get_latest_version` must return a version >= `v`, i.e. the underlying file system entry must +/// become visible immediately. +#[async_trait::async_trait] +pub trait LogStore: Sync + Send { + /// Read data for commit entry with the given version. + async fn read_commit_entry(&self, version: i64) -> DeltaResult; + + /// Write list of actions as delta commit entry for given version. + /// + /// This operation can be retried with a higher version in case the write + /// fails with [`TransactionError::VersionAlreadyExists`]. + async fn write_commit_entry( + &self, + version: i64, + tmp_commit: &Path, + ) -> Result<(), TransactionError>; + + /// Find latest version currently stored in the delta log. + async fn get_latest_version(&self, start_version: i64) -> DeltaResult; + + /// Get underlying object store. + fn object_store(&self) -> Arc; + + /// [Path] to Delta log + fn to_uri(&self, location: &Path) -> String; + + /// Get fully qualified uri for table root + fn root_uri(&self) -> String { + self.to_uri(&Path::from("")) + } + + /// [Path] to Delta log + fn log_path(&self) -> &Path { + &DELTA_LOG_PATH + } + + /// Check if the location is a delta table location + async fn is_delta_table_location(&self) -> ObjectStoreResult { + // TODO We should really be using HEAD here, but this fails in windows tests + let object_store = self.object_store(); + let mut stream = object_store.list(Some(self.log_path())).await?; + if let Some(res) = stream.next().await { + match res { + Ok(_) => Ok(true), + Err(ObjectStoreError::NotFound { .. }) => Ok(false), + Err(err) => Err(err), + } + } else { + Ok(false) + } + } + + #[cfg(feature = "datafusion")] + /// Generate a unique enough url to identify the store in datafusion. + /// The DF object store registry only cares about the scheme and the host of the url for + /// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique + /// host we convert the location from this `LogStore` to a valid name, combining the + /// original scheme, host and path with invalid characters replaced. + fn object_store_url(&self) -> ObjectStoreUrl; + + /// Get configuration representing configured log store. + fn config(&self) -> &LogStoreConfig; +} + +// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders +impl std::fmt::Debug for dyn LogStore + '_ { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "LogStore({})", self.root_uri()) + } +} + +impl Serialize for LogStoreConfig { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(None)?; + seq.serialize_element(&self.location.to_string())?; + seq.serialize_element(&self.options.0)?; + seq.end() + } +} + +impl<'de> Deserialize<'de> for LogStoreConfig { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct LogStoreConfigVisitor {} + + impl<'de> Visitor<'de> for LogStoreConfigVisitor { + type Value = LogStoreConfig; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("struct LogStoreConfig") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: SeqAccess<'de>, + { + let location_str: String = seq + .next_element()? + .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let options: HashMap = seq + .next_element()? + .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let location = Url::parse(&location_str).unwrap(); + Ok(LogStoreConfig { + location, + options: options.into(), + }) + } + } + + deserializer.deserialize_seq(LogStoreConfigVisitor {}) + } +} + +lazy_static! { + static ref DELTA_LOG_REGEX: Regex = Regex::new(r"(\d{20})\.(json|checkpoint).*$").unwrap(); +} + +fn to_uri(root: &Url, location: &Path) -> String { + match root.scheme() { + "file" => { + #[cfg(windows)] + let uri = format!( + "{}/{}", + root.as_ref().trim_end_matches('/'), + location.as_ref() + ) + .replace("file:///", ""); + #[cfg(unix)] + let uri = format!( + "{}/{}", + root.as_ref().trim_end_matches('/'), + location.as_ref() + ) + .replace("file://", ""); + uri + } + _ => { + if location.as_ref().is_empty() || location.as_ref() == "/" { + root.as_ref().to_string() + } else { + format!("{}/{}", root.as_ref(), location.as_ref()) + } + } + } +} + +#[cfg(feature = "datafusion")] +fn object_store_url(location: &Url) -> ObjectStoreUrl { + // we are certain, that the URL can be parsed, since + // we make sure when we are parsing the table uri + + use object_store::path::DELIMITER; + ObjectStoreUrl::parse(format!( + "delta-rs://{}-{}{}", + location.scheme(), + location.host_str().unwrap_or("-"), + location.path().replace(DELIMITER, "-").replace(':', "-") + )) + .expect("Invalid object store url.") +} + +/// Extract version from a file name in the delta log +pub fn extract_version_from_filename(name: &str) -> Option { + DELTA_LOG_REGEX + .captures(name) + .map(|captures| captures.get(1).unwrap().as_str().parse().unwrap()) +} + +async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> DeltaResult { + let version_start = match get_last_checkpoint(log_store).await { + Ok(last_check_point) => last_check_point.version, + Err(ProtocolError::CheckpointNotFound) => { + // no checkpoint + -1 + } + Err(e) => { + return Err(DeltaTableError::from(e)); + } + }; + + debug!("latest checkpoint version: {version_start}"); + + let version_start = max(current_version, version_start); + + // list files to find max version + let version = async { + let mut max_version: i64 = version_start; + let prefix = Some(log_store.log_path()); + let offset_path = commit_uri_from_version(max_version); + let object_store = log_store.object_store(); + let mut files = object_store.list_with_offset(prefix, &offset_path).await?; + + while let Some(obj_meta) = files.next().await { + let obj_meta = obj_meta?; + if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) { + max_version = max(max_version, log_version); + // also cache timestamp for version, for faster time-travel + // TODO: temporarily disabled because `version_timestamp` is not available in the [`LogStore`] + // self.version_timestamp + // .insert(log_version, obj_meta.last_modified.timestamp()); + } + } + + if max_version < 0 { + return Err(DeltaTableError::not_a_table(log_store.root_uri())); + } + + Ok::(max_version) + } + .await?; + Ok(version) +} + +async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResult { + let commit_uri = commit_uri_from_version(version); + let data = storage.get(&commit_uri).await?.bytes().await?; + Ok(data) +} + +async fn write_commit_entry( + storage: &dyn ObjectStore, + version: i64, + tmp_commit: &Path, +) -> Result<(), TransactionError> { + // move temporary commit file to delta log directory + // rely on storage to fail if the file already exists - + storage + .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version)) + .await + .map_err(|err| -> TransactionError { + match err { + ObjectStoreError::AlreadyExists { .. } => { + TransactionError::VersionAlreadyExists(version) + } + _ => TransactionError::from(err), + } + })?; + Ok(()) +} + +#[cfg(feature = "datafusion")] +#[cfg(test)] +mod tests { + use url::Url; + + #[tokio::test] + async fn test_unique_object_store_url() { + for (location_1, location_2) in [ + // Same scheme, no host, different path + ("file:///path/to/table_1", "file:///path/to/table_2"), + // Different scheme/host, same path + ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"), + // Same scheme, different host, same path + ("s3://bucket_1/table_1", "s3://bucket_2/table_1"), + ] { + let url_1 = Url::parse(location_1).unwrap(); + let url_2 = Url::parse(location_2).unwrap(); + + assert_ne!( + super::object_store_url(&url_1).as_str(), + super::object_store_url(&url_2).as_str(), + ); + } + } +} diff --git a/crates/deltalake-core/src/operations/create.rs b/crates/deltalake-core/src/operations/create.rs index 1dc9fdf8b2..71398faf97 100644 --- a/crates/deltalake-core/src/operations/create.rs +++ b/crates/deltalake-core/src/operations/create.rs @@ -11,8 +11,8 @@ use super::transaction::commit; use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, DataType, Metadata, Protocol, StructField, StructType}; +use crate::logstore::{LogStore, LogStoreRef}; use crate::protocol::{DeltaOperation, SaveMode}; -use crate::storage::DeltaObjectStore; use crate::table::builder::ensure_table_uri; use crate::table::config::DeltaConfigKey; use crate::table::DeltaTableMetaData; @@ -55,7 +55,7 @@ pub struct CreateBuilder { partition_columns: Option>, storage_options: Option>, actions: Vec, - object_store: Option>, + log_store: Option, configuration: HashMap>, metadata: Option>, } @@ -78,7 +78,7 @@ impl CreateBuilder { partition_columns: None, storage_options: None, actions: Default::default(), - object_store: None, + log_store: None, configuration: Default::default(), metadata: Default::default(), } @@ -198,9 +198,9 @@ impl CreateBuilder { self } - /// Provide a [`DeltaObjectStore`] instance, that points at table location - pub fn with_object_store(mut self, object_store: Arc) -> Self { - self.object_store = Some(object_store); + /// Provide a [`LogStore`] instance, that points at table location + pub fn with_log_store(mut self, log_store: Arc) -> Self { + self.log_store = Some(log_store); self } @@ -219,12 +219,10 @@ impl CreateBuilder { return Err(CreateError::MissingSchema.into()); } - let (storage_url, table) = if let Some(object_store) = self.object_store { + let (storage_url, table) = if let Some(log_store) = self.log_store { ( - ensure_table_uri(object_store.root_uri())? - .as_str() - .to_string(), - DeltaTable::new(object_store, Default::default()), + ensure_table_uri(log_store.root_uri())?.as_str().to_string(), + DeltaTable::new(log_store, Default::default()), ) } else { let storage_url = ensure_table_uri(self.location.ok_or(CreateError::MissingLocation)?)?; @@ -293,7 +291,8 @@ impl std::future::IntoFuture for CreateBuilder { Box::pin(async move { let mode = this.mode.clone(); let (mut table, actions, operation) = this.into_table_and_actions()?; - let table_state = if table.object_store().is_delta_table_location().await? { + let log_store = table.log_store(); + let table_state = if log_store.is_delta_table_location().await? { match mode { SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()), SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()), @@ -311,7 +310,7 @@ impl std::future::IntoFuture for CreateBuilder { }; let version = commit( - table.object_store().as_ref(), + table.log_store.as_ref(), &actions, operation, table_state, @@ -443,11 +442,11 @@ mod tests { assert_eq!(table.version(), 0); let first_id = table.get_metadata().unwrap().id.clone(); - let object_store = table.object_store(); + let log_store = table.log_store; // Check an error is raised when a table exists at location let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store.clone()) .with_columns(schema.fields().clone()) .with_save_mode(SaveMode::ErrorIfExists) .await; @@ -455,7 +454,7 @@ mod tests { // Check current table is returned when ignore option is chosen. let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store.clone()) .with_columns(schema.fields().clone()) .with_save_mode(SaveMode::Ignore) .await @@ -464,7 +463,7 @@ mod tests { // Check table is overwritten let table = CreateBuilder::new() - .with_object_store(object_store.clone()) + .with_log_store(log_store) .with_columns(schema.fields().iter().cloned()) .with_save_mode(SaveMode::Overwrite) .await diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 98e4bd1ebe..bd361c9707 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -21,6 +21,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use crate::logstore::LogStoreRef; use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_plan::filter::FilterExec; @@ -40,7 +41,6 @@ use crate::kernel::{Action, Add, Remove}; use crate::operations::transaction::commit; use crate::operations::write::write_execution_plan; use crate::protocol::DeltaOperation; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -54,7 +54,7 @@ pub struct DeleteBuilder { /// A snapshot of the table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -84,11 +84,11 @@ pub struct DeleteMetrics { impl DeleteBuilder { /// Create a new [`DeleteBuilder`] - pub fn new(object_store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { predicate: None, snapshot, - store: object_store, + log_store, state: None, app_metadata: None, writer_properties: None, @@ -125,7 +125,7 @@ impl DeleteBuilder { async fn excute_non_empty_expr( snapshot: &DeltaTableState, - object_store: ObjectStoreRef, + log_store: LogStoreRef, state: &SessionState, expression: &Expr, metrics: &mut DeleteMetrics, @@ -144,7 +144,7 @@ async fn excute_non_empty_expr( .partition_columns .clone(); - let scan = DeltaScanBuilder::new(snapshot, object_store.clone(), state) + let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state) .with_files(rewrite) .build() .await?; @@ -167,7 +167,7 @@ async fn excute_non_empty_expr( state.clone(), filter.clone(), table_partition_cols.clone(), - object_store.clone(), + log_store.object_store(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, @@ -187,7 +187,7 @@ async fn excute_non_empty_expr( async fn execute( predicate: Option, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -197,7 +197,7 @@ async fn execute( let mut metrics = DeleteMetrics::default(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, object_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -208,7 +208,7 @@ async fn execute( let write_start = Instant::now(); let add = excute_non_empty_expr( snapshot, - object_store.clone(), + log_store.clone(), &state, &predicate, &mut metrics, @@ -254,7 +254,7 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -278,7 +278,7 @@ impl std::future::IntoFuture for DeleteBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.store.clone(), session.runtime_env()); + register_store(this.log_store.clone(), session.runtime_env()); session.state() }); @@ -295,7 +295,7 @@ impl std::future::IntoFuture for DeleteBuilder { let ((actions, version), metrics) = execute( predicate, - this.store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -305,7 +305,7 @@ impl std::future::IntoFuture for DeleteBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/filesystem_check.rs b/crates/deltalake-core/src/operations/filesystem_check.rs index 31b261c4cb..b79f22b1f4 100644 --- a/crates/deltalake-core/src/operations/filesystem_check.rs +++ b/crates/deltalake-core/src/operations/filesystem_check.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::fmt::Debug; -use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -27,9 +26,9 @@ use url::{ParseError, Url}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Add, Remove}; +use crate::logstore::LogStoreRef; use crate::operations::transaction::commit; use crate::protocol::DeltaOperation; -use crate::storage::DeltaObjectStore; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -40,7 +39,7 @@ pub struct FileSystemCheckBuilder { /// A snapshot of the to-be-checked table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Don't remove actions to the table log. Just determine which files can be removed dry_run: bool, } @@ -56,7 +55,7 @@ pub struct FileSystemCheckMetrics { struct FileSystemCheckPlan { /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Files that no longer exists in undlying ObjectStore but have active add actions pub files_to_remove: Vec, } @@ -74,10 +73,10 @@ fn is_absolute_path(path: &str) -> DeltaResult { impl FileSystemCheckBuilder { /// Create a new [`FileSystemCheckBuilder`] - pub fn new(store: Arc, state: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, state: DeltaTableState) -> Self { FileSystemCheckBuilder { snapshot: state, - store, + log_store, dry_run: false, } } @@ -91,7 +90,7 @@ impl FileSystemCheckBuilder { async fn create_fsck_plan(&self) -> DeltaResult { let mut files_relative: HashMap<&str, &Add> = HashMap::with_capacity(self.snapshot.files().len()); - let store = self.store.clone(); + let log_store = self.log_store.clone(); for active in self.snapshot.files() { if is_absolute_path(&active.path)? { @@ -103,7 +102,8 @@ impl FileSystemCheckBuilder { } } - let mut files = self.store.list(None).await?; + let object_store = log_store.object_store(); + let mut files = object_store.list(None).await?; while let Some(result) = files.next().await { let file = result?; files_relative.remove(file.location.as_ref()); @@ -120,7 +120,7 @@ impl FileSystemCheckBuilder { Ok(FileSystemCheckPlan { files_to_remove, - store, + log_store, }) } } @@ -156,7 +156,7 @@ impl FileSystemCheckPlan { } commit( - self.store.as_ref(), + self.log_store.as_ref(), &actions, DeltaOperation::FileSystemCheck {}, snapshot, @@ -183,7 +183,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { let plan = this.create_fsck_plan().await?; if this.dry_run { return Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), FileSystemCheckMetrics { files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(), dry_run: true, @@ -192,7 +192,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { } let metrics = plan.execute(&this.snapshot).await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/load.rs b/crates/deltalake-core/src/operations/load.rs index 7baa59e3e1..1a4c5c4cc6 100644 --- a/crates/deltalake-core/src/operations/load.rs +++ b/crates/deltalake-core/src/operations/load.rs @@ -7,7 +7,7 @@ use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use futures::future::BoxFuture; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::storage::DeltaObjectStore; +use crate::logstore::LogStoreRef; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -16,17 +16,17 @@ pub struct LoadBuilder { /// A snapshot of the to-be-loaded table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// A sub-selection of columns to be loaded columns: Option>, } impl LoadBuilder { /// Create a new [`LoadBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, columns: None, } } @@ -46,7 +46,7 @@ impl std::future::IntoFuture for LoadBuilder { let this = self; Box::pin(async move { - let table = DeltaTable::new_with_state(this.store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); let schema = table.state.arrow_schema()?; let projection = this .columns diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index 57621cb316..d38ddf0efb 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -68,10 +68,10 @@ use super::transaction::commit; use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression}; use crate::delta_datafusion::{register_store, DeltaScanBuilder}; use crate::kernel::{Action, Remove}; +use crate::logstore::LogStoreRef; use crate::operations::datafusion_utils::MetricObserverExec; use crate::operations::write::write_execution_plan; use crate::protocol::{DeltaOperation, MergePredicate}; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableError}; @@ -107,7 +107,7 @@ pub struct MergeBuilder { /// The source data source: DataFrame, /// Delta object store for handling data files - object_store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -122,7 +122,7 @@ pub struct MergeBuilder { impl MergeBuilder { /// Create a new [`MergeBuilder`] pub fn new>( - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: DeltaTableState, predicate: E, source: DataFrame, @@ -132,7 +132,7 @@ impl MergeBuilder { predicate, source, snapshot, - object_store, + log_store, source_alias: None, target_alias: None, state: None, @@ -561,7 +561,7 @@ pub struct MergeMetrics { async fn execute( predicate: Expression, source: DataFrame, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -590,7 +590,7 @@ async fn execute( // predicates also need to be considered when pruning let target = Arc::new( - DeltaScanBuilder::new(snapshot, object_store.clone(), &state) + DeltaScanBuilder::new(snapshot, log_store.clone(), &state) .with_schema(snapshot.input_schema()?) .build() .await?, @@ -1126,7 +1126,7 @@ async fn execute( state.clone(), projection.clone(), table_partition_cols.clone(), - object_store.clone(), + log_store.object_store().clone(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, @@ -1188,7 +1188,7 @@ async fn execute( not_matched_by_source_predicates: not_match_source_operations, }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -1212,7 +1212,7 @@ impl std::future::IntoFuture for MergeBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.object_store.clone(), session.runtime_env()); + register_store(this.log_store.clone(), session.runtime_env()); session.state() }); @@ -1220,7 +1220,7 @@ impl std::future::IntoFuture for MergeBuilder { let ((actions, version), metrics) = execute( this.predicate, this.source, - this.object_store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -1236,7 +1236,7 @@ impl std::future::IntoFuture for MergeBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.object_store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/mod.rs b/crates/deltalake-core/src/operations/mod.rs index 35301f067e..0406272a5b 100644 --- a/crates/deltalake-core/src/operations/mod.rs +++ b/crates/deltalake-core/src/operations/mod.rs @@ -107,60 +107,60 @@ impl DeltaOps { /// ``` #[must_use] pub fn create(self) -> CreateBuilder { - CreateBuilder::default().with_object_store(self.0.object_store()) + CreateBuilder::default().with_log_store(self.0.log_store) } /// Load data from a DeltaTable #[cfg(feature = "datafusion")] #[must_use] pub fn load(self) -> LoadBuilder { - LoadBuilder::new(self.0.object_store(), self.0.state) + LoadBuilder::new(self.0.log_store, self.0.state) } /// Write data to Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn write(self, batches: impl IntoIterator) -> WriteBuilder { - WriteBuilder::new(self.0.object_store(), self.0.state).with_input_batches(batches) + WriteBuilder::new(self.0.log_store, self.0.state).with_input_batches(batches) } /// Vacuum stale files from delta table #[must_use] pub fn vacuum(self) -> VacuumBuilder { - VacuumBuilder::new(self.0.object_store(), self.0.state) + VacuumBuilder::new(self.0.log_store, self.0.state) } /// Audit active files with files present on the filesystem #[must_use] pub fn filesystem_check(self) -> FileSystemCheckBuilder { - FileSystemCheckBuilder::new(self.0.object_store(), self.0.state) + FileSystemCheckBuilder::new(self.0.log_store, self.0.state) } /// Audit active files with files present on the filesystem #[cfg(all(feature = "arrow", feature = "parquet"))] #[must_use] pub fn optimize<'a>(self) -> OptimizeBuilder<'a> { - OptimizeBuilder::new(self.0.object_store(), self.0.state) + OptimizeBuilder::new(self.0.log_store, self.0.state) } /// Delete data from Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn delete(self) -> DeleteBuilder { - DeleteBuilder::new(self.0.object_store(), self.0.state) + DeleteBuilder::new(self.0.log_store, self.0.state) } /// Update data from Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn update(self) -> UpdateBuilder { - UpdateBuilder::new(self.0.object_store(), self.0.state) + UpdateBuilder::new(self.0.log_store, self.0.state) } /// Restore delta table to a specified version or datetime #[must_use] pub fn restore(self) -> RestoreBuilder { - RestoreBuilder::new(self.0.object_store(), self.0.state) + RestoreBuilder::new(self.0.log_store, self.0.state) } /// Update data from Delta table @@ -171,12 +171,7 @@ impl DeltaOps { source: datafusion::prelude::DataFrame, predicate: E, ) -> MergeBuilder { - MergeBuilder::new( - self.0.object_store(), - self.0.state, - predicate.into(), - source, - ) + MergeBuilder::new(self.0.log_store, self.0.state, predicate.into(), source) } } diff --git a/crates/deltalake-core/src/operations/optimize.rs b/crates/deltalake-core/src/operations/optimize.rs index 7feecd1e56..0467d43a8b 100644 --- a/crates/deltalake-core/src/operations/optimize.rs +++ b/crates/deltalake-core/src/operations/optimize.rs @@ -42,6 +42,7 @@ use super::transaction::commit; use super::writer::{PartitionWriter, PartitionWriterConfig}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Remove}; +use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; @@ -155,7 +156,7 @@ pub struct OptimizeBuilder<'a> { /// A snapshot of the to-be-optimized table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: ObjectStoreRef, + log_store: LogStoreRef, /// Filters to select specific table partitions to be optimized filters: &'a [PartitionFilter], /// Desired file size after bin-packing files @@ -177,10 +178,10 @@ pub struct OptimizeBuilder<'a> { impl<'a> OptimizeBuilder<'a> { /// Create a new [`OptimizeBuilder`] - pub fn new(store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, filters: &[], target_size: None, writer_properties: None, @@ -274,14 +275,14 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { )?; let metrics = plan .execute( - this.store.clone(), + this.log_store.clone(), &this.snapshot, this.max_concurrent_tasks, this.max_spill_size, this.min_commit_interval, ) .await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) @@ -584,7 +585,7 @@ impl MergePlan { /// Perform the operations outlined in the plan. pub async fn execute( mut self, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, max_concurrent_tasks: usize, #[allow(unused_variables)] // used behind a feature flag @@ -607,7 +608,7 @@ impl MergePlan { for file in files.iter() { debug!(" file {}", file.location); } - let object_store_ref = object_store.clone(); + let object_store_ref = log_store.object_store().clone(); let batch_stream = futures::stream::iter(files.clone()) .then(move |file| { let object_store_ref = object_store_ref.clone(); @@ -625,7 +626,7 @@ impl MergePlan { self.task_parameters.clone(), partition, files, - object_store.clone(), + log_store.object_store().clone(), futures::future::ready(Ok(batch_stream)), )); util::flatten_join_error(rewrite_result) @@ -635,13 +636,15 @@ impl MergePlan { #[cfg(not(feature = "datafusion"))] let exec_context = Arc::new(zorder::ZOrderExecContext::new( zorder_columns, - object_store.clone(), + log_store.object_store().clone(), // If there aren't enough bins to use all threads, then instead // use threads within the bins. This is important for the case where // the table is un-partitioned, in which case the entire table is just // one big bin. bins.len() <= num_cpus::get(), )); + let object_store = log_store.object_store().clone(); + #[cfg(feature = "datafusion")] let exec_context = Arc::new(zorder::ZOrderExecContext::new( zorder_columns, @@ -649,7 +652,6 @@ impl MergePlan { max_spill_size, )?); let task_parameters = self.task_parameters.clone(); - let object_store = object_store.clone(); futures::stream::iter(bins) .map(move |(partition, files)| { let batch_stream = Self::read_zorder(files.clone(), exec_context.clone()); @@ -671,7 +673,7 @@ impl MergePlan { let mut stream = stream.buffer_unordered(max_concurrent_tasks); - let mut table = DeltaTable::new_with_state(object_store.clone(), snapshot.clone()); + let mut table = DeltaTable::new_with_state(log_store.clone(), snapshot.clone()); // Actions buffered so far. These will be flushed either at the end // or when we reach the commit interval. @@ -720,7 +722,7 @@ impl MergePlan { //// TODO: Check for remove actions on optimized partitions. If a //// optimized partition was updated then abort the commit. Requires (#593). commit( - table.object_store().as_ref(), + table.log_store.as_ref(), &actions, self.task_parameters.input_parameters.clone().into(), table.get_state(), diff --git a/crates/deltalake-core/src/operations/restore.rs b/crates/deltalake-core/src/operations/restore.rs index a356b5b312..c391de6f04 100644 --- a/crates/deltalake-core/src/operations/restore.rs +++ b/crates/deltalake-core/src/operations/restore.rs @@ -10,7 +10,8 @@ //! 5) If ignore_missing_files option is false (default value) check availability of AddFile //! in file system. //! 6) Commit Protocol, all RemoveFile and AddFile actions -//! into delta log using `try_commit_transaction` (commit will be failed in case of parallel transaction) +//! into delta log using `LogStore::write_commit_entry` (commit will be failed in case of parallel transaction) +//! TODO: comment is outdated //! 7) If table was modified in parallel then ignore restore and raise exception. //! //! # Example @@ -31,9 +32,9 @@ use object_store::ObjectStore; use serde::Serialize; use crate::kernel::{Action, Add, Protocol, Remove}; -use crate::operations::transaction::{prepare_commit, try_commit_transaction, TransactionError}; +use crate::logstore::LogStoreRef; +use crate::operations::transaction::{prepare_commit, TransactionError}; use crate::protocol::DeltaOperation; -use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError}; @@ -74,7 +75,7 @@ pub struct RestoreBuilder { /// A snapshot of the to-be-restored table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: ObjectStoreRef, + log_store: LogStoreRef, /// Version to restore version_to_restore: Option, /// Datetime to restore @@ -87,10 +88,10 @@ pub struct RestoreBuilder { impl RestoreBuilder { /// Create a new [`RestoreBuilder`] - pub fn new(store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, version_to_restore: None, datetime_to_restore: None, ignore_missing_files: false, @@ -125,7 +126,7 @@ impl RestoreBuilder { } async fn execute( - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: DeltaTableState, version_to_restore: Option, datetime_to_restore: Option>, @@ -138,7 +139,7 @@ async fn execute( { return Err(DeltaTableError::from(RestoreError::InvalidRestoreParameter)); } - let mut table = DeltaTable::new(object_store.clone(), DeltaTableConfig::default()); + let mut table = DeltaTable::new(log_store.clone(), DeltaTableConfig::default()); let version = match datetime_to_restore { Some(datetime) => { table.load_with_datetime(datetime).await?; @@ -195,7 +196,7 @@ async fn execute( .collect(); if !ignore_missing_files { - check_files_available(object_store.as_ref(), &files_to_add).await?; + check_files_available(log_store.object_store().as_ref(), &files_to_add).await?; } let metrics = RestoreMetrics { @@ -238,7 +239,7 @@ async fn execute( actions.extend(files_to_remove.into_iter().map(Action::Remove)); let commit = prepare_commit( - object_store.as_ref(), + log_store.object_store().as_ref(), &DeltaOperation::Restore { version: version_to_restore, datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }), @@ -249,13 +250,13 @@ async fn execute( ) .await?; let commit_version = snapshot.version() + 1; - match try_commit_transaction(object_store.as_ref(), &commit, commit_version).await { + match log_store.write_commit_entry(commit_version, &commit).await { Ok(_) => {} Err(err @ TransactionError::VersionAlreadyExists(_)) => { return Err(err.into()); } Err(err) => { - object_store.delete(&commit).await?; + log_store.object_store().delete(&commit).await?; return Err(err.into()); } } @@ -291,7 +292,7 @@ impl std::future::IntoFuture for RestoreBuilder { Box::pin(async move { let metrics = execute( - this.store.clone(), + this.log_store.clone(), this.snapshot.clone(), this.version_to_restore, this.datetime_to_restore, @@ -299,7 +300,7 @@ impl std::future::IntoFuture for RestoreBuilder { this.protocol_downgrade_allowed, ) .await?; - let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); table.update().await?; Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/transaction/mod.rs b/crates/deltalake-core/src/operations/transaction/mod.rs index c31c349fd7..e5e808d2d5 100644 --- a/crates/deltalake-core/src/operations/transaction/mod.rs +++ b/crates/deltalake-core/src/operations/transaction/mod.rs @@ -10,8 +10,8 @@ use serde_json::Value; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, CommitInfo}; +use crate::logstore::LogStore; use crate::protocol::DeltaOperation; -use crate::storage::commit_uri_from_version; use crate::table::state::DeltaTableState; mod conflict_checker; @@ -130,6 +130,7 @@ pub(crate) fn get_commit_bytes( /// Low-level transaction API. Creates a temporary commit file. Once created, /// the transaction object could be dropped and the actual commit could be executed /// with `DeltaTable.try_commit_transaction`. +/// TODO: comment is outdated now pub(crate) async fn prepare_commit<'a>( storage: &dyn ObjectStore, operation: &DeltaOperation, @@ -150,42 +151,26 @@ pub(crate) async fn prepare_commit<'a>( Ok(path) } -/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists] -/// if the given `version` already exists. The caller should handle the retry logic itself. -/// This is low-level transaction API. If user does not want to maintain the commit loop then -/// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` -/// with retry logic. -pub(crate) async fn try_commit_transaction( - storage: &dyn ObjectStore, - tmp_commit: &Path, - version: i64, -) -> Result { - // move temporary commit file to delta log directory - // rely on storage to fail if the file already exists - - storage - .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version)) - .await - .map_err(|err| match err { - ObjectStoreError::AlreadyExists { .. } => { - TransactionError::VersionAlreadyExists(version) - } - _ => TransactionError::from(err), - })?; - Ok(version) -} - /// Commit a transaction, with up to 15 retries. This is higher-level transaction API. /// /// Will error early if the a concurrent transaction has already been committed /// and conflicts with this transaction. pub async fn commit( - storage: &dyn ObjectStore, + log_store: &dyn LogStore, actions: &Vec, operation: DeltaOperation, read_snapshot: &DeltaTableState, app_metadata: Option>, ) -> DeltaResult { - commit_with_retries(storage, actions, operation, read_snapshot, app_metadata, 15).await + commit_with_retries( + log_store, + actions, + operation, + read_snapshot, + app_metadata, + 15, + ) + .await } /// Commit a transaction, with up configurable number of retries. This is higher-level transaction API. @@ -193,24 +178,35 @@ pub async fn commit( /// The function will error early if the a concurrent transaction has already been committed /// and conflicts with this transaction. pub async fn commit_with_retries( - storage: &dyn ObjectStore, + log_store: &dyn LogStore, actions: &Vec, operation: DeltaOperation, read_snapshot: &DeltaTableState, app_metadata: Option>, max_retries: usize, ) -> DeltaResult { - let tmp_commit = - prepare_commit(storage, &operation, actions, read_snapshot, app_metadata).await?; + let tmp_commit = prepare_commit( + log_store.object_store().as_ref(), + &operation, + actions, + read_snapshot, + app_metadata, + ) + .await?; let mut attempt_number = 1; while attempt_number <= max_retries { let version = read_snapshot.version() + attempt_number as i64; - match try_commit_transaction(storage, &tmp_commit, version).await { - Ok(version) => return Ok(version), + match log_store.write_commit_entry(version, &tmp_commit).await { + Ok(()) => return Ok(version), Err(TransactionError::VersionAlreadyExists(version)) => { - let summary = WinningCommitSummary::try_new(storage, version - 1, version).await?; + let summary = WinningCommitSummary::try_new( + log_store.object_store().as_ref(), + version - 1, + version, + ) + .await?; let transaction_info = TransactionInfo::try_new( read_snapshot, operation.read_predicate(), @@ -225,13 +221,13 @@ pub async fn commit_with_retries( attempt_number += 1; } Err(err) => { - storage.delete(&tmp_commit).await?; + log_store.object_store().delete(&tmp_commit).await?; return Err(TransactionError::CommitConflict(err).into()); } }; } Err(err) => { - storage.delete(&tmp_commit).await?; + log_store.object_store().delete(&tmp_commit).await?; return Err(err.into()); } } @@ -242,11 +238,16 @@ pub async fn commit_with_retries( #[cfg(all(test, feature = "parquet"))] mod tests { + use std::{collections::HashMap, sync::Arc}; + use self::test_utils::{create_remove_action, init_table_actions}; use super::*; - use crate::DeltaConfigKey; + use crate::{ + logstore::default_logstore::DefaultLogStore, storage::commit_uri_from_version, + DeltaConfigKey, + }; use object_store::memory::InMemory; - use std::collections::HashMap; + use url::Url; #[test] fn test_commit_uri_from_version() { @@ -290,18 +291,25 @@ mod tests { #[tokio::test] async fn test_try_commit_transaction() { - let store = InMemory::new(); + let store = Arc::new(InMemory::new()); + let url = Url::parse("mem://what/is/this").unwrap(); + let log_store = DefaultLogStore::new( + store.clone(), + crate::logstore::LogStoreConfig { + location: url, + options: HashMap::new().into(), + }, + ); let tmp_path = Path::from("_delta_log/tmp"); let version_path = Path::from("_delta_log/00000000000000000000.json"); store.put(&tmp_path, bytes::Bytes::new()).await.unwrap(); store.put(&version_path, bytes::Bytes::new()).await.unwrap(); + let res = log_store.write_commit_entry(0, &tmp_path).await; // fails if file version already exists - let res = try_commit_transaction(&store, &tmp_path, 0).await; assert!(res.is_err()); // succeeds for next version - let res = try_commit_transaction(&store, &tmp_path, 1).await.unwrap(); - assert_eq!(res, 1); + log_store.write_commit_entry(1, &tmp_path).await.unwrap(); } } diff --git a/crates/deltalake-core/src/operations/transaction/test_utils.rs b/crates/deltalake-core/src/operations/transaction/test_utils.rs index b52b1a1c7b..56b0894019 100644 --- a/crates/deltalake-core/src/operations/transaction/test_utils.rs +++ b/crates/deltalake-core/src/operations/transaction/test_utils.rs @@ -1,7 +1,7 @@ #![allow(unused)] use std::collections::HashMap; -use super::{prepare_commit, try_commit_transaction}; +use super::prepare_commit; use crate::kernel::{ Action, Add, CommitInfo, DataType, Metadata, PrimitiveType, Protocol, Remove, StructField, StructType, @@ -121,7 +121,7 @@ pub async fn create_initialized_table( partition_cols: &[String], configuration: Option>>, ) -> DeltaTable { - let storage = DeltaTableBuilder::from_uri("memory://") + let log_store = DeltaTableBuilder::from_uri("memory://") .build_storage() .unwrap(); let table_schema = StructType::new(vec![ @@ -161,11 +161,19 @@ pub async fn create_initialized_table( ), }; let actions = init_table_actions(None); - let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, &state, None) - .await - .unwrap(); - try_commit_transaction(storage.as_ref(), &prepared_commit, 0) + let prepared_commit = prepare_commit( + log_store.object_store().as_ref(), + &operation, + &actions, + &state, + None, + ) + .await + .unwrap(); + + log_store + .write_commit_entry(0, &prepared_commit) .await .unwrap(); - DeltaTable::new_with_state(storage, state) + DeltaTable::new_with_state(log_store, state) } diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index bc458bdd53..559f28868d 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -49,8 +49,8 @@ use super::write::write_execution_plan; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; use crate::kernel::{Action, Remove}; +use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableError}; @@ -64,7 +64,7 @@ pub struct UpdateBuilder { /// A snapshot of the table's state snapshot: DeltaTableState, /// Delta object store for handling data files - object_store: Arc, + log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan state: Option, /// Properties passed to underlying parquet writer for when files are rewritten @@ -95,12 +95,12 @@ pub struct UpdateMetrics { impl UpdateBuilder { /// Create a new ['UpdateBuilder'] - pub fn new(object_store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { predicate: None, updates: HashMap::new(), snapshot, - object_store, + log_store, state: None, writer_properties: None, app_metadata: None, @@ -164,7 +164,7 @@ impl UpdateBuilder { async fn execute( predicate: Option, updates: HashMap, - object_store: ObjectStoreRef, + log_store: LogStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, @@ -213,7 +213,7 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns.clone(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, object_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; if candidates.candidates.is_empty() { @@ -225,7 +225,7 @@ async fn execute( let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches - let scan = DeltaScanBuilder::new(snapshot, object_store.clone(), &state) + let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state) .with_files(&candidates.candidates) .build() .await?; @@ -357,7 +357,7 @@ async fn execute( state.clone(), projection.clone(), table_partition_cols.clone(), - object_store.clone(), + log_store.object_store().clone(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, @@ -407,7 +407,7 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; version = commit( - object_store.as_ref(), + log_store.as_ref(), &actions, operation, snapshot, @@ -430,7 +430,7 @@ impl std::future::IntoFuture for UpdateBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.object_store.clone(), session.runtime_env()); + register_store(this.log_store.clone(), session.runtime_env()); session.state() }); @@ -438,7 +438,7 @@ impl std::future::IntoFuture for UpdateBuilder { let ((actions, version), metrics) = execute( this.predicate, this.updates, - this.object_store.clone(), + this.log_store.clone(), &this.snapshot, state, this.writer_properties, @@ -449,7 +449,7 @@ impl std::future::IntoFuture for UpdateBuilder { this.snapshot .merge(DeltaTableState::from_actions(actions, version)?, true, true); - let table = DeltaTable::new_with_state(this.object_store, this.snapshot); + let table = DeltaTable::new_with_state(this.log_store, this.snapshot); Ok((table, metrics)) }) diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs index 47f7c1d5c9..efdde55347 100644 --- a/crates/deltalake-core/src/operations/vacuum.rs +++ b/crates/deltalake-core/src/operations/vacuum.rs @@ -37,8 +37,8 @@ use super::transaction::commit; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::Action; +use crate::logstore::{LogStore, LogStoreRef}; use crate::protocol::DeltaOperation; -use crate::storage::DeltaObjectStore; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -82,7 +82,7 @@ pub struct VacuumBuilder { /// A snapshot of the to-be-vacuumed table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// Period of stale files allowed. retention_period: Option, /// Validate the retention period is not below the retention period configured in the table @@ -125,10 +125,10 @@ pub struct VacuumEndOperationMetrics { /// Methods to specify various vacuum options and to execute the operation impl VacuumBuilder { /// Create a new [`VacuumBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { VacuumBuilder { snapshot, - store, + log_store, retention_period: None, enforce_retention_duration: true, dry_run: false, @@ -184,8 +184,11 @@ impl VacuumBuilder { let mut files_to_delete = vec![]; let mut file_sizes = vec![]; - let mut all_files = self.store.list(None).await.map_err(DeltaTableError::from)?; - + let object_store = self.log_store.object_store(); + let mut all_files = object_store + .list(None) + .await + .map_err(DeltaTableError::from)?; let partition_columns = &self .snapshot .current_metadata() @@ -227,7 +230,7 @@ impl std::future::IntoFuture for VacuumBuilder { let plan = this.create_vacuum_plan().await?; if this.dry_run { return Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), VacuumMetrics { files_deleted: plan.files_to_delete.iter().map(|f| f.to_string()).collect(), dry_run: true, @@ -235,9 +238,11 @@ impl std::future::IntoFuture for VacuumBuilder { )); } - let metrics = plan.execute(&this.store, &this.snapshot).await?; + let metrics = plan + .execute(this.log_store.as_ref(), &this.snapshot) + .await?; Ok(( - DeltaTable::new_with_state(this.store, this.snapshot), + DeltaTable::new_with_state(this.log_store, this.snapshot), metrics, )) }) @@ -262,7 +267,7 @@ impl VacuumPlan { /// Execute the vacuum plan and delete files from underlying storage pub async fn execute( self, - store: &DeltaObjectStore, + store: &dyn LogStore, snapshot: &DeltaTableState, ) -> Result { if self.files_to_delete.is_empty() { @@ -311,6 +316,7 @@ impl VacuumPlan { .boxed(); let files_deleted = store + .object_store() .delete_stream(locations) .map(|res| match res { Ok(path) => Ok(path.to_string()), @@ -395,7 +401,7 @@ mod tests { async fn vacuum_delta_8_0_table() { let table = open_table("./tests/data/delta-0.8.0").await.unwrap(); - let result = VacuumBuilder::new(table.object_store(), table.state.clone()) + let result = VacuumBuilder::new(table.log_store, table.state.clone()) .with_retention_period(Duration::hours(1)) .with_dry_run(true) .await; @@ -403,7 +409,7 @@ mod tests { assert!(result.is_err()); let table = open_table("./tests/data/delta-0.8.0").await.unwrap(); - let (table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(0)) .with_dry_run(true) .with_enforce_retention_duration(false) @@ -415,7 +421,7 @@ mod tests { vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"] ); - let (table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(169)) .with_dry_run(true) .await @@ -432,7 +438,7 @@ mod tests { .as_secs() / 3600; let empty: Vec = Vec::new(); - let (_table, result) = VacuumBuilder::new(table.object_store(), table.state) + let (_table, result) = VacuumBuilder::new(table.log_store, table.state) .with_retention_period(Duration::hours(retention_hours as i64)) .with_dry_run(true) .await diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index 45bdaaeff5..dec4b7ced7 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -44,8 +44,9 @@ use super::{transaction::commit, CreateBuilder}; use crate::delta_datafusion::DeltaDataChecker; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Add, Remove, StructType}; +use crate::logstore::LogStoreRef; use crate::protocol::{DeltaOperation, SaveMode}; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; +use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::writer::record_batch::divide_by_partition_values; use crate::writer::utils::PartitionPath; @@ -90,7 +91,7 @@ pub struct WriteBuilder { /// A snapshot of the to-be-loaded table's state snapshot: DeltaTableState, /// Delta object store for handling data files - store: Arc, + log_store: LogStoreRef, /// The input plan input: Option>, /// Datafusion session state relevant for executing the input plan @@ -117,10 +118,10 @@ pub struct WriteBuilder { impl WriteBuilder { /// Create a new [`WriteBuilder`] - pub fn new(store: Arc, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { Self { snapshot, - store, + log_store, input: None, state: None, mode: SaveMode::Append, @@ -210,7 +211,7 @@ impl WriteBuilder { } async fn check_preconditions(&self) -> DeltaResult> { - match self.store.is_delta_table_location().await? { + match self.log_store.is_delta_table_location().await? { true => { let min_writer = self.snapshot.min_writer_version(); if min_writer > MAX_SUPPORTED_WRITER_VERSION { @@ -218,7 +219,7 @@ impl WriteBuilder { } else { match self.mode { SaveMode::ErrorIfExists => { - Err(WriteError::AlreadyExists(self.store.root_uri()).into()) + Err(WriteError::AlreadyExists(self.log_store.root_uri()).into()) } _ => Ok(vec![]), } @@ -236,7 +237,7 @@ impl WriteBuilder { Err(WriteError::MissingData) }?; let mut builder = CreateBuilder::new() - .with_object_store(self.store.clone()) + .with_log_store(self.log_store.clone()) .with_columns(schema.fields().clone()); if let Some(partition_columns) = self.partition_columns.as_ref() { builder = builder.with_partition_columns(partition_columns.clone()) @@ -356,7 +357,7 @@ impl std::future::IntoFuture for WriteBuilder { let schema = batches[0].schema(); let table_schema = this .snapshot - .physical_arrow_schema(this.store.clone()) + .physical_arrow_schema(this.log_store.object_store().clone()) .await .or_else(|_| this.snapshot.arrow_schema()) .unwrap_or(schema.clone()); @@ -418,7 +419,7 @@ impl std::future::IntoFuture for WriteBuilder { state, plan, partition_columns.clone(), - this.store.clone(), + this.log_store.object_store().clone(), this.target_file_size, this.write_batch_size, this.writer_properties, @@ -468,7 +469,7 @@ impl std::future::IntoFuture for WriteBuilder { }; let version = commit( - this.store.as_ref(), + this.log_store.as_ref(), &actions, DeltaOperation::Write { mode: this.mode, @@ -492,7 +493,7 @@ impl std::future::IntoFuture for WriteBuilder { // TODO should we build checkpoints based on config? - Ok(DeltaTable::new_with_state(this.store, this.snapshot)) + Ok(DeltaTable::new_with_state(this.log_store, this.snapshot)) }) } } diff --git a/crates/deltalake-core/src/operations/writer.rs b/crates/deltalake-core/src/operations/writer.rs index 0bba167e33..6d551ecb96 100644 --- a/crates/deltalake-core/src/operations/writer.rs +++ b/crates/deltalake-core/src/operations/writer.rs @@ -406,9 +406,10 @@ mod tests { #[tokio::test] async fn test_write_partition() { - let object_store = DeltaTableBuilder::from_uri("memory://") + let log_store = DeltaTableBuilder::from_uri("memory://") .build_storage() .unwrap(); + let object_store = log_store.object_store(); let batch = get_record_batch(None, false); // write single un-partitioned batch @@ -439,12 +440,13 @@ mod tests { let object_store = DeltaTableBuilder::from_uri("memory://") .build_storage() - .unwrap(); + .unwrap() + .object_store(); let properties = WriterProperties::builder() .set_max_row_group_size(1024) .build(); // configure small target file size and and row group size so we can observe multiple files written - let mut writer = get_writer(object_store.clone(), &batch, Some(properties), Some(10_000)); + let mut writer = get_writer(object_store, &batch, Some(properties), Some(10_000)); writer.write(&batch).await.unwrap(); // check that we have written more then once file, and no more then 1 is below target size diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs index fc23c1d28b..f48fbfbd76 100644 --- a/crates/deltalake-core/src/protocol/checkpoints.rs +++ b/crates/deltalake-core/src/protocol/checkpoints.rs @@ -23,7 +23,7 @@ use crate::kernel::{ Action, Add as AddAction, DataType, Metadata, PrimitiveType, Protocol, StructField, StructType, Txn, }; -use crate::storage::DeltaObjectStore; +use crate::logstore::LogStore; use crate::table::state::DeltaTableState; use crate::table::{CheckPoint, CheckPointBuilder}; use crate::{open_table_with_version, DeltaTable}; @@ -70,7 +70,7 @@ pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000; /// Creates checkpoint at current table version pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError> { - create_checkpoint_for(table.version(), table.get_state(), table.storage.as_ref()).await?; + create_checkpoint_for(table.version(), table.get_state(), table.log_store.as_ref()).await?; Ok(()) } @@ -81,7 +81,7 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result Result<(), ProtocolError> { // TODO: checkpoints _can_ be multi-part... haven't actually found a good reference for // an appropriate split point yet though so only writing a single part currently. // See https://github.com/delta-io/delta-rs/issues/288 - let last_checkpoint_path = storage.log_path().child("_last_checkpoint"); + let last_checkpoint_path = log_store.log_path().child("_last_checkpoint"); debug!("Writing parquet bytes to checkpoint buffer."); let (checkpoint, parquet_bytes) = parquet_bytes_from_state(state)?; let file_name = format!("{version:020}.checkpoint.parquet"); - let checkpoint_path = storage.log_path().child(file_name); + let checkpoint_path = log_store.log_path().child(file_name); + let object_store = log_store.object_store(); debug!("Writing checkpoint to {:?}.", checkpoint_path); - storage.put(&checkpoint_path, parquet_bytes).await?; + object_store.put(&checkpoint_path, parquet_bytes).await?; let last_checkpoint_content: Value = serde_json::to_value(checkpoint)?; let last_checkpoint_content = bytes::Bytes::from(serde_json::to_vec(&last_checkpoint_content)?); debug!("Writing _last_checkpoint to {:?}.", last_checkpoint_path); - storage + object_store .put(&last_checkpoint_path, last_checkpoint_content) .await?; @@ -146,7 +147,7 @@ pub async fn create_checkpoint_for( /// and less than the specified version. pub async fn cleanup_expired_logs_for( until_version: i64, - storage: &DeltaObjectStore, + log_store: &dyn LogStore, cutoff_timestamp: i64, ) -> Result { lazy_static! { @@ -157,10 +158,11 @@ pub async fn cleanup_expired_logs_for( // Feed a stream of candidate deletion files directly into the delete_stream // function to try to improve the speed of cleanup and reduce the need for // intermediate memory. - let deleted = storage + let object_store = log_store.object_store(); + let deleted = object_store .delete_stream( - storage - .list(Some(storage.log_path())) + object_store + .list(Some(log_store.log_path())) .await? // This predicate function will filter out any locations that don't // match the given timestamp range diff --git a/crates/deltalake-core/src/protocol/mod.rs b/crates/deltalake-core/src/protocol/mod.rs index 47e24cd959..8a5cd9f858 100644 --- a/crates/deltalake-core/src/protocol/mod.rs +++ b/crates/deltalake-core/src/protocol/mod.rs @@ -26,7 +26,7 @@ use std::mem::take; use crate::errors::DeltaResult; use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove}; -use crate::storage::ObjectStoreRef; +use crate::logstore::LogStore; use crate::table::CheckPoint; use crate::table::DeltaTableMetaData; @@ -601,14 +601,15 @@ pub enum OutputMode { } pub(crate) async fn get_last_checkpoint( - object_store: &ObjectStoreRef, + log_store: &dyn LogStore, ) -> Result { let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]); debug!("loading checkpoint from {last_checkpoint_path}"); + let object_store = log_store.object_store(); match object_store.get(&last_checkpoint_path).await { Ok(data) => Ok(serde_json::from_slice(&data.bytes().await?)?), Err(ObjectStoreError::NotFound { .. }) => { - match find_latest_check_point_for_version(object_store, i64::MAX).await { + match find_latest_check_point_for_version(log_store, i64::MAX).await { Ok(Some(cp)) => Ok(cp), _ => Err(ProtocolError::CheckpointNotFound), } @@ -618,7 +619,7 @@ pub(crate) async fn get_last_checkpoint( } pub(crate) async fn find_latest_check_point_for_version( - object_store: &ObjectStoreRef, + log_store: &dyn LogStore, version: i64, ) -> Result, ProtocolError> { lazy_static! { @@ -629,7 +630,8 @@ pub(crate) async fn find_latest_check_point_for_version( } let mut cp: Option = None; - let mut stream = object_store.list(Some(object_store.log_path())).await?; + let object_store = log_store.object_store(); + let mut stream = object_store.list(Some(log_store.log_path())).await?; while let Some(obj_meta) = stream.next().await { // Exit early if any objects can't be listed. diff --git a/crates/deltalake-core/src/storage/config.rs b/crates/deltalake-core/src/storage/config.rs index 1cba57b579..1ddf7d9c8a 100644 --- a/crates/deltalake-core/src/storage/config.rs +++ b/crates/deltalake-core/src/storage/config.rs @@ -12,6 +12,8 @@ use url::Url; use super::file::FileStorageBackend; use super::utils::str_is_truthy; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::logstore::default_logstore::DefaultLogStore; +use crate::logstore::LogStoreRef; #[cfg(any(feature = "s3", feature = "s3-native-tls"))] use super::s3::{S3StorageBackend, S3StorageOptions}; @@ -225,7 +227,18 @@ impl From> for StorageOptions { } } -pub(crate) fn configure_store( +/// Configure a [`LogStoreRef`] for the given url and configuration +pub fn configure_log_store( + url: Url, + options: impl Into + Clone, +) -> DeltaResult { + let mut options = options.into(); + let (_scheme, _prefix) = ObjectStoreScheme::parse(&url, &mut options)?; + Ok(Arc::new(DefaultLogStore::try_new(url, options)?)) +} + +/// Configure an instance of an [`ObjectStore`] for the given url and configuration +pub fn configure_store( url: &Url, options: &mut StorageOptions, ) -> DeltaResult> { diff --git a/crates/deltalake-core/src/storage/mod.rs b/crates/deltalake-core/src/storage/mod.rs index c7309531ea..b571905f8b 100644 --- a/crates/deltalake-core/src/storage/mod.rs +++ b/crates/deltalake-core/src/storage/mod.rs @@ -1,22 +1,8 @@ //! Object storage backend abstraction layer for Delta Table transaction logs and data -use std::collections::HashMap; -use std::fmt; -use std::ops::Range; use std::sync::Arc; -use bytes::Bytes; -use futures::{stream::BoxStream, StreamExt}; use lazy_static::lazy_static; -use object_store::GetOptions; -use serde::de::{Error, SeqAccess, Visitor}; -use serde::ser::SerializeSeq; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use tokio::io::AsyncWrite; -use url::Url; - -use self::config::StorageOptions; -use crate::errors::DeltaResult; pub mod config; pub mod file; @@ -25,9 +11,6 @@ pub mod utils; #[cfg(any(feature = "s3", feature = "s3-native-tls"))] pub mod s3; -#[cfg(feature = "datafusion")] -use datafusion::datasource::object_store::ObjectStoreUrl; - pub use object_store::path::{Path, DELIMITER}; pub use object_store::{ DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta, @@ -45,342 +28,5 @@ pub(crate) fn commit_uri_from_version(version: i64) -> Path { DELTA_LOG_PATH.child(version.as_str()) } -/// Sharable reference to [`DeltaObjectStore`] -pub type ObjectStoreRef = Arc; - -/// Object Store implementation for DeltaTable. -/// -/// The [DeltaObjectStore] implements the [object_store::ObjectStore] trait to facilitate -/// interoperability with the larger rust / arrow ecosystem. Specifically it can directly -/// be registered as store within datafusion. -/// -/// The table root is treated as the root of the object store. -/// All [Path] are reported relative to the table root. -#[derive(Debug, Clone)] -pub struct DeltaObjectStore { - storage: Arc, - location: Url, - options: StorageOptions, -} - -impl std::fmt::Display for DeltaObjectStore { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "DeltaObjectStore({})", self.location.as_ref()) - } -} - -impl DeltaObjectStore { - /// Create a new instance of [`DeltaObjectStore`] - /// - /// # Arguments - /// - /// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located). - /// * `location` - A url corresponding to the storage location of `storage`. - pub fn new(storage: Arc, location: Url) -> Self { - Self { - storage, - location, - options: HashMap::new().into(), - } - } - - /// Try creating a new instance of [`DeltaObjectStore`] - /// - /// # Arguments - /// - /// * `location` - A url pointing to the root of the delta table. - /// * `options` - Options passed to underlying builders. See [`with_storage_options`](crate::table::builder::DeltaTableBuilder::with_storage_options) - pub fn try_new(location: Url, options: impl Into + Clone) -> DeltaResult { - let mut options = options.into(); - let storage = config::configure_store(&location, &mut options)?; - Ok(Self { - storage, - location, - options, - }) - } - - /// Get a reference to the underlying storage backend - pub fn storage_backend(&self) -> Arc { - self.storage.clone() - } - - /// Storage options used to initialize storage backend - pub fn storage_options(&self) -> &StorageOptions { - &self.options - } - - /// Get fully qualified uri for table root - pub fn root_uri(&self) -> String { - self.to_uri(&Path::from("")) - } - - #[cfg(feature = "datafusion")] - /// Generate a unique enough url to identify the store in datafusion. - /// The DF object store registry only cares about the scheme and the host of the url for - /// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique - /// host we convert the location from this `DeltaObjectStore` to a valid name, combining the - /// original scheme, host and path with invalid characters replaced. - pub fn object_store_url(&self) -> ObjectStoreUrl { - // we are certain, that the URL can be parsed, since - // we make sure when we are parsing the table uri - ObjectStoreUrl::parse(format!( - "delta-rs://{}-{}{}", - self.location.scheme(), - self.location.host_str().unwrap_or("-"), - self.location - .path() - .replace(DELIMITER, "-") - .replace(':', "-") - )) - .expect("Invalid object store url.") - } - - /// [Path] to Delta log - pub fn log_path(&self) -> &Path { - &DELTA_LOG_PATH - } - - /// [Path] to Delta log - pub fn to_uri(&self, location: &Path) -> String { - match self.location.scheme() { - "file" => { - #[cfg(windows)] - let uri = format!( - "{}/{}", - self.location.as_ref().trim_end_matches('/'), - location.as_ref() - ) - .replace("file:///", ""); - #[cfg(unix)] - let uri = format!( - "{}/{}", - self.location.as_ref().trim_end_matches('/'), - location.as_ref() - ) - .replace("file://", ""); - uri - } - _ => { - if location.as_ref().is_empty() || location.as_ref() == "/" { - self.location.as_ref().to_string() - } else { - format!("{}/{}", self.location.as_ref(), location.as_ref()) - } - } - } - } - - /// Deletes object by `paths`. - pub async fn delete_batch(&self, paths: &[Path]) -> ObjectStoreResult<()> { - for path in paths { - match self.delete(path).await { - Ok(_) => continue, - Err(ObjectStoreError::NotFound { .. }) => continue, - Err(e) => return Err(e), - } - } - Ok(()) - } - - /// Check if the location is a delta table location - pub async fn is_delta_table_location(&self) -> ObjectStoreResult { - // TODO We should really be using HEAD here, but this fails in windows tests - let mut stream = self.list(Some(self.log_path())).await?; - if let Some(res) = stream.next().await { - match res { - Ok(_) => Ok(true), - Err(ObjectStoreError::NotFound { .. }) => Ok(false), - Err(err) => Err(err), - } - } else { - Ok(false) - } - } -} - -#[async_trait::async_trait] -impl ObjectStore for DeltaObjectStore { - /// Save the provided bytes to the specified location. - async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<()> { - self.storage.put(location, bytes).await - } - - /// Return the bytes that are stored at the specified location. - async fn get(&self, location: &Path) -> ObjectStoreResult { - self.storage.get(location).await - } - - /// Perform a get request with options - /// - /// Note: options.range will be ignored if [`object_store::GetResultPayload::File`] - async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { - self.storage.get_opts(location, options).await - } - - /// Return the bytes that are stored at the specified location - /// in the given byte range - async fn get_range(&self, location: &Path, range: Range) -> ObjectStoreResult { - self.storage.get_range(location, range).await - } - - /// Return the metadata for the specified location - async fn head(&self, location: &Path) -> ObjectStoreResult { - self.storage.head(location).await - } - - /// Delete the object at the specified location. - async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { - self.storage.delete(location).await - } - - /// List all the objects with the given prefix. - /// - /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of - /// `foo/bar_baz/x`. - async fn list( - &self, - prefix: Option<&Path>, - ) -> ObjectStoreResult>> { - self.storage.list(prefix).await - } - - /// List all the objects with the given prefix and a location greater than `offset` - /// - /// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce - /// the number of network requests required - async fn list_with_offset( - &self, - prefix: Option<&Path>, - offset: &Path, - ) -> ObjectStoreResult>> { - self.storage.list_with_offset(prefix, offset).await - } - - /// List objects with the given prefix and an implementation specific - /// delimiter. Returns common prefixes (directories) in addition to object - /// metadata. - /// - /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of - /// `foo/bar_baz/x`. - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult { - self.storage.list_with_delimiter(prefix).await - } - - /// Copy an object from one path to another in the same object store. - /// - /// If there exists an object at the destination, it will be overwritten. - async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - self.storage.copy(from, to).await - } - - /// Copy an object from one path to another, only if destination is empty. - /// - /// Will return an error if the destination already has an object. - async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - self.storage.copy_if_not_exists(from, to).await - } - - /// Move an object from one path to another in the same object store. - /// - /// Will return an error if the destination already has an object. - async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - self.storage.rename_if_not_exists(from, to).await - } - - async fn put_multipart( - &self, - location: &Path, - ) -> ObjectStoreResult<(MultipartId, Box)> { - self.storage.put_multipart(location).await - } - - async fn abort_multipart( - &self, - location: &Path, - multipart_id: &MultipartId, - ) -> ObjectStoreResult<()> { - self.storage.abort_multipart(location, multipart_id).await - } -} - -impl Serialize for DeltaObjectStore { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut seq = serializer.serialize_seq(None)?; - seq.serialize_element(&self.location.to_string())?; - seq.serialize_element(&self.options.0)?; - seq.end() - } -} - -impl<'de> Deserialize<'de> for DeltaObjectStore { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - struct DeltaObjectStoreVisitor {} - - impl<'de> Visitor<'de> for DeltaObjectStoreVisitor { - type Value = DeltaObjectStore; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> fmt::Result { - formatter.write_str("struct DeltaObjectStore") - } - - fn visit_seq(self, mut seq: A) -> Result - where - A: SeqAccess<'de>, - { - let location_str: String = seq - .next_element()? - .ok_or_else(|| A::Error::invalid_length(0, &self))?; - let options: HashMap = seq - .next_element()? - .ok_or_else(|| A::Error::invalid_length(0, &self))?; - let location = Url::parse(&location_str).unwrap(); - let table = DeltaObjectStore::try_new(location, options) - .map_err(|_| A::Error::custom("Failed deserializing DeltaObjectStore"))?; - Ok(table) - } - } - - deserializer.deserialize_seq(DeltaObjectStoreVisitor {}) - } -} - -#[cfg(feature = "datafusion")] -#[cfg(test)] -mod tests { - use crate::storage::DeltaObjectStore; - use object_store::memory::InMemory; - use std::sync::Arc; - use url::Url; - - #[tokio::test] - async fn test_unique_object_store_url() { - // Just a dummy store to be passed for initialization - let inner_store = Arc::from(InMemory::new()); - - for (location_1, location_2) in [ - // Same scheme, no host, different path - ("file:///path/to/table_1", "file:///path/to/table_2"), - // Different scheme/host, same path - ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"), - // Same scheme, different host, same path - ("s3://bucket_1/table_1", "s3://bucket_2/table_1"), - ] { - let url_1 = Url::parse(location_1).unwrap(); - let url_2 = Url::parse(location_2).unwrap(); - let store_1 = DeltaObjectStore::new(inner_store.clone(), url_1); - let store_2 = DeltaObjectStore::new(inner_store.clone(), url_2); - - assert_ne!( - store_1.object_store_url().as_str(), - store_2.object_store_url().as_str(), - ); - } - } -} +/// Sharable reference to [`ObjectStore`] +pub type ObjectStoreRef = Arc; diff --git a/crates/deltalake-core/src/storage/utils.rs b/crates/deltalake-core/src/storage/utils.rs index 7e516c7217..768664b97b 100644 --- a/crates/deltalake-core/src/storage/utils.rs +++ b/crates/deltalake-core/src/storage/utils.rs @@ -28,7 +28,7 @@ pub async fn copy_table( .with_storage_options(to_options.unwrap_or_default()) .with_allow_http(allow_http) .build_storage()?; - sync_stores(from_store, to_store).await + sync_stores(from_store.object_store(), to_store.object_store()).await } /// Synchronize the contents of two object stores diff --git a/crates/deltalake-core/src/table/builder.rs b/crates/deltalake-core/src/table/builder.rs index 92fc4851ad..2a4f8aca41 100644 --- a/crates/deltalake-core/src/table/builder.rs +++ b/crates/deltalake-core/src/table/builder.rs @@ -11,8 +11,9 @@ use url::Url; use super::DeltaTable; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::storage::config::StorageOptions; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; +use crate::logstore::default_logstore::DefaultLogStore; +use crate::logstore::{LogStoreConfig, LogStoreRef}; +use crate::storage::config::{self, StorageOptions}; #[allow(dead_code)] #[derive(Debug, thiserror::Error)] @@ -243,18 +244,24 @@ impl DeltaTableBuilder { } /// Build a delta storage backend for the given config - pub fn build_storage(self) -> DeltaResult { + pub fn build_storage(self) -> DeltaResult { match self.options.storage_backend { - Some((storage, location)) => Ok(Arc::new(DeltaObjectStore::new( - storage, - ensure_table_uri(location.as_str())?, - ))), + Some((storage, location)) => { + let location = ensure_table_uri(location.as_str())?; + Ok(Arc::new(DefaultLogStore::new( + Arc::new(storage), + LogStoreConfig { + location, + options: HashMap::new().into(), + }, + ))) + } None => { let location = ensure_table_uri(&self.options.table_uri)?; - Ok(Arc::new(DeltaObjectStore::try_new( + Ok(config::configure_log_store( location, self.storage_options(), - )?)) + )?) } } } diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index 2b011ff608..cd0f1808f5 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -5,7 +5,6 @@ use std::convert::TryFrom; use std::fmt; use std::fmt::Formatter; use std::io::{BufRead, BufReader, Cursor}; -use std::sync::Arc; use std::{cmp::max, cmp::Ordering, collections::HashSet}; use chrono::{DateTime, Utc}; @@ -26,10 +25,13 @@ use crate::kernel::{ Action, Add, CommitInfo, DataType, Format, Metadata, ReaderFeatures, Remove, StructType, WriterFeatures, }; +use crate::logstore::LogStoreConfig; +use crate::logstore::LogStoreRef; use crate::partitions::PartitionFilter; use crate::protocol::{ find_latest_check_point_for_version, get_last_checkpoint, ProtocolError, Stats, }; +use crate::storage::config::configure_log_store; use crate::storage::{commit_uri_from_version, ObjectStoreRef}; pub mod builder; @@ -249,8 +251,8 @@ pub struct DeltaTable { pub state: DeltaTableState, /// the load options used during load pub config: DeltaTableConfig, - /// object store to access log and data files - pub(crate) storage: ObjectStoreRef, + /// log store + pub(crate) log_store: LogStoreRef, /// file metadata for latest checkpoint last_check_point: Option, /// table versions associated with timestamps @@ -265,7 +267,7 @@ impl Serialize for DeltaTable { let mut seq = serializer.serialize_seq(None)?; seq.serialize_element(&self.state)?; seq.serialize_element(&self.config)?; - seq.serialize_element(self.storage.as_ref())?; + seq.serialize_element(self.log_store.config())?; seq.serialize_element(&self.last_check_point)?; seq.serialize_element(&self.version_timestamp)?; seq.end() @@ -296,9 +298,12 @@ impl<'de> Deserialize<'de> for DeltaTable { let config = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; - let storage = seq + let storage_config: LogStoreConfig = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let log_store = + configure_log_store(storage_config.location, storage_config.options) + .map_err(|_| A::Error::custom("Failed deserializing LogStore"))?; let last_check_point = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; @@ -309,7 +314,7 @@ impl<'de> Deserialize<'de> for DeltaTable { let table = DeltaTable { state, config, - storage: Arc::new(storage), + log_store, last_check_point, version_timestamp, }; @@ -326,10 +331,10 @@ impl DeltaTable { /// /// NOTE: This is for advanced users. If you don't know why you need to use this method, please /// call one of the `open_table` helper methods instead. - pub fn new(storage: ObjectStoreRef, config: DeltaTableConfig) -> Self { + pub fn new(log_store: LogStoreRef, config: DeltaTableConfig) -> Self { Self { state: DeltaTableState::with_version(-1), - storage, + log_store, config, last_check_point: None, version_timestamp: HashMap::new(), @@ -341,10 +346,10 @@ impl DeltaTable { /// /// NOTE: This is for advanced users. If you don't know why you need to use this method, /// please call one of the `open_table` helper methods instead. - pub(crate) fn new_with_state(storage: ObjectStoreRef, state: DeltaTableState) -> Self { + pub(crate) fn new_with_state(log_store: LogStoreRef, state: DeltaTableState) -> Self { Self { state, - storage, + log_store, config: Default::default(), last_check_point: None, version_timestamp: HashMap::new(), @@ -353,18 +358,23 @@ impl DeltaTable { /// get a shared reference to the delta object store pub fn object_store(&self) -> ObjectStoreRef { - self.storage.clone() + self.log_store.object_store() } /// The URI of the underlying data pub fn table_uri(&self) -> String { - self.storage.root_uri() + self.log_store.root_uri() + } + + /// get a shared reference to the log store + pub fn log_store(&self) -> LogStoreRef { + self.log_store.clone() } /// Return the list of paths of given checkpoint. pub fn get_checkpoint_data_paths(&self, check_point: &CheckPoint) -> Vec { let checkpoint_prefix = format!("{:020}", check_point.version); - let log_path = self.storage.log_path(); + let log_path = self.log_store.log_path(); let mut checkpoint_data_paths = Vec::new(); match check_point.parts { @@ -399,7 +409,8 @@ impl DeltaTable { let mut current_delta_log_ver = i64::MAX; // Get file objects from table. - let mut stream = self.storage.list(Some(self.storage.log_path())).await?; + let storage = self.object_store(); + let mut stream = storage.list(Some(self.log_store.log_path())).await?; while let Some(obj_meta) = stream.next().await { let obj_meta = obj_meta?; @@ -422,54 +433,7 @@ impl DeltaTable { } /// returns the latest available version of the table pub async fn get_latest_version(&mut self) -> Result { - let version_start = match get_last_checkpoint(&self.storage).await { - Ok(last_check_point) => last_check_point.version, - Err(ProtocolError::CheckpointNotFound) => { - // no checkpoint - -1 - } - Err(e) => { - return Err(DeltaTableError::from(e)); - } - }; - - debug!("latest checkpoint version: {version_start}"); - - let version_start = max(self.version(), version_start); - - lazy_static! { - static ref DELTA_LOG_REGEX: Regex = - Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap(); - } - - // list files to find max version - let version = async { - let mut max_version: i64 = version_start; - let prefix = Some(self.storage.log_path()); - let offset_path = commit_uri_from_version(max_version); - let mut files = self.storage.list_with_offset(prefix, &offset_path).await?; - - while let Some(obj_meta) = files.next().await { - let obj_meta = obj_meta?; - if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) { - let log_version = captures.get(1).unwrap().as_str().parse().unwrap(); - // listing may not be ordered - max_version = max(max_version, log_version); - // also cache timestamp for version, for faster time-travel - self.version_timestamp - .insert(log_version, obj_meta.last_modified.timestamp()); - } - } - - if max_version < 0 { - return Err(DeltaTableError::not_a_table(self.table_uri())); - } - - Ok::(max_version) - } - .await?; - - Ok(version) + self.log_store.get_latest_version(self.version()).await } /// Currently loaded version of the table @@ -490,12 +454,12 @@ impl DeltaTable { current_version: i64, ) -> Result { let next_version = current_version + 1; - let commit_uri = commit_uri_from_version(next_version); - let commit_log_bytes = self.storage.get(&commit_uri).await; - let commit_log_bytes = match commit_log_bytes { - Err(ObjectStoreError::NotFound { .. }) => return Ok(PeekCommit::UpToDate), + let commit_log_bytes = match self.log_store.read_commit_entry(next_version).await { + Ok(bytes) => Ok(bytes), + Err(DeltaTableError::ObjectStore { + source: ObjectStoreError::NotFound { .. }, + }) => return Ok(PeekCommit::UpToDate), Err(err) => Err(err), - Ok(result) => result.bytes().await, }?; let actions = Self::get_actions(next_version, commit_log_bytes).await; @@ -529,7 +493,7 @@ impl DeltaTable { /// loading the last checkpoint and incrementally applying each version since. #[cfg(any(feature = "parquet", feature = "parquet2"))] pub async fn update(&mut self) -> Result<(), DeltaTableError> { - match get_last_checkpoint(&self.storage).await { + match get_last_checkpoint(self.log_store.as_ref()).await { Ok(last_check_point) => { debug!("update with latest checkpoint {last_check_point:?}"); if Some(last_check_point) == self.last_check_point { @@ -574,13 +538,12 @@ impl DeltaTable { let buf_size = self.config.log_buffer_size; - let store = self.storage.clone(); + let log_store = self.log_store.clone(); let mut log_stream = futures::stream::iter(self.version() + 1..max_version + 1) .map(|version| { - let store = store.clone(); - let loc = commit_uri_from_version(version); + let log_store = log_store.clone(); async move { - let data = store.get(&loc).await?.bytes().await?; + let data = log_store.read_commit_entry(version).await?; let actions = Self::get_actions(version, data).await?; Ok((version, actions)) } @@ -616,7 +579,7 @@ impl DeltaTable { pub async fn load_version(&mut self, version: i64) -> Result<(), DeltaTableError> { // check if version is valid let commit_uri = commit_uri_from_version(version); - match self.storage.head(&commit_uri).await { + match self.object_store().head(&commit_uri).await { Ok(_) => {} Err(ObjectStoreError::NotFound { .. }) => { return Err(DeltaTableError::InvalidVersion(version)); @@ -628,7 +591,7 @@ impl DeltaTable { // 1. find latest checkpoint below version #[cfg(any(feature = "parquet", feature = "parquet2"))] - match find_latest_check_point_for_version(&self.storage, version).await? { + match find_latest_check_point_for_version(self.log_store.as_ref(), version).await? { Some(check_point) => { self.restore_checkpoint(check_point).await?; } @@ -652,7 +615,10 @@ impl DeltaTable { match self.version_timestamp.get(&version) { Some(ts) => Ok(*ts), None => { - let meta = self.storage.head(&commit_uri_from_version(version)).await?; + let meta = self + .object_store() + .head(&commit_uri_from_version(version)) + .await?; let ts = meta.last_modified.timestamp(); // also cache timestamp for version self.version_timestamp.insert(version, ts); @@ -746,7 +712,7 @@ impl DeltaTable { let files = self.get_files_by_partitions(filters)?; Ok(files .iter() - .map(|fname| self.storage.to_uri(fname)) + .map(|fname| self.log_store.to_uri(fname)) .collect()) } @@ -771,7 +737,7 @@ impl DeltaTable { pub fn get_file_uris(&self) -> impl Iterator + '_ { self.state .file_paths_iter() - .map(|path| self.storage.to_uri(&path)) + .map(|path| self.log_store.to_uri(&path)) } /// Returns statistics for files, in order diff --git a/crates/deltalake-core/src/table/state.rs b/crates/deltalake-core/src/table/state.rs index 26becd0703..8fa51c55fd 100644 --- a/crates/deltalake-core/src/table/state.rs +++ b/crates/deltalake-core/src/table/state.rs @@ -71,7 +71,7 @@ impl DeltaTableState { /// Construct a delta table state object from commit version. pub async fn from_commit(table: &DeltaTable, version: i64) -> Result { let commit_uri = commit_uri_from_version(version); - let commit_log_bytes = match table.storage.get(&commit_uri).await { + let commit_log_bytes = match table.object_store().get(&commit_uri).await { Ok(get) => Ok(get.bytes().await?), Err(ObjectStoreError::NotFound { .. }) => Err(ProtocolError::EndOfLog), Err(source) => Err(ProtocolError::ObjectStore { source }), @@ -161,7 +161,7 @@ impl DeltaTableState { let mut new_state = Self::with_version(check_point.version); for f in &checkpoint_data_paths { - let obj = table.storage.get(f).await?.bytes().await?; + let obj = table.object_store().get(f).await?.bytes().await?; new_state.process_checkpoint_bytes(obj, &table.config)?; } diff --git a/crates/deltalake-core/src/test_utils.rs b/crates/deltalake-core/src/test_utils.rs index 124ec0365b..ce121106c4 100644 --- a/crates/deltalake-core/src/test_utils.rs +++ b/crates/deltalake-core/src/test_utils.rs @@ -88,7 +88,7 @@ impl IntegrationContext { _ => DeltaTableBuilder::from_uri(store_uri) .with_allow_http(true) .build_storage()? - .storage_backend(), + .object_store(), }; Ok(Self { diff --git a/crates/deltalake-core/src/writer/json.rs b/crates/deltalake-core/src/writer/json.rs index 044ffc20e2..7fec11fad2 100644 --- a/crates/deltalake-core/src/writer/json.rs +++ b/crates/deltalake-core/src/writer/json.rs @@ -26,14 +26,14 @@ use crate::errors::DeltaTableError; use crate::kernel::{Add, StructType}; use crate::table::builder::DeltaTableBuilder; use crate::table::DeltaTableMetaData; +use crate::writer::utils::ShareableBuffer; use crate::DeltaTable; -use crate::{storage::DeltaObjectStore, writer::utils::ShareableBuffer}; type BadValue = (Value, ParquetError); /// Writes messages to a delta lake table. pub struct JsonWriter { - storage: Arc, + storage: Arc, arrow_schema_ref: Arc, writer_properties: WriterProperties, partition_columns: Vec, @@ -195,7 +195,7 @@ impl JsonWriter { .build(); Ok(Self { - storage, + storage: storage.object_store(), arrow_schema_ref: schema, writer_properties, partition_columns: partition_columns.unwrap_or_default(), @@ -218,7 +218,7 @@ impl JsonWriter { .build(); Ok(Self { - storage: table.storage.clone(), + storage: table.object_store(), arrow_schema_ref, writer_properties, partition_columns, diff --git a/crates/deltalake-core/src/writer/mod.rs b/crates/deltalake-core/src/writer/mod.rs index 478a0b11f2..3b73fe2ef6 100644 --- a/crates/deltalake-core/src/writer/mod.rs +++ b/crates/deltalake-core/src/writer/mod.rs @@ -146,7 +146,14 @@ pub trait DeltaWriter { partition_by, predicate: None, }; - let version = commit(table.storage.as_ref(), &adds, operation, &table.state, None).await?; + let version = commit( + table.log_store.as_ref(), + &adds, + operation, + &table.state, + None, + ) + .await?; table.update().await?; Ok(version) } diff --git a/crates/deltalake-core/src/writer/record_batch.rs b/crates/deltalake-core/src/writer/record_batch.rs index b673146907..49b5dfebc9 100644 --- a/crates/deltalake-core/src/writer/record_batch.rs +++ b/crates/deltalake-core/src/writer/record_batch.rs @@ -29,11 +29,11 @@ use crate::errors::DeltaTableError; use crate::kernel::{Add, StructType}; use crate::table::builder::DeltaTableBuilder; use crate::table::DeltaTableMetaData; -use crate::{storage::DeltaObjectStore, DeltaTable}; +use crate::DeltaTable; /// Writes messages to a delta lake table. pub struct RecordBatchWriter { - storage: Arc, + storage: Arc, arrow_schema_ref: Arc, writer_properties: WriterProperties, partition_columns: Vec, @@ -56,7 +56,8 @@ impl RecordBatchWriter { ) -> Result { let storage = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(storage_options.unwrap_or_default()) - .build_storage()?; + .build_storage()? + .object_store(); // Initialize writer properties for the underlying arrow writer let writer_properties = WriterProperties::builder() @@ -89,7 +90,7 @@ impl RecordBatchWriter { .build(); Ok(Self { - storage: table.storage.clone(), + storage: table.object_store(), arrow_schema_ref, writer_properties, partition_columns, diff --git a/crates/deltalake-core/src/writer/test_utils.rs b/crates/deltalake-core/src/writer/test_utils.rs index d67931c096..1daf9e407b 100644 --- a/crates/deltalake-core/src/writer/test_utils.rs +++ b/crates/deltalake-core/src/writer/test_utils.rs @@ -323,7 +323,7 @@ pub mod datafusion { use std::sync::Arc; pub async fn get_data(table: &DeltaTable) -> Vec { - let table = DeltaTable::new_with_state(table.object_store(), table.state.clone()); + let table = DeltaTable::new_with_state(table.log_store.clone(), table.state.clone()); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(table)).unwrap(); ctx.sql("select * from test") diff --git a/crates/deltalake-core/tests/command_optimize.rs b/crates/deltalake-core/tests/command_optimize.rs index a923d0064d..14f9d4c410 100644 --- a/crates/deltalake-core/tests/command_optimize.rs +++ b/crates/deltalake-core/tests/command_optimize.rs @@ -298,7 +298,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { let operation = DeltaOperation::Delete { predicate: None }; commit( - other_dt.object_store().as_ref(), + other_dt.log_store().as_ref(), &vec![Action::Remove(remove)], operation, &other_dt.state, @@ -306,9 +306,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { ) .await?; - let maybe_metrics = plan - .execute(dt.object_store(), &dt.state, 1, 20, None) - .await; + let maybe_metrics = plan.execute(dt.log_store(), &dt.state, 1, 20, None).await; assert!(maybe_metrics.is_err()); assert_eq!(dt.version(), version + 1); @@ -357,9 +355,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box> { ) .await?; - let metrics = plan - .execute(dt.object_store(), &dt.state, 1, 20, None) - .await?; + let metrics = plan.execute(dt.log_store(), &dt.state, 1, 20, None).await?; assert_eq!(metrics.num_files_added, 1); assert_eq!(metrics.num_files_removed, 2); @@ -399,7 +395,7 @@ async fn test_commit_interval() -> Result<(), Box> { let metrics = plan .execute( - dt.object_store(), + dt.log_store(), &dt.state, 1, 20, diff --git a/crates/deltalake-core/tests/command_vacuum.rs b/crates/deltalake-core/tests/command_vacuum.rs index 0007f479d5..51ff3217b3 100644 --- a/crates/deltalake-core/tests/command_vacuum.rs +++ b/crates/deltalake-core/tests/command_vacuum.rs @@ -297,6 +297,6 @@ async fn test_non_managed_files() { async fn is_deleted(context: &mut TestContext, path: &Path) -> bool { let backend = context.get_storage(); - let res = backend.head(path).await; + let res = backend.object_store().head(path).await; matches!(res, Err(ObjectStoreError::NotFound { .. })) } diff --git a/crates/deltalake-core/tests/commit_info_format.rs b/crates/deltalake-core/tests/commit_info_format.rs index de69397e32..a9d05e4c11 100644 --- a/crates/deltalake-core/tests/commit_info_format.rs +++ b/crates/deltalake-core/tests/commit_info_format.rs @@ -22,7 +22,7 @@ async fn test_operational_parameters() -> Result<(), Box> { }; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, diff --git a/crates/deltalake-core/tests/common/mod.rs b/crates/deltalake-core/tests/common/mod.rs index 80df899323..af2e6e1a7f 100644 --- a/crates/deltalake-core/tests/common/mod.rs +++ b/crates/deltalake-core/tests/common/mod.rs @@ -2,10 +2,10 @@ use bytes::Bytes; use deltalake_core::kernel::{Action, Add, Remove, StructType}; +use deltalake_core::logstore::LogStore; use deltalake_core::operations::create::CreateBuilder; use deltalake_core::operations::transaction::commit; use deltalake_core::protocol::{DeltaOperation, SaveMode}; -use deltalake_core::storage::DeltaObjectStore; use deltalake_core::DeltaTable; use deltalake_core::DeltaTableBuilder; use object_store::{path::Path, ObjectStore}; @@ -28,7 +28,7 @@ pub mod s3; pub struct TestContext { /// The main table under test pub table: Option, - pub backend: Option>, + pub backend: Option>, /// The configuration used to create the backend. pub config: HashMap, /// An object when it is dropped will clean up any temporary resources created for the test @@ -55,7 +55,7 @@ impl TestContext { } } - pub fn get_storage(&mut self) -> Arc { + pub fn get_storage(&mut self) -> Arc { if self.backend.is_none() { self.backend = Some(self.new_storage()) } @@ -63,7 +63,7 @@ impl TestContext { self.backend.as_ref().unwrap().clone() } - fn new_storage(&self) -> Arc { + fn new_storage(&self) -> Arc { let config = self.config.clone(); let uri = config.get("URI").unwrap().to_string(); DeltaTableBuilder::from_uri(uri) @@ -82,9 +82,9 @@ impl TestContext { .iter() .map(|s| s.to_string()) .collect::>(); - let backend = self.new_storage(); + let log_store = self.new_storage(); CreateBuilder::new() - .with_object_store(backend) + .with_log_store(log_store) .with_table_name("delta-rs_test_table") .with_comment("Table created by delta-rs tests") .with_columns(schema.fields().clone()) @@ -149,7 +149,7 @@ pub async fn add_file( }; let actions = vec![Action::Add(add)]; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, @@ -187,7 +187,7 @@ pub async fn remove_file( let operation = DeltaOperation::Delete { predicate: None }; let actions = vec![Action::Remove(remove)]; commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, diff --git a/crates/deltalake-core/tests/fs_common/mod.rs b/crates/deltalake-core/tests/fs_common/mod.rs index dc9ec2547a..73593f26b1 100644 --- a/crates/deltalake-core/tests/fs_common/mod.rs +++ b/crates/deltalake-core/tests/fs_common/mod.rs @@ -5,7 +5,8 @@ use deltalake_core::kernel::{ use deltalake_core::operations::create::CreateBuilder; use deltalake_core::operations::transaction::commit; use deltalake_core::protocol::{DeltaOperation, SaveMode}; -use deltalake_core::storage::{DeltaObjectStore, GetResult, ObjectStoreResult}; +use deltalake_core::storage::config::configure_store; +use deltalake_core::storage::{GetResult, ObjectStoreResult}; use deltalake_core::DeltaTable; use object_store::path::Path as StorePath; use object_store::ObjectStore; @@ -13,6 +14,7 @@ use serde_json::Value; use std::collections::HashMap; use std::fs; use std::path::Path; +use std::sync::Arc; use url::Url; use uuid::Uuid; @@ -119,7 +121,7 @@ pub async fn commit_actions( operation: DeltaOperation, ) -> i64 { let version = commit( - table.object_store().as_ref(), + table.log_store().as_ref(), &actions, operation, &table.state, @@ -133,7 +135,7 @@ pub async fn commit_actions( #[derive(Debug)] pub struct SlowStore { - inner: DeltaObjectStore, + inner: Arc, } impl std::fmt::Display for SlowStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -147,8 +149,9 @@ impl SlowStore { location: Url, options: impl Into + Clone, ) -> deltalake_core::DeltaResult { + let mut options = options.into(); Ok(Self { - inner: DeltaObjectStore::try_new(location, options).unwrap(), + inner: configure_store(&location, &mut options).unwrap(), }) } } diff --git a/crates/deltalake-core/tests/integration_checkpoint.rs b/crates/deltalake-core/tests/integration_checkpoint.rs index 9b5b0a73ff..56d253eb85 100644 --- a/crates/deltalake-core/tests/integration_checkpoint.rs +++ b/crates/deltalake-core/tests/integration_checkpoint.rs @@ -59,15 +59,12 @@ async fn cleanup_metadata_hdfs_test() -> TestResult { // test to run longer but reliable async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult { let table_uri = context.root_uri(); - let object_store = DeltaTableBuilder::from_uri(table_uri) + let log_store = DeltaTableBuilder::from_uri(table_uri) .with_allow_http(true) .build_storage()?; + let object_store = log_store.object_store(); - let log_path = |version| { - object_store - .log_path() - .child(format!("{:020}.json", version)) - }; + let log_path = |version| log_store.log_path().child(format!("{:020}.json", version)); // we don't need to actually populate files with content as cleanup works only with file's metadata object_store @@ -98,7 +95,7 @@ async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult { assert!(retention_timestamp > v1time.timestamp_millis()); assert!(retention_timestamp < v2time.timestamp_millis()); - let removed = cleanup_expired_logs_for(3, object_store.as_ref(), retention_timestamp).await?; + let removed = cleanup_expired_logs_for(3, log_store.as_ref(), retention_timestamp).await?; assert_eq!(removed, 2); assert!(object_store.head(&log_path(0)).await.is_err()); @@ -142,7 +139,7 @@ async fn test_issue_1420_cleanup_expired_logs_for() -> DeltaResult<()> { // Should delete v1 but not v2 or v2.checkpoint.parquet cleanup_expired_logs_for( table.version(), - table.object_store().as_ref(), + table.log_store().as_ref(), ts.timestamp_millis(), ) .await?; @@ -184,7 +181,7 @@ async fn test_issue_1420_cleanup_expired_logs_for() -> DeltaResult<()> { cleanup_expired_logs_for( table.version(), - table.object_store().as_ref(), + table.log_store().as_ref(), ts.timestamp_millis(), ) .await?; diff --git a/crates/deltalake-core/tests/integration_concurrent_writes.rs b/crates/deltalake-core/tests/integration_concurrent_writes.rs index bef44d0693..90dba7659a 100644 --- a/crates/deltalake-core/tests/integration_concurrent_writes.rs +++ b/crates/deltalake-core/tests/integration_concurrent_writes.rs @@ -168,7 +168,7 @@ impl Worker { default_row_commit_version: None, })]; let version = commit( - self.table.object_store().as_ref(), + self.table.log_store().as_ref(), &actions, operation, &self.table.state, diff --git a/crates/deltalake-core/tests/integration_datafusion.rs b/crates/deltalake-core/tests/integration_datafusion.rs index 3476de6839..7a9c38463f 100644 --- a/crates/deltalake-core/tests/integration_datafusion.rs +++ b/crates/deltalake-core/tests/integration_datafusion.rs @@ -34,11 +34,11 @@ use deltalake_core::delta_datafusion::{DeltaPhysicalCodec, DeltaScan}; use deltalake_core::kernel::{DataType, MapType, PrimitiveType, StructField, StructType}; use deltalake_core::operations::create::CreateBuilder; use deltalake_core::protocol::SaveMode; -use deltalake_core::storage::DeltaObjectStore; use deltalake_core::writer::{DeltaWriter, RecordBatchWriter}; use deltalake_core::{ open_table, operations::{write::WriteBuilder, DeltaOps}, + storage::config::configure_log_store, DeltaTable, DeltaTableError, }; use std::error::Error; @@ -211,7 +211,7 @@ mod local { // Trying to execute the write from the input plan without providing Datafusion with a session // state containing the referenced object store in the registry results in an error. assert!( - WriteBuilder::new(target_table.object_store(), target_table.state.clone()) + WriteBuilder::new(target_table.log_store(), target_table.state.clone()) .with_input_execution_plan(source_scan.clone()) .await .unwrap_err() @@ -227,19 +227,18 @@ mod local { .table_uri .clone(); let source_location = Url::parse(&source_uri).unwrap(); - let source_store = DeltaObjectStore::try_new(source_location, HashMap::new()).unwrap(); + let source_store = configure_log_store(source_location, HashMap::new()).unwrap(); let object_store_url = source_store.object_store_url(); let source_store_url: &Url = object_store_url.as_ref(); state .runtime_env() - .register_object_store(source_store_url, Arc::from(source_store)); + .register_object_store(source_store_url, source_store.object_store()); // Execute write to the target table with the proper state - let target_table = - WriteBuilder::new(target_table.object_store(), target_table.state.clone()) - .with_input_execution_plan(source_scan) - .with_input_session_state(state) - .await?; + let target_table = WriteBuilder::new(target_table.log_store(), target_table.state.clone()) + .with_input_execution_plan(source_scan) + .with_input_session_state(state) + .await?; ctx.register_table("target", Arc::new(target_table))?; // Check results diff --git a/crates/deltalake-core/tests/integration_object_store.rs b/crates/deltalake-core/tests/integration_object_store.rs index 3988cbdb6d..9c5720db85 100644 --- a/crates/deltalake-core/tests/integration_object_store.rs +++ b/crates/deltalake-core/tests/integration_object_store.rs @@ -81,7 +81,8 @@ async fn read_write_test_onelake(integration: StorageIntegration, path: &Path) - let delta_store = DeltaTableBuilder::from_uri(&context.root_uri()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); //println!("{:#?}",delta_store); @@ -103,7 +104,8 @@ async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> let context = IntegrationContext::new(integration)?; let delta_store = DeltaTableBuilder::from_uri(context.root_uri()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); put_get_delete_list(delta_store.as_ref()).await?; list_with_delimiter(delta_store.as_ref()).await?; @@ -482,7 +484,8 @@ async fn test_object_store_prefixes(integration: StorageIntegration) -> TestResu let rooturi = format!("{}/{}", context.root_uri(), prefix); let delta_store = DeltaTableBuilder::from_uri(&rooturi) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); let contents = Bytes::from("cats"); let path = Path::from("test"); diff --git a/crates/deltalake-core/tests/integration_read.rs b/crates/deltalake-core/tests/integration_read.rs index 0e17d34397..a15679a09c 100644 --- a/crates/deltalake-core/tests/integration_read.rs +++ b/crates/deltalake-core/tests/integration_read.rs @@ -155,7 +155,8 @@ async fn verify_store(integration: &IntegrationContext, root_path: &str) -> Test let table_uri = format!("{}/{}", integration.root_uri(), root_path); let storage = DeltaTableBuilder::from_uri(table_uri.clone()) .with_allow_http(true) - .build_storage()?; + .build_storage()? + .object_store(); let files = storage.list_with_delimiter(None).await?; assert_eq!( diff --git a/crates/deltalake-core/tests/repair_s3_rename_test.rs b/crates/deltalake-core/tests/repair_s3_rename_test.rs index 3157fab896..ecfba0fbbe 100644 --- a/crates/deltalake-core/tests/repair_s3_rename_test.rs +++ b/crates/deltalake-core/tests/repair_s3_rename_test.rs @@ -117,7 +117,7 @@ fn create_s3_backend( .with_allow_http(true) .build_storage() .unwrap() - .storage_backend(); + .object_store(); let delayed_store = DelayedObjectStore { inner: store, diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 1f558b82fe..a8bfb6668a 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -51,7 +51,8 @@ impl DeltaFileSystemHandler { let storage = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(options.clone().unwrap_or_default()) .build_storage() - .map_err(PythonError::from)?; + .map_err(PythonError::from)? + .object_store(); Ok(Self { inner: storage, rt: Arc::new(rt()?), diff --git a/python/src/lib.rs b/python/src/lib.rs index 923a06d159..a58d1f9aa6 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -274,7 +274,7 @@ impl RawDeltaTable { retention_hours: Option, enforce_retention_duration: bool, ) -> PyResult> { - let mut cmd = VacuumBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = VacuumBuilder::new(self._table.log_store(), self._table.state.clone()) .with_enforce_retention_duration(enforce_retention_duration) .with_dry_run(dry_run); if let Some(retention_period) = retention_hours { @@ -296,7 +296,7 @@ impl RawDeltaTable { writer_properties: Option>, safe_cast: bool, ) -> PyResult { - let mut cmd = UpdateBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = UpdateBuilder::new(self._table.log_store(), self._table.state.clone()) .with_safe_cast(safe_cast); if let Some(writer_props) = writer_properties { @@ -349,7 +349,7 @@ impl RawDeltaTable { max_concurrent_tasks: Option, min_commit_interval: Option, ) -> PyResult { - let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = OptimizeBuilder::new(self._table.log_store(), self._table.state.clone()) .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)); if let Some(size) = target_size { cmd = cmd.with_target_size(size); @@ -379,7 +379,7 @@ impl RawDeltaTable { max_spill_size: usize, min_commit_interval: Option, ) -> PyResult { - let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()) + let mut cmd = OptimizeBuilder::new(self._table.log_store(), self._table.state.clone()) .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)) .with_max_spill_size(max_spill_size) .with_type(OptimizeType::ZOrder(z_order_columns)); @@ -446,7 +446,7 @@ impl RawDeltaTable { let source_df = ctx.read_table(table_provider).unwrap(); let mut cmd = MergeBuilder::new( - self._table.object_store(), + self._table.log_store(), self._table.state.clone(), predicate, source_df, @@ -608,7 +608,7 @@ impl RawDeltaTable { ignore_missing_files: bool, protocol_downgrade_allowed: bool, ) -> PyResult { - let mut cmd = RestoreBuilder::new(self._table.object_store(), self._table.state.clone()); + let mut cmd = RestoreBuilder::new(self._table.log_store(), self._table.state.clone()); if let Some(val) = target { if let Ok(version) = val.extract::() { cmd = cmd.with_version_to_restore(version) @@ -822,7 +822,7 @@ impl RawDeltaTable { partition_by: Some(partition_by), predicate: None, }; - let store = self._table.object_store(); + let store = self._table.log_store(); rt()? .block_on(commit( @@ -866,7 +866,7 @@ impl RawDeltaTable { /// Run the delete command on the delta table: delete records following a predicate and return the delete metrics. #[pyo3(signature = (predicate = None))] pub fn delete(&mut self, predicate: Option) -> PyResult { - let mut cmd = DeleteBuilder::new(self._table.object_store(), self._table.state.clone()); + let mut cmd = DeleteBuilder::new(self._table.log_store(), self._table.state.clone()); if let Some(predicate) = predicate { cmd = cmd.with_predicate(predicate); } @@ -881,9 +881,8 @@ impl RawDeltaTable { /// have been deleted or are malformed #[pyo3(signature = (dry_run = true))] pub fn repair(&mut self, dry_run: bool) -> PyResult { - let cmd = - FileSystemCheckBuilder::new(self._table.object_store(), self._table.state.clone()) - .with_dry_run(dry_run); + let cmd = FileSystemCheckBuilder::new(self._table.log_store(), self._table.state.clone()) + .with_dry_run(dry_run); let (table, metrics) = rt()? .block_on(cmd.into_future())