diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 25f326f036..db4c59be87 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -73,7 +73,6 @@ use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType}; use crate::logstore::LogStoreRef; use crate::protocol::{self}; -use crate::storage::ObjectStoreRef; use crate::table::builder::ensure_table_uri; use crate::table::state::DeltaTableState; use crate::{open_table, open_table_with_storage_options, DeltaTable}; @@ -358,10 +357,10 @@ impl PruningStatistics for DeltaTable { // each delta table must register a specific object store, since paths are internally // handled relative to the table root. -pub(crate) fn register_store(store: ObjectStoreRef, env: Arc) { +pub(crate) fn register_store(store: LogStoreRef, env: Arc) { let object_store_url = store.object_store_url(); let url: &Url = object_store_url.as_ref(); - env.register_object_store(url, store); + env.register_object_store(url, store.object_store()); } pub(crate) fn logical_schema( @@ -633,7 +632,7 @@ impl<'a> DeltaScanBuilder<'a> { .create_physical_plan( self.state, FileScanConfig { - object_store_url: self.log_store.object_store().object_store_url(), + object_store_url: self.log_store.object_store_url(), file_schema, file_groups: file_groups.into_values().collect(), statistics: self.snapshot.datafusion_table_statistics(), @@ -685,7 +684,7 @@ impl TableProvider for DeltaTable { filters: &[Expr], limit: Option, ) -> DataFusionResult> { - register_store(self.object_store(), session.runtime_env().clone()); + register_store(self.log_store(), session.runtime_env().clone()); let filter_expr = conjunction(filters.iter().cloned()); let scan = DeltaScanBuilder::new(&self.state, self.log_store(), session) @@ -763,7 +762,7 @@ impl TableProvider for DeltaTableProvider { filters: &[Expr], limit: Option, ) -> DataFusionResult> { - register_store(self.log_store.object_store(), session.runtime_env().clone()); + register_store(self.log_store.clone(), session.runtime_env().clone()); let filter_expr = conjunction(filters.iter().cloned()); let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session) diff --git a/crates/deltalake-core/src/logstore/default_logstore.rs b/crates/deltalake-core/src/logstore/default_logstore.rs index e74e3f76c7..c60d39abf9 100644 --- a/crates/deltalake-core/src/logstore/default_logstore.rs +++ b/crates/deltalake-core/src/logstore/default_logstore.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use bytes::Bytes; +use datafusion::execution::object_store::ObjectStoreUrl; use object_store::path::Path; use url::Url; @@ -71,4 +72,8 @@ impl LogStore for DefaultLogStore { fn to_uri(&self, location: &Path) -> String { super::to_uri(&self.location, location) } + + fn object_store_url(&self) -> ObjectStoreUrl { + super::object_store_url(&self.location) + } } diff --git a/crates/deltalake-core/src/logstore/mod.rs b/crates/deltalake-core/src/logstore/mod.rs index d4459e4e61..13902f136e 100644 --- a/crates/deltalake-core/src/logstore/mod.rs +++ b/crates/deltalake-core/src/logstore/mod.rs @@ -14,7 +14,12 @@ use crate::{ }; use bytes::Bytes; use log::debug; -use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; +use object_store::{ + path::Path, Error as ObjectStoreError, ObjectStore, Result as ObjectStoreResult, +}; + +#[cfg(feature = "datafusion")] +use datafusion::datasource::object_store::ObjectStoreUrl; pub mod default_logstore; @@ -69,6 +74,30 @@ pub trait LogStore: Sync + Send { fn log_path(&self) -> &Path { &DELTA_LOG_PATH } + + /// Check if the location is a delta table location + async fn is_delta_table_location(&self) -> ObjectStoreResult { + // TODO We should really be using HEAD here, but this fails in windows tests + let object_store = self.object_store(); + let mut stream = object_store.list(Some(self.log_path())).await?; + if let Some(res) = stream.next().await { + match res { + Ok(_) => Ok(true), + Err(ObjectStoreError::NotFound { .. }) => Ok(false), + Err(err) => Err(err), + } + } else { + Ok(false) + } + } + + #[cfg(feature = "datafusion")] + /// Generate a unique enough url to identify the store in datafusion. + /// The DF object store registry only cares about the scheme and the host of the url for + /// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique + /// host we convert the location from this `DeltaObjectStore` to a valid name, combining the + /// original scheme, host and path with invalid characters replaced. + fn object_store_url(&self) -> ObjectStoreUrl; } // TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders @@ -111,6 +140,20 @@ fn to_uri(root: &Url, location: &Path) -> String { } } +fn object_store_url(location: &Url) -> ObjectStoreUrl { + // we are certain, that the URL can be parsed, since + // we make sure when we are parsing the table uri + + use object_store::path::DELIMITER; + ObjectStoreUrl::parse(format!( + "delta-rs://{}-{}{}", + location.scheme(), + location.host_str().unwrap_or("-"), + location.path().replace(DELIMITER, "-").replace(':', "-") + )) + .expect("Invalid object store url.") +} + /// Extract version from a file name in the delta log pub fn extract_version_from_filename(name: &str) -> Option { DELTA_LOG_REGEX @@ -119,8 +162,7 @@ pub fn extract_version_from_filename(name: &str) -> Option { } async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> DeltaResult { - let object_store = log_store.object_store(); - let version_start = match get_last_checkpoint(&object_store).await { + let version_start = match get_last_checkpoint(log_store).await { Ok(last_check_point) => last_check_point.version, Err(ProtocolError::CheckpointNotFound) => { // no checkpoint @@ -140,6 +182,8 @@ async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> D let mut max_version: i64 = version_start; let prefix = Some(log_store.log_path()); let offset_path = commit_uri_from_version(max_version); + // let log_store = log_store.clone(); + let object_store = log_store.object_store(); let mut files = object_store.list_with_offset(prefix, &offset_path).await?; while let Some(obj_meta) = files.next().await { @@ -189,3 +233,29 @@ async fn write_commit_entry( })?; Ok(()) } + +#[cfg(feature = "datafusion")] +#[cfg(test)] +mod tests { + use url::Url; + + #[tokio::test] + async fn test_unique_object_store_url() { + for (location_1, location_2) in [ + // Same scheme, no host, different path + ("file:///path/to/table_1", "file:///path/to/table_2"), + // Different scheme/host, same path + ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"), + // Same scheme, different host, same path + ("s3://bucket_1/table_1", "s3://bucket_2/table_1"), + ] { + let url_1 = Url::parse(location_1).unwrap(); + let url_2 = Url::parse(location_2).unwrap(); + + assert_ne!( + super::object_store_url(&url_1).as_str(), + super::object_store_url(&url_2).as_str(), + ); + } + } +} diff --git a/crates/deltalake-core/src/operations/create.rs b/crates/deltalake-core/src/operations/create.rs index 4f4e07c2f6..79760a888d 100644 --- a/crates/deltalake-core/src/operations/create.rs +++ b/crates/deltalake-core/src/operations/create.rs @@ -291,7 +291,8 @@ impl std::future::IntoFuture for CreateBuilder { Box::pin(async move { let mode = this.mode.clone(); let (mut table, actions, operation) = this.into_table_and_actions()?; - let table_state = if table.object_store().is_delta_table_location().await? { + let log_store = table.log_store(); + let table_state = if log_store.is_delta_table_location().await? { match mode { SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()), SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()), diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 5cea334654..fc535ada2c 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -278,7 +278,7 @@ impl std::future::IntoFuture for DeleteBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.log_store.object_store().clone(), session.runtime_env()); + register_store(this.log_store.clone(), session.runtime_env()); session.state() }); diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index 20618c16e5..d38ddf0efb 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -1212,7 +1212,7 @@ impl std::future::IntoFuture for MergeBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.log_store.object_store().clone(), session.runtime_env()); + register_store(this.log_store.clone(), session.runtime_env()); session.state() }); diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index 21077c1e6e..32fa4eea49 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -430,7 +430,7 @@ impl std::future::IntoFuture for UpdateBuilder { let session = SessionContext::new(); // If a user provides their own their DF state then they must register the store themselves - register_store(this.log_store.object_store().clone(), session.runtime_env()); + register_store(this.log_store.clone(), session.runtime_env()); session.state() }); diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index 1fbf1a3e80..dec4b7ced7 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -211,8 +211,7 @@ impl WriteBuilder { } async fn check_preconditions(&self) -> DeltaResult> { - let object_store = self.log_store.object_store(); - match object_store.is_delta_table_location().await? { + match self.log_store.is_delta_table_location().await? { true => { let min_writer = self.snapshot.min_writer_version(); if min_writer > MAX_SUPPORTED_WRITER_VERSION { diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs index bee45f51bf..f48fbfbd76 100644 --- a/crates/deltalake-core/src/protocol/checkpoints.rs +++ b/crates/deltalake-core/src/protocol/checkpoints.rs @@ -23,7 +23,7 @@ use crate::kernel::{ Action, Add as AddAction, DataType, Metadata, PrimitiveType, Protocol, StructField, StructType, Txn, }; -use crate::storage::DeltaObjectStore; +use crate::logstore::LogStore; use crate::table::state::DeltaTableState; use crate::table::{CheckPoint, CheckPointBuilder}; use crate::{open_table_with_version, DeltaTable}; @@ -70,12 +70,7 @@ pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000; /// Creates checkpoint at current table version pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError> { - create_checkpoint_for( - table.version(), - table.get_state(), - table.object_store().as_ref(), - ) - .await?; + create_checkpoint_for(table.version(), table.get_state(), table.log_store.as_ref()).await?; Ok(()) } @@ -86,7 +81,7 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result Result<(), ProtocolError> { // TODO: checkpoints _can_ be multi-part... haven't actually found a good reference for // an appropriate split point yet though so only writing a single part currently. // See https://github.com/delta-io/delta-rs/issues/288 - let last_checkpoint_path = storage.log_path().child("_last_checkpoint"); + let last_checkpoint_path = log_store.log_path().child("_last_checkpoint"); debug!("Writing parquet bytes to checkpoint buffer."); let (checkpoint, parquet_bytes) = parquet_bytes_from_state(state)?; let file_name = format!("{version:020}.checkpoint.parquet"); - let checkpoint_path = storage.log_path().child(file_name); + let checkpoint_path = log_store.log_path().child(file_name); + let object_store = log_store.object_store(); debug!("Writing checkpoint to {:?}.", checkpoint_path); - storage.put(&checkpoint_path, parquet_bytes).await?; + object_store.put(&checkpoint_path, parquet_bytes).await?; let last_checkpoint_content: Value = serde_json::to_value(checkpoint)?; let last_checkpoint_content = bytes::Bytes::from(serde_json::to_vec(&last_checkpoint_content)?); debug!("Writing _last_checkpoint to {:?}.", last_checkpoint_path); - storage + object_store .put(&last_checkpoint_path, last_checkpoint_content) .await?; @@ -151,7 +147,7 @@ pub async fn create_checkpoint_for( /// and less than the specified version. pub async fn cleanup_expired_logs_for( until_version: i64, - storage: &DeltaObjectStore, + log_store: &dyn LogStore, cutoff_timestamp: i64, ) -> Result { lazy_static! { @@ -162,10 +158,11 @@ pub async fn cleanup_expired_logs_for( // Feed a stream of candidate deletion files directly into the delete_stream // function to try to improve the speed of cleanup and reduce the need for // intermediate memory. - let deleted = storage + let object_store = log_store.object_store(); + let deleted = object_store .delete_stream( - storage - .list(Some(storage.log_path())) + object_store + .list(Some(log_store.log_path())) .await? // This predicate function will filter out any locations that don't // match the given timestamp range diff --git a/crates/deltalake-core/src/protocol/mod.rs b/crates/deltalake-core/src/protocol/mod.rs index 47e24cd959..8a5cd9f858 100644 --- a/crates/deltalake-core/src/protocol/mod.rs +++ b/crates/deltalake-core/src/protocol/mod.rs @@ -26,7 +26,7 @@ use std::mem::take; use crate::errors::DeltaResult; use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove}; -use crate::storage::ObjectStoreRef; +use crate::logstore::LogStore; use crate::table::CheckPoint; use crate::table::DeltaTableMetaData; @@ -601,14 +601,15 @@ pub enum OutputMode { } pub(crate) async fn get_last_checkpoint( - object_store: &ObjectStoreRef, + log_store: &dyn LogStore, ) -> Result { let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]); debug!("loading checkpoint from {last_checkpoint_path}"); + let object_store = log_store.object_store(); match object_store.get(&last_checkpoint_path).await { Ok(data) => Ok(serde_json::from_slice(&data.bytes().await?)?), Err(ObjectStoreError::NotFound { .. }) => { - match find_latest_check_point_for_version(object_store, i64::MAX).await { + match find_latest_check_point_for_version(log_store, i64::MAX).await { Ok(Some(cp)) => Ok(cp), _ => Err(ProtocolError::CheckpointNotFound), } @@ -618,7 +619,7 @@ pub(crate) async fn get_last_checkpoint( } pub(crate) async fn find_latest_check_point_for_version( - object_store: &ObjectStoreRef, + log_store: &dyn LogStore, version: i64, ) -> Result, ProtocolError> { lazy_static! { @@ -629,7 +630,8 @@ pub(crate) async fn find_latest_check_point_for_version( } let mut cp: Option = None; - let mut stream = object_store.list(Some(object_store.log_path())).await?; + let object_store = log_store.object_store(); + let mut stream = object_store.list(Some(log_store.log_path())).await?; while let Some(obj_meta) = stream.next().await { // Exit early if any objects can't be listed. diff --git a/crates/deltalake-core/src/storage/config.rs b/crates/deltalake-core/src/storage/config.rs index 114747f61c..16a9b0eb4b 100644 --- a/crates/deltalake-core/src/storage/config.rs +++ b/crates/deltalake-core/src/storage/config.rs @@ -232,10 +232,8 @@ pub(crate) fn configure_log_store( options: impl Into + Clone, ) -> DeltaResult { let mut options = options.into(); - let (scheme, _prefix) = ObjectStoreScheme::parse(&url, &mut options)?; - match scheme { - _ => Ok(Arc::new(DefaultLogStore::try_new(url, options)?)), - } + let (_scheme, _prefix) = ObjectStoreScheme::parse(&url, &mut options)?; + Ok(Arc::new(DefaultLogStore::try_new(url, options)?)) } pub(crate) fn configure_store( diff --git a/crates/deltalake-core/src/storage/mod.rs b/crates/deltalake-core/src/storage/mod.rs index ac841fb862..8d583d2edd 100644 --- a/crates/deltalake-core/src/storage/mod.rs +++ b/crates/deltalake-core/src/storage/mod.rs @@ -6,7 +6,7 @@ use std::ops::Range; use std::sync::Arc; use bytes::Bytes; -use futures::{stream::BoxStream, StreamExt}; +use futures::stream::BoxStream; use lazy_static::lazy_static; use object_store::GetOptions; use serde::de::{Error, SeqAccess, Visitor}; @@ -25,9 +25,6 @@ pub mod utils; #[cfg(any(feature = "s3", feature = "s3-native-tls"))] pub mod s3; -#[cfg(feature = "datafusion")] -use datafusion::datasource::object_store::ObjectStoreUrl; - pub use object_store::path::{Path, DELIMITER}; pub use object_store::{ DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta, @@ -114,32 +111,6 @@ impl DeltaObjectStore { self.location.clone() } - #[cfg(feature = "datafusion")] - /// Generate a unique enough url to identify the store in datafusion. - /// The DF object store registry only cares about the scheme and the host of the url for - /// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique - /// host we convert the location from this `DeltaObjectStore` to a valid name, combining the - /// original scheme, host and path with invalid characters replaced. - pub fn object_store_url(&self) -> ObjectStoreUrl { - // we are certain, that the URL can be parsed, since - // we make sure when we are parsing the table uri - ObjectStoreUrl::parse(format!( - "delta-rs://{}-{}{}", - self.location.scheme(), - self.location.host_str().unwrap_or("-"), - self.location - .path() - .replace(DELIMITER, "-") - .replace(':', "-") - )) - .expect("Invalid object store url.") - } - - /// [Path] to Delta log - pub fn log_path(&self) -> &Path { - &DELTA_LOG_PATH - } - /// Deletes object by `paths`. pub async fn delete_batch(&self, paths: &[Path]) -> ObjectStoreResult<()> { for path in paths { @@ -151,21 +122,6 @@ impl DeltaObjectStore { } Ok(()) } - - /// Check if the location is a delta table location - pub async fn is_delta_table_location(&self) -> ObjectStoreResult { - // TODO We should really be using HEAD here, but this fails in windows tests - let mut stream = self.list(Some(self.log_path())).await?; - if let Some(res) = stream.next().await { - match res { - Ok(_) => Ok(true), - Err(ObjectStoreError::NotFound { .. }) => Ok(false), - Err(err) => Err(err), - } - } else { - Ok(false) - } - } } #[async_trait::async_trait] @@ -319,37 +275,3 @@ impl<'de> Deserialize<'de> for DeltaObjectStore { deserializer.deserialize_seq(DeltaObjectStoreVisitor {}) } } - -#[cfg(feature = "datafusion")] -#[cfg(test)] -mod tests { - use crate::storage::DeltaObjectStore; - use object_store::memory::InMemory; - use std::sync::Arc; - use url::Url; - - #[tokio::test] - async fn test_unique_object_store_url() { - // Just a dummy store to be passed for initialization - let inner_store = Arc::from(InMemory::new()); - - for (location_1, location_2) in [ - // Same scheme, no host, different path - ("file:///path/to/table_1", "file:///path/to/table_2"), - // Different scheme/host, same path - ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"), - // Same scheme, different host, same path - ("s3://bucket_1/table_1", "s3://bucket_2/table_1"), - ] { - let url_1 = Url::parse(location_1).unwrap(); - let url_2 = Url::parse(location_2).unwrap(); - let store_1 = DeltaObjectStore::new(inner_store.clone(), url_1); - let store_2 = DeltaObjectStore::new(inner_store.clone(), url_2); - - assert_ne!( - store_1.object_store_url().as_str(), - store_2.object_store_url().as_str(), - ); - } - } -} diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index 4abd070a04..e4110a29f8 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -377,8 +377,7 @@ impl DeltaTable { /// Return the list of paths of given checkpoint. pub fn get_checkpoint_data_paths(&self, check_point: &CheckPoint) -> Vec { let checkpoint_prefix = format!("{:020}", check_point.version); - let object_store = self.object_store(); - let log_path = object_store.log_path(); + let log_path = self.log_store.log_path(); let mut checkpoint_data_paths = Vec::new(); match check_point.parts { @@ -414,7 +413,7 @@ impl DeltaTable { // Get file objects from table. let storage = self.object_store(); - let mut stream = storage.list(Some(storage.log_path())).await?; + let mut stream = storage.list(Some(self.log_store.log_path())).await?; while let Some(obj_meta) = stream.next().await { let obj_meta = obj_meta?; @@ -497,7 +496,7 @@ impl DeltaTable { /// loading the last checkpoint and incrementally applying each version since. #[cfg(any(feature = "parquet", feature = "parquet2"))] pub async fn update(&mut self) -> Result<(), DeltaTableError> { - match get_last_checkpoint(&self.object_store()).await { + match get_last_checkpoint(self.log_store.as_ref()).await { Ok(last_check_point) => { debug!("update with latest checkpoint {last_check_point:?}"); if Some(last_check_point) == self.last_check_point { @@ -595,7 +594,7 @@ impl DeltaTable { // 1. find latest checkpoint below version #[cfg(any(feature = "parquet", feature = "parquet2"))] - match find_latest_check_point_for_version(&self.object_store(), version).await? { + match find_latest_check_point_for_version(self.log_store.as_ref(), version).await? { Some(check_point) => { self.restore_checkpoint(check_point).await?; }