From 4493f56e9f3b34d7732420f6c1a34b719dbf1fbc Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Mon, 6 Nov 2023 20:57:42 +0100 Subject: [PATCH] DeltaObjectStore fold `root_uri` and `to_uri` into `LogStore` --- .../src/delta_datafusion/mod.rs | 44 +++++++------- .../src/logstore/default_logstore.rs | 20 ++++++- crates/deltalake-core/src/logstore/mod.rs | 58 +++++++++++++++++-- .../deltalake-core/src/operations/create.rs | 4 +- .../deltalake-core/src/operations/delete.rs | 17 ++---- crates/deltalake-core/src/operations/merge.rs | 2 +- .../src/operations/transaction/mod.rs | 6 +- .../deltalake-core/src/operations/update.rs | 10 +--- crates/deltalake-core/src/operations/write.rs | 2 +- crates/deltalake-core/src/storage/mod.rs | 35 +---------- crates/deltalake-core/src/table/builder.rs | 11 ++-- crates/deltalake-core/src/table/mod.rs | 14 +++-- 12 files changed, 119 insertions(+), 104 deletions(-) diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 645c62c53c..25f326f036 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -71,6 +71,7 @@ use url::Url; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType}; +use crate::logstore::LogStoreRef; use crate::protocol::{self}; use crate::storage::ObjectStoreRef; use crate::table::builder::ensure_table_uri; @@ -467,7 +468,7 @@ pub struct DeltaScanConfig { #[derive(Debug)] pub(crate) struct DeltaScanBuilder<'a> { snapshot: &'a DeltaTableState, - object_store: ObjectStoreRef, + log_store: LogStoreRef, filter: Option, state: &'a SessionState, projection: Option<&'a Vec>, @@ -480,12 +481,12 @@ pub(crate) struct DeltaScanBuilder<'a> { impl<'a> DeltaScanBuilder<'a> { pub fn new( snapshot: &'a DeltaTableState, - object_store: ObjectStoreRef, + log_store: LogStoreRef, state: &'a SessionState, ) -> Self { DeltaScanBuilder { snapshot, - object_store, + log_store, filter: None, state, files: None, @@ -532,7 +533,7 @@ impl<'a> DeltaScanBuilder<'a> { Some(schema) => schema, None => { self.snapshot - .physical_arrow_schema(self.object_store.clone()) + .physical_arrow_schema(self.log_store.object_store()) .await? } }; @@ -632,7 +633,7 @@ impl<'a> DeltaScanBuilder<'a> { .create_physical_plan( self.state, FileScanConfig { - object_store_url: self.object_store.object_store_url(), + object_store_url: self.log_store.object_store().object_store_url(), file_schema, file_groups: file_groups.into_values().collect(), statistics: self.snapshot.datafusion_table_statistics(), @@ -647,9 +648,7 @@ impl<'a> DeltaScanBuilder<'a> { .await?; Ok(DeltaScan { - table_uri: ensure_table_uri(self.object_store.root_uri())? 
- .as_str() - .into(), + table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(), parquet_scan: scan, config, logical_schema, @@ -689,7 +688,7 @@ impl TableProvider for DeltaTable { register_store(self.object_store(), session.runtime_env().clone()); let filter_expr = conjunction(filters.iter().cloned()); - let scan = DeltaScanBuilder::new(&self.state, self.object_store(), session) + let scan = DeltaScanBuilder::new(&self.state, self.log_store(), session) .with_projection(projection) .with_limit(limit) .with_filter(filter_expr) @@ -714,7 +713,7 @@ impl TableProvider for DeltaTable { /// A Delta table provider that enables additonal metadata columns to be included during the scan pub struct DeltaTableProvider { snapshot: DeltaTableState, - store: ObjectStoreRef, + log_store: LogStoreRef, config: DeltaScanConfig, schema: Arc, } @@ -723,13 +722,13 @@ impl DeltaTableProvider { /// Build a DeltaTableProvider pub fn try_new( snapshot: DeltaTableState, - store: ObjectStoreRef, + log_store: LogStoreRef, config: DeltaScanConfig, ) -> DeltaResult { Ok(DeltaTableProvider { schema: logical_schema(&snapshot, &config)?, snapshot, - store, + log_store, config, }) } @@ -764,10 +763,10 @@ impl TableProvider for DeltaTableProvider { filters: &[Expr], limit: Option, ) -> DataFusionResult> { - register_store(self.store.clone(), session.runtime_env().clone()); + register_store(self.log_store.object_store(), session.runtime_env().clone()); let filter_expr = conjunction(filters.iter().cloned()); - let scan = DeltaScanBuilder::new(&self.snapshot, self.store.clone(), session) + let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session) .with_projection(projection) .with_limit(limit) .with_filter(filter_expr) @@ -1462,7 +1461,7 @@ fn join_batches_with_add_actions( /// Determine which files contain a record that statisfies the predicate pub(crate) async fn find_files_scan<'a>( snapshot: &DeltaTableState, - store: ObjectStoreRef, + log_store: LogStoreRef, state: &SessionState, expression: Expr, ) -> DeltaResult> { @@ -1489,7 +1488,7 @@ pub(crate) async fn find_files_scan<'a>( // Add path column used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?); - let scan = DeltaScanBuilder::new(snapshot, store.clone(), state) + let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state) .with_filter(Some(expression.clone())) .with_projection(Some(&used_columns)) .with_scan_config(scan_config) @@ -1580,7 +1579,7 @@ pub(crate) async fn scan_memory_table( /// Finds files in a snapshot that match the provided predicate. 
pub async fn find_files<'a>( snapshot: &DeltaTableState, - object_store: ObjectStoreRef, + log_store: LogStoreRef, state: &SessionState, predicate: Option, ) -> DeltaResult { @@ -1608,8 +1607,7 @@ pub async fn find_files<'a>( }) } else { let candidates = - find_files_scan(snapshot, object_store.clone(), state, predicate.to_owned()) - .await?; + find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?; Ok(FindFiles { candidates, @@ -1924,8 +1922,8 @@ mod tests { .build(&table.state) .unwrap(); - let storage = table.object_store(); - let provider = DeltaTableProvider::try_new(table.state, storage, config).unwrap(); + let log_store = table.log_store(); + let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); @@ -1984,8 +1982,8 @@ mod tests { let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap(); - let storage = table.object_store(); - let provider = DeltaTableProvider::try_new(table.state, storage, config).unwrap(); + let log_store = table.log_store(); + let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); diff --git a/crates/deltalake-core/src/logstore/default_logstore.rs b/crates/deltalake-core/src/logstore/default_logstore.rs index 3ebe5620a1..e74e3f76c7 100644 --- a/crates/deltalake-core/src/logstore/default_logstore.rs +++ b/crates/deltalake-core/src/logstore/default_logstore.rs @@ -17,14 +17,26 @@ use crate::{ #[derive(Debug, Clone)] pub struct DefaultLogStore { pub(crate) storage: ObjectStoreRef, + location: Url, } impl DefaultLogStore { + /// Create a new instance of [`DefaultLogStore`] + /// + /// # Arguments + /// + /// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located). + /// * `location` - A url corresponding to the storage location of `storage`. + pub fn new(storage: ObjectStoreRef, location: Url) -> Self { + DefaultLogStore { storage, location } + } + /// Create log store pub fn try_new(location: Url, options: impl Into + Clone) -> DeltaResult { - let object_store = DeltaObjectStore::try_new(location, options.clone())?; + let object_store = DeltaObjectStore::try_new(location.clone(), options.clone())?; Ok(Self { storage: Arc::new(object_store), + location, }) } } @@ -49,10 +61,14 @@ impl LogStore for DefaultLogStore { } async fn get_latest_version(&self, current_version: i64) -> DeltaResult { - super::get_latest_version(&self.storage, current_version).await + super::get_latest_version(self, current_version).await } fn object_store(&self) -> ObjectStoreRef { self.storage.clone() } + + fn to_uri(&self, location: &Path) -> String { + super::to_uri(&self.location, location) + } } diff --git a/crates/deltalake-core/src/logstore/mod.rs b/crates/deltalake-core/src/logstore/mod.rs index 081b1bdb21..d4459e4e61 100644 --- a/crates/deltalake-core/src/logstore/mod.rs +++ b/crates/deltalake-core/src/logstore/mod.rs @@ -3,6 +3,7 @@ use futures::StreamExt; use lazy_static::lazy_static; use regex::Regex; use std::{cmp::max, sync::Arc}; +use url::Url; use crate::{ errors::DeltaResult, @@ -20,6 +21,10 @@ pub mod default_logstore; /// Sharable reference to [`LogStore`] pub type LogStoreRef = Arc; +lazy_static! 
{ + static ref DELTA_LOG_PATH: Path = Path::from("_delta_log"); +} + /// Trait for critical operations required to read and write commit entries in Delta logs. /// /// The correctness is predicated on the atomicity and durability guarantees of @@ -51,6 +56,19 @@ pub trait LogStore: Sync + Send { /// Get underlying object store. fn object_store(&self) -> ObjectStoreRef; + + /// [Path] to Delta log + fn to_uri(&self, location: &Path) -> String; + + /// Get fully qualified uri for table root + fn root_uri(&self) -> String { + self.to_uri(&Path::from("")) + } + + /// [Path] to Delta log + fn log_path(&self) -> &Path { + &DELTA_LOG_PATH + } } // TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders @@ -64,6 +82,35 @@ lazy_static! { static ref DELTA_LOG_REGEX: Regex = Regex::new(r"(\d{20})\.(json|checkpoint).*$").unwrap(); } +fn to_uri(root: &Url, location: &Path) -> String { + match root.scheme() { + "file" => { + #[cfg(windows)] + let uri = format!( + "{}/{}", + root.as_ref().trim_end_matches('/'), + location.as_ref() + ) + .replace("file:///", ""); + #[cfg(unix)] + let uri = format!( + "{}/{}", + root.as_ref().trim_end_matches('/'), + location.as_ref() + ) + .replace("file://", ""); + uri + } + _ => { + if location.as_ref().is_empty() || location.as_ref() == "/" { + root.as_ref().to_string() + } else { + format!("{}/{}", root.as_ref(), location.as_ref()) + } + } + } +} + /// Extract version from a file name in the delta log pub fn extract_version_from_filename(name: &str) -> Option { DELTA_LOG_REGEX @@ -71,8 +118,9 @@ pub fn extract_version_from_filename(name: &str) -> Option { .map(|captures| captures.get(1).unwrap().as_str().parse().unwrap()) } -async fn get_latest_version(storage: &ObjectStoreRef, current_version: i64) -> DeltaResult { - let version_start = match get_last_checkpoint(storage).await { +async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> DeltaResult { + let object_store = log_store.object_store(); + let version_start = match get_last_checkpoint(&object_store).await { Ok(last_check_point) => last_check_point.version, Err(ProtocolError::CheckpointNotFound) => { // no checkpoint @@ -90,9 +138,9 @@ async fn get_latest_version(storage: &ObjectStoreRef, current_version: i64) -> D // list files to find max version let version = async { let mut max_version: i64 = version_start; - let prefix = Some(storage.log_path()); + let prefix = Some(log_store.log_path()); let offset_path = commit_uri_from_version(max_version); - let mut files = storage.list_with_offset(prefix, &offset_path).await?; + let mut files = object_store.list_with_offset(prefix, &offset_path).await?; while let Some(obj_meta) = files.next().await { let obj_meta = obj_meta?; @@ -106,7 +154,7 @@ async fn get_latest_version(storage: &ObjectStoreRef, current_version: i64) -> D } if max_version < 0 { - return Err(DeltaTableError::not_a_table(storage.root_uri())); + return Err(DeltaTableError::not_a_table(log_store.root_uri())); } Ok::(max_version) diff --git a/crates/deltalake-core/src/operations/create.rs b/crates/deltalake-core/src/operations/create.rs index c8c24a9d00..4f4e07c2f6 100644 --- a/crates/deltalake-core/src/operations/create.rs +++ b/crates/deltalake-core/src/operations/create.rs @@ -221,9 +221,7 @@ impl CreateBuilder { let (storage_url, table) = if let Some(log_store) = self.log_store { ( - ensure_table_uri(log_store.object_store().root_uri())? 
- .as_str() - .to_string(), + ensure_table_uri(log_store.root_uri())?.as_str().to_string(), DeltaTable::new(log_store, Default::default()), ) } else { diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index b697d0cdbe..5cea334654 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -41,7 +41,6 @@ use crate::kernel::{Action, Add, Remove}; use crate::operations::transaction::commit; use crate::operations::write::write_execution_plan; use crate::protocol::DeltaOperation; -use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -126,7 +125,7 @@ impl DeleteBuilder { async fn excute_non_empty_expr( snapshot: &DeltaTableState, - object_store: ObjectStoreRef, + log_store: LogStoreRef, state: &SessionState, expression: &Expr, metrics: &mut DeleteMetrics, @@ -145,7 +144,7 @@ async fn excute_non_empty_expr( .partition_columns .clone(); - let scan = DeltaScanBuilder::new(snapshot, object_store.clone(), state) + let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state) .with_files(rewrite) .build() .await?; @@ -168,7 +167,7 @@ async fn excute_non_empty_expr( state.clone(), filter.clone(), table_partition_cols.clone(), - object_store.clone(), + log_store.object_store(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, @@ -198,13 +197,7 @@ async fn execute( let mut metrics = DeleteMetrics::default(); let scan_start = Instant::now(); - let candidates = find_files( - snapshot, - log_store.object_store().clone(), - &state, - predicate.clone(), - ) - .await?; + let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -215,7 +208,7 @@ async fn execute( let write_start = Instant::now(); let add = excute_non_empty_expr( snapshot, - log_store.object_store().clone(), + log_store.clone(), &state, &predicate, &mut metrics, diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index eb28e14f69..20618c16e5 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -590,7 +590,7 @@ async fn execute( // predicates also need to be considered when pruning let target = Arc::new( - DeltaScanBuilder::new(snapshot, log_store.object_store(), &state) + DeltaScanBuilder::new(snapshot, log_store.clone(), &state) .with_schema(snapshot.input_schema()?) 
.build() .await?, diff --git a/crates/deltalake-core/src/operations/transaction/mod.rs b/crates/deltalake-core/src/operations/transaction/mod.rs index 8b66bd38fd..6524d1e520 100644 --- a/crates/deltalake-core/src/operations/transaction/mod.rs +++ b/crates/deltalake-core/src/operations/transaction/mod.rs @@ -294,10 +294,8 @@ mod tests { async fn test_try_commit_transaction() { let store = Arc::new(InMemory::new()); let url = Url::parse("mem://what/is/this").unwrap(); - let delta_store = DeltaObjectStore::new(store.clone(), url); - let log_store = DefaultLogStore { - storage: Arc::new(delta_store), - }; + let delta_store = DeltaObjectStore::new(store.clone(), url.clone()); + let log_store = DefaultLogStore::new(Arc::new(delta_store), url); let tmp_path = Path::from("_delta_log/tmp"); let version_path = Path::from("_delta_log/00000000000000000000.json"); store.put(&tmp_path, bytes::Bytes::new()).await.unwrap(); diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index 54479a73de..21077c1e6e 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -213,13 +213,7 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns.clone(); let scan_start = Instant::now(); - let candidates = find_files( - snapshot, - log_store.object_store(), - &state, - predicate.clone(), - ) - .await?; + let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; if candidates.candidates.is_empty() { @@ -231,7 +225,7 @@ async fn execute( let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches - let scan = DeltaScanBuilder::new(snapshot, log_store.object_store().clone(), &state) + let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state) .with_files(&candidates.candidates) .build() .await?; diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index 46fb579ffc..1fbf1a3e80 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -220,7 +220,7 @@ impl WriteBuilder { } else { match self.mode { SaveMode::ErrorIfExists => { - Err(WriteError::AlreadyExists(object_store.root_uri()).into()) + Err(WriteError::AlreadyExists(self.log_store.root_uri()).into()) } _ => Ok(vec![]), } diff --git a/crates/deltalake-core/src/storage/mod.rs b/crates/deltalake-core/src/storage/mod.rs index c7309531ea..ac841fb862 100644 --- a/crates/deltalake-core/src/storage/mod.rs +++ b/crates/deltalake-core/src/storage/mod.rs @@ -110,9 +110,8 @@ impl DeltaObjectStore { &self.options } - /// Get fully qualified uri for table root - pub fn root_uri(&self) -> String { - self.to_uri(&Path::from("")) + pub(crate) fn location(&self) -> Url { + self.location.clone() } #[cfg(feature = "datafusion")] @@ -141,36 +140,6 @@ impl DeltaObjectStore { &DELTA_LOG_PATH } - /// [Path] to Delta log - pub fn to_uri(&self, location: &Path) -> String { - match self.location.scheme() { - "file" => { - #[cfg(windows)] - let uri = format!( - "{}/{}", - self.location.as_ref().trim_end_matches('/'), - location.as_ref() - ) - .replace("file:///", ""); - #[cfg(unix)] - let uri = format!( - "{}/{}", - self.location.as_ref().trim_end_matches('/'), - location.as_ref() - 
) - .replace("file://", ""); - uri - } - _ => { - if location.as_ref().is_empty() || location.as_ref() == "/" { - self.location.as_ref().to_string() - } else { - format!("{}/{}", self.location.as_ref(), location.as_ref()) - } - } - } - } - /// Deletes object by `paths`. pub async fn delete_batch(&self, paths: &[Path]) -> ObjectStoreResult<()> { for path in paths { diff --git a/crates/deltalake-core/src/table/builder.rs b/crates/deltalake-core/src/table/builder.rs index 80f2c2bc2b..a8e7d15e01 100644 --- a/crates/deltalake-core/src/table/builder.rs +++ b/crates/deltalake-core/src/table/builder.rs @@ -247,12 +247,11 @@ impl DeltaTableBuilder { /// Build a delta storage backend for the given config pub fn build_storage(self) -> DeltaResult { match self.options.storage_backend { - Some((storage, location)) => Ok(Arc::new(DefaultLogStore { - storage: Arc::new(DeltaObjectStore::new( - storage, - ensure_table_uri(location.as_str())?, - )), - })), + Some((storage, location)) => { + let location = ensure_table_uri(location.as_str())?; + let storage = DeltaObjectStore::new(storage, location.clone()); + Ok(Arc::new(DefaultLogStore::new(Arc::new(storage), location))) + } None => { let location = ensure_table_uri(&self.options.table_uri)?; Ok(config::configure_log_store( diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index 098bcdfdd4..4abd070a04 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -298,13 +298,15 @@ impl<'de> Deserialize<'de> for DeltaTable { let config = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; + // TODO twh; proper (de-)serialization let storage = Arc::new( seq.next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?, ); - let log_store = Arc::new(DefaultLogStore { - storage: Arc::clone(&storage), - }); + let log_store = Arc::new(DefaultLogStore::new( + Arc::clone(&storage), + storage.location().to_owned(), + )); let last_check_point = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; @@ -364,7 +366,7 @@ impl DeltaTable { /// The URI of the underlying data pub fn table_uri(&self) -> String { - self.log_store.object_store().root_uri() + self.log_store.root_uri() } /// get a shared reference to the log store @@ -714,7 +716,7 @@ impl DeltaTable { let files = self.get_files_by_partitions(filters)?; Ok(files .iter() - .map(|fname| self.object_store().to_uri(fname)) + .map(|fname| self.log_store.to_uri(fname)) .collect()) } @@ -739,7 +741,7 @@ impl DeltaTable { pub fn get_file_uris(&self) -> impl Iterator + '_ { self.state .file_paths_iter() - .map(|path| self.object_store().to_uri(&path)) + .map(|path| self.log_store.to_uri(&path)) } /// Returns statistics for files, in order
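
Note (illustrative, not part of the patch): the sketch below shows the URI-joining behaviour that this change moves from `DeltaObjectStore::to_uri` into the `logstore` module, and the shape of the strings callers now get from `LogStore::to_uri` / `LogStore::root_uri`. The table roots and file name are made-up examples, and only the unix branch of the helper is reproduced here (the real function also carries a `#[cfg(windows)]` variant that strips `file:///`).

use object_store::path::Path;
use url::Url;

// Mirrors the relocated helper: local `file://` roots are turned into
// plain filesystem paths, every other scheme is joined with a '/'.
fn to_uri(root: &Url, location: &Path) -> String {
    match root.scheme() {
        "file" => format!(
            "{}/{}",
            root.as_str().trim_end_matches('/'),
            location.as_ref()
        )
        .replace("file://", ""),
        _ => {
            if location.as_ref().is_empty() || location.as_ref() == "/" {
                root.as_str().to_string()
            } else {
                format!("{}/{}", root.as_str(), location.as_ref())
            }
        }
    }
}

fn main() {
    // Hypothetical table roots, for illustration only.
    let remote_root = Url::parse("s3://bucket/tables/my_table").unwrap();
    let local_root = Url::parse("file:///tmp/my_table").unwrap();

    // `LogStore::root_uri` is `to_uri` applied to the empty path.
    assert_eq!(
        to_uri(&remote_root, &Path::from("")),
        "s3://bucket/tables/my_table"
    );

    // `LogStore::to_uri` for a relative data-file path.
    assert_eq!(
        to_uri(&remote_root, &Path::from("part-00000.parquet")),
        "s3://bucket/tables/my_table/part-00000.parquet"
    );
    assert_eq!(
        to_uri(&local_root, &Path::from("part-00000.parquet")),
        "/tmp/my_table/part-00000.parquet"
    );
}

This is the behaviour `DeltaTable::get_file_uris` and `get_files_by_partitions` rely on after the change: they build fully qualified URIs via `self.log_store.to_uri(&path)` instead of going through the object store.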