diff --git a/crates/deltalake-aws/src/logstore.rs b/crates/deltalake-aws/src/logstore.rs index 293ed0895e..d410328197 100644 --- a/crates/deltalake-aws/src/logstore.rs +++ b/crates/deltalake-aws/src/logstore.rs @@ -162,7 +162,7 @@ impl LogStore for S3DynamoDbLogStore { if let Ok(Some(entry)) = entry { self.repair_entry(&entry).await?; } - LogStore::read_commit_entry(self, version).await + read_commit_entry(&self.storage, version).await } /// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists] @@ -228,7 +228,7 @@ impl LogStore for S3DynamoDbLogStore { self.repair_entry(&entry).await?; Ok(entry.version) } else { - LogStore::get_latest_version(self, current_version).await + get_latest_version(self, current_version).await } } @@ -236,11 +236,6 @@ impl LogStore for S3DynamoDbLogStore { self.storage.clone() } - #[cfg(feature = "datafusion")] - fn object_store_url(&self) -> datafusion::execution::object_store::ObjectStoreUrl { - LogStore::object_store_url(&self.config.location) - } - fn config(&self) -> &LogStoreConfig { &self.config } diff --git a/crates/deltalake-core/src/data_catalog/storage/mod.rs b/crates/deltalake-core/src/data_catalog/storage/mod.rs index eaacbccd4e..741db3cd0e 100644 --- a/crates/deltalake-core/src/data_catalog/storage/mod.rs +++ b/crates/deltalake-core/src/data_catalog/storage/mod.rs @@ -49,7 +49,7 @@ impl ListingSchemaProvider { let uri = ensure_table_uri(root_uri)?; let storage_options = storage_options.unwrap_or_default().into(); // We already parsed the url, so unwrapping is safe. - let store = get_default_store(&uri)?; + let store = store_for(&uri)?; Ok(Self { authority: uri.to_string(), store, diff --git a/crates/deltalake-core/src/logstore/default_logstore.rs b/crates/deltalake-core/src/logstore/default_logstore.rs index 58ac826280..ed463e9947 100644 --- a/crates/deltalake-core/src/logstore/default_logstore.rs +++ b/crates/deltalake-core/src/logstore/default_logstore.rs @@ -58,11 +58,6 @@ impl LogStore for DefaultLogStore { self.storage.clone() } - #[cfg(feature = "datafusion")] - fn object_store_url(&self) -> datafusion::execution::object_store::ObjectStoreUrl { - super::object_store_url(&self.config.location) - } - fn config(&self) -> &LogStoreConfig { &self.config } diff --git a/crates/deltalake-core/src/logstore/mod.rs b/crates/deltalake-core/src/logstore/mod.rs index 32b94b34ad..d3c0f02ef9 100644 --- a/crates/deltalake-core/src/logstore/mod.rs +++ b/crates/deltalake-core/src/logstore/mod.rs @@ -225,7 +225,16 @@ pub trait LogStore: Sync + Send { /// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique /// host we convert the location from this `LogStore` to a valid name, combining the /// original scheme, host and path with invalid characters replaced. - fn object_store_url(&self) -> ObjectStoreUrl; + fn object_store_url(&self) -> ObjectStoreUrl { + use object_store::path::DELIMITER; + ObjectStoreUrl::parse(format!( + "delta-rs://{}-{}{}", + location.scheme(), + location.host_str().unwrap_or("-"), + location.path().replace(DELIMITER, "-").replace(':', "-") + )) + .expect("Invalid object store url.") + } /// Get configuration representing configured log store. fn config(&self) -> &LogStoreConfig; @@ -342,21 +351,6 @@ lazy_static! { static ref DELTA_LOG_REGEX: Regex = Regex::new(r"(\d{20})\.(json|checkpoint).*$").unwrap(); } -#[cfg(feature = "datafusion")] -fn object_store_url(location: &Url) -> ObjectStoreUrl { - // we are certain, that the URL can be parsed, since - // we make sure when we are parsing the table uri - - use object_store::path::DELIMITER; - ObjectStoreUrl::parse(format!( - "delta-rs://{}-{}{}", - location.scheme(), - location.host_str().unwrap_or("-"), - location.path().replace(DELIMITER, "-").replace(':', "-") - )) - .expect("Invalid object store url.") -} - /// Extract version from a file name in the delta log pub fn extract_version_from_filename(name: &str) -> Option { DELTA_LOG_REGEX @@ -364,7 +358,8 @@ pub fn extract_version_from_filename(name: &str) -> Option { .map(|captures| captures.get(1).unwrap().as_str().parse().unwrap()) } -async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> DeltaResult { +/// Default implementation for retrieving the latest version +pub async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> DeltaResult { let version_start = match get_last_checkpoint(log_store).await { Ok(last_check_point) => last_check_point.version, Err(ProtocolError::CheckpointNotFound) => { @@ -410,7 +405,7 @@ async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> D } /// Read delta log for a specific version -async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResult> { +pub async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResult> { let commit_uri = commit_uri_from_version(version); match storage.get(&commit_uri).await { Ok(res) => Ok(Some(res.bytes().await?)), @@ -419,7 +414,8 @@ async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResu } } -async fn write_commit_entry( +/// Default implementation for writing a commit entry +pub async fn write_commit_entry( storage: &dyn ObjectStore, version: i64, tmp_commit: &Path,