diff --git a/crates/deltalake-aws/src/lib.rs b/crates/deltalake-aws/src/lib.rs index 00fd02b587..f6a2b2da31 100644 --- a/crates/deltalake-aws/src/lib.rs +++ b/crates/deltalake-aws/src/lib.rs @@ -44,13 +44,11 @@ impl LogStoreFactory for S3LogStoreFactory { let s3_options = S3StorageOptions::from_map(&options.0); if s3_options.locking_provider.as_deref() != Some("dynamodb") { - println!("RETURNING A DEFAAULT"); debug!("S3LogStoreFactory has been asked to create a LogStore without the dynamodb locking provider"); return Ok(deltalake_core::logstore::default_logstore( store, location, options, )); } - println!("RETURNING AN S3 DYNAMODB LOGSTORE {s3_options:?}"); Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new( location.clone(), diff --git a/crates/deltalake-aws/src/logstore.rs b/crates/deltalake-aws/src/logstore.rs index 293ed0895e..a7feb3f44d 100644 --- a/crates/deltalake-aws/src/logstore.rs +++ b/crates/deltalake-aws/src/logstore.rs @@ -80,14 +80,16 @@ impl S3DynamoDbLogStore { return Ok(RepairLogEntryResult::AlreadyCompleted); } for retry in 0..=MAX_REPAIR_RETRIES { - match LogStore::write_commit_entry(self, entry.version, &entry.temp_path).await { + match write_commit_entry(&self.storage, entry.version, &entry.temp_path).await { Ok(()) => { + debug!("Successfully committed entry for version {}", entry.version); return self.try_complete_entry(entry, true).await; } // `N.json` has already been moved, complete the entry in DynamoDb just in case Err(TransactionError::ObjectStore { source: ObjectStoreError::NotFound { .. }, }) => { + warn!("It looks like the {}.json has already been moved, we got 404 from ObjectStorage.", entry.version); return self.try_complete_entry(entry, false).await; } Err(err) if retry == MAX_REPAIR_RETRIES => return Err(err), @@ -105,6 +107,7 @@ impl S3DynamoDbLogStore { entry: &CommitEntry, copy_performed: bool, ) -> Result { + debug!("try_complete_entry for {:?}, {}", entry, copy_performed); for retry in 0..=MAX_REPAIR_RETRIES { match self .lock_client @@ -119,7 +122,7 @@ impl S3DynamoDbLogStore { }) { Ok(x) => return Ok(Self::map_retry_result(x, copy_performed)), Err(err) if retry == MAX_REPAIR_RETRIES => return Err(err), - Err(err) => log::debug!( + Err(err) => error!( "retry #{retry} on log entry {entry:?} failed to update lock db: '{err}'" ), } @@ -162,7 +165,7 @@ impl LogStore for S3DynamoDbLogStore { if let Ok(Some(entry)) = entry { self.repair_entry(&entry).await?; } - LogStore::read_commit_entry(self, version).await + read_commit_entry(&self.storage, version).await } /// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists] @@ -183,9 +186,10 @@ impl LogStore for S3DynamoDbLogStore { .await .map_err(|err| match err { LockClientError::VersionAlreadyExists { version, .. } => { + warn!("LockClientError::VersionAlreadyExists({version})"); TransactionError::VersionAlreadyExists(version) } - LockClientError::ProvisionedThroughputExceeded => todo!(), + LockClientError::ProvisionedThroughputExceeded => todo!("deltalake-aws does not yet handle DynamoDB providioned throughput errors"), LockClientError::LockTableNotFound => { let table_name = self.lock_client.get_lock_table_name(); error!("Lock table '{table_name}' not found"); @@ -228,7 +232,7 @@ impl LogStore for S3DynamoDbLogStore { self.repair_entry(&entry).await?; Ok(entry.version) } else { - LogStore::get_latest_version(self, current_version).await + get_latest_version(self, current_version).await } } @@ -236,11 +240,6 @@ impl LogStore for S3DynamoDbLogStore { self.storage.clone() } - #[cfg(feature = "datafusion")] - fn object_store_url(&self) -> datafusion::execution::object_store::ObjectStoreUrl { - LogStore::object_store_url(&self.config.location) - } - fn config(&self) -> &LogStoreConfig { &self.config } diff --git a/crates/deltalake-aws/src/storage.rs b/crates/deltalake-aws/src/storage.rs index 2b32438cd5..97786e9736 100644 --- a/crates/deltalake-aws/src/storage.rs +++ b/crates/deltalake-aws/src/storage.rs @@ -55,7 +55,6 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { Some((s3_key, value.clone())) }), )?; - println!("STORE: {store:?}, options {options:?}"); let options = S3StorageOptions::from_map(&options.0); let store = S3StorageBackend::try_new( diff --git a/crates/deltalake-aws/tests/integration_s3_dynamodb.rs b/crates/deltalake-aws/tests/integration_s3_dynamodb.rs index 3d575fb4b2..7338ca1509 100644 --- a/crates/deltalake-aws/tests/integration_s3_dynamodb.rs +++ b/crates/deltalake-aws/tests/integration_s3_dynamodb.rs @@ -148,7 +148,6 @@ const COMMITS: i64 = 5; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[serial] async fn test_concurrent_writers() -> TestResult<()> { - let _ = pretty_env_logger::try_init(); // Goal: a test with multiple writers, very similar to `integration_concurrent_writes` let context = IntegrationContext::new(Box::new(S3Integration::default()))?; println!(">>> preparing table"); diff --git a/crates/deltalake-core/src/data_catalog/storage/mod.rs b/crates/deltalake-core/src/data_catalog/storage/mod.rs index eaacbccd4e..741db3cd0e 100644 --- a/crates/deltalake-core/src/data_catalog/storage/mod.rs +++ b/crates/deltalake-core/src/data_catalog/storage/mod.rs @@ -49,7 +49,7 @@ impl ListingSchemaProvider { let uri = ensure_table_uri(root_uri)?; let storage_options = storage_options.unwrap_or_default().into(); // We already parsed the url, so unwrapping is safe. - let store = get_default_store(&uri)?; + let store = store_for(&uri)?; Ok(Self { authority: uri.to_string(), store, diff --git a/crates/deltalake-core/src/logstore/default_logstore.rs b/crates/deltalake-core/src/logstore/default_logstore.rs index 58ac826280..78afc61a8e 100644 --- a/crates/deltalake-core/src/logstore/default_logstore.rs +++ b/crates/deltalake-core/src/logstore/default_logstore.rs @@ -58,12 +58,41 @@ impl LogStore for DefaultLogStore { self.storage.clone() } - #[cfg(feature = "datafusion")] - fn object_store_url(&self) -> datafusion::execution::object_store::ObjectStoreUrl { - super::object_store_url(&self.config.location) - } - fn config(&self) -> &LogStoreConfig { &self.config } } + +#[cfg(feature = "datafusion")] +#[cfg(test)] +mod datafusion_tests { + use super::*; + + use std::collections::HashMap; + use url::Url; + + #[tokio::test] + async fn test_unique_object_store_url() { + let location = Url::parse("memory://table").unwrap(); + let store = crate::logstore::logstore_for(location, HashMap::default()); + assert!(store.is_ok()); + let store = store.unwrap(); + + for (location_1, location_2) in [ + // Same scheme, no host, different path + ("file:///path/to/table_1", "file:///path/to/table_2"), + // Different scheme/host, same path + ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"), + // Same scheme, different host, same path + ("s3://bucket_1/table_1", "s3://bucket_2/table_1"), + ] { + let url_1 = Url::parse(location_1).unwrap(); + let url_2 = Url::parse(location_2).unwrap(); + + assert_ne!( + store.object_store_url(&url_1).as_str(), + store.object_store_url(&url_2).as_str(), + ); + } + } +} diff --git a/crates/deltalake-core/src/logstore/mod.rs b/crates/deltalake-core/src/logstore/mod.rs index 32b94b34ad..3bfa3ee835 100644 --- a/crates/deltalake-core/src/logstore/mod.rs +++ b/crates/deltalake-core/src/logstore/mod.rs @@ -128,7 +128,6 @@ pub fn logstore_with( location: Url, options: impl Into + Clone, ) -> DeltaResult { - println!("WITH! {store:?}"); let scheme = Url::parse(&format!("{}://", location.scheme())) .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?; @@ -225,7 +224,18 @@ pub trait LogStore: Sync + Send { /// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique /// host we convert the location from this `LogStore` to a valid name, combining the /// original scheme, host and path with invalid characters replaced. - fn object_store_url(&self) -> ObjectStoreUrl; + fn object_store_url(&self) -> ObjectStoreUrl { + use object_store::path::DELIMITER; + let location = &self.config().location; + + ObjectStoreUrl::parse(format!( + "delta-rs://{}-{}{}", + location.scheme(), + location.host_str().unwrap_or("-"), + location.path().replace(DELIMITER, "-").replace(':', "-") + )) + .expect("Invalid object store url.") + } /// Get configuration representing configured log store. fn config(&self) -> &LogStoreConfig; @@ -342,21 +352,6 @@ lazy_static! { static ref DELTA_LOG_REGEX: Regex = Regex::new(r"(\d{20})\.(json|checkpoint).*$").unwrap(); } -#[cfg(feature = "datafusion")] -fn object_store_url(location: &Url) -> ObjectStoreUrl { - // we are certain, that the URL can be parsed, since - // we make sure when we are parsing the table uri - - use object_store::path::DELIMITER; - ObjectStoreUrl::parse(format!( - "delta-rs://{}-{}{}", - location.scheme(), - location.host_str().unwrap_or("-"), - location.path().replace(DELIMITER, "-").replace(':', "-") - )) - .expect("Invalid object store url.") -} - /// Extract version from a file name in the delta log pub fn extract_version_from_filename(name: &str) -> Option { DELTA_LOG_REGEX @@ -364,7 +359,11 @@ pub fn extract_version_from_filename(name: &str) -> Option { .map(|captures| captures.get(1).unwrap().as_str().parse().unwrap()) } -async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> DeltaResult { +/// Default implementation for retrieving the latest version +pub async fn get_latest_version( + log_store: &dyn LogStore, + current_version: i64, +) -> DeltaResult { let version_start = match get_last_checkpoint(log_store).await { Ok(last_check_point) => last_check_point.version, Err(ProtocolError::CheckpointNotFound) => { @@ -410,7 +409,10 @@ async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> D } /// Read delta log for a specific version -async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResult> { +pub async fn read_commit_entry( + storage: &dyn ObjectStore, + version: i64, +) -> DeltaResult> { let commit_uri = commit_uri_from_version(version); match storage.get(&commit_uri).await { Ok(res) => Ok(Some(res.bytes().await?)), @@ -419,7 +421,8 @@ async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResu } } -async fn write_commit_entry( +/// Default implementation for writing a commit entry +pub async fn write_commit_entry( storage: &dyn ObjectStore, version: i64, tmp_commit: &Path, @@ -458,29 +461,3 @@ mod tests { assert!(store.is_ok()); } } - -#[cfg(feature = "datafusion")] -#[cfg(test)] -mod datafusion_tests { - use url::Url; - - #[tokio::test] - async fn test_unique_object_store_url() { - for (location_1, location_2) in [ - // Same scheme, no host, different path - ("file:///path/to/table_1", "file:///path/to/table_2"), - // Different scheme/host, same path - ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"), - // Same scheme, different host, same path - ("s3://bucket_1/table_1", "s3://bucket_2/table_1"), - ] { - let url_1 = Url::parse(location_1).unwrap(); - let url_2 = Url::parse(location_2).unwrap(); - - assert_ne!( - super::object_store_url(&url_1).as_str(), - super::object_store_url(&url_2).as_str(), - ); - } - } -} diff --git a/crates/deltalake-core/src/storage/mod.rs b/crates/deltalake-core/src/storage/mod.rs index ae602640e6..2398276011 100644 --- a/crates/deltalake-core/src/storage/mod.rs +++ b/crates/deltalake-core/src/storage/mod.rs @@ -13,11 +13,6 @@ pub mod utils; use crate::{DeltaResult, DeltaTableError}; -#[cfg(feature = "datafusion")] -pub use datafusion::execution::object_store::DefaultObjectStoreRegistry; -#[cfg(feature = "datafusion")] -pub(crate) use datafusion::execution::object_store::ObjectStoreRegistry; - pub use object_store; use object_store::local::LocalFileSystem; use object_store::memory::InMemory; @@ -147,7 +142,6 @@ pub fn str_is_truthy(val: &str) -> bool { /// [ObjectStore] pub fn url_prefix_handler(store: T, prefix: Path) -> DeltaResult { if prefix != Path::from("/") { - println!("returning prefix store for {prefix:?}"); Ok(Arc::new(PrefixStore::new(store, prefix))) } else { Ok(Arc::new(store)) diff --git a/crates/deltalake-core/src/storage/registry.rs b/crates/deltalake-core/src/storage/registry.rs deleted file mode 100644 index 8174cbc868..0000000000 --- a/crates/deltalake-core/src/storage/registry.rs +++ /dev/null @@ -1,252 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store. -//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS -//! and query data inside these systems. - -use crate::{DeltaResult, DeltaTableError}; - -use dashmap::DashMap; -use object_store::local::LocalFileSystem; -use object_store::ObjectStore; -use std::sync::Arc; -use url::Url; - -/// A parsed URL identifying a particular [`ObjectStore`] -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct ObjectStoreUrl { - url: Url, -} - -#[allow(unused)] -impl ObjectStoreUrl { - /// Parse an [`ObjectStoreUrl`] from a string - pub fn parse(s: impl AsRef) -> DeltaResult { - let mut parsed = Url::parse(s.as_ref()).map_err(|e| DeltaTableError::GenericError { - source: Box::new(e), - })?; - - let remaining = &parsed[url::Position::BeforePath..]; - if !remaining.is_empty() && remaining != "/" { - return Err(DeltaTableError::NotATable( - "ObjectStoreUrl must only contain scheme and authority, got: {remaining}".into(), - )); - } - - // Always set path for consistency - parsed.set_path("/"); - Ok(Self { url: parsed }) - } - - /// An [`ObjectStoreUrl`] for the local filesystem - pub fn local_filesystem() -> Self { - Self::parse("file://").unwrap() - } - - /// Returns this [`ObjectStoreUrl`] as a string - pub fn as_str(&self) -> &str { - self.as_ref() - } -} - -impl AsRef for ObjectStoreUrl { - fn as_ref(&self) -> &str { - self.url.as_ref() - } -} - -impl AsRef for ObjectStoreUrl { - fn as_ref(&self) -> &Url { - &self.url - } -} - -impl std::fmt::Display for ObjectStoreUrl { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - self.as_str().fmt(f) - } -} - -/// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance, -/// and allows DataFusion to read from different [`ObjectStore`] -/// instances. For example DataFusion might be configured so that -/// -/// 1. `s3://my_bucket/lineitem/` mapped to the `/lineitem` path on an -/// AWS S3 object store bound to `my_bucket` -/// -/// 2. `s3://my_other_bucket/lineitem/` mapped to the (same) -/// `/lineitem` path on a *different* AWS S3 object store bound to -/// `my_other_bucket` -/// -/// When given a [`ListingTableUrl`], DataFusion tries to find an -/// appropriate [`ObjectStore`]. For example -/// -/// ```sql -/// create external table unicorns stored as parquet location 's3://my_bucket/lineitem/'; -/// ``` -/// -/// In this particular case, the url `s3://my_bucket/lineitem/` will be provided to -/// [`ObjectStoreRegistry::get_store`] and one of three things will happen: -/// -/// - If an [`ObjectStore`] has been registered with [`ObjectStoreRegistry::register_store`] with -/// `s3://my_bucket`, that [`ObjectStore`] will be returned -/// -/// - If an AWS S3 object store can be ad-hoc discovered by the url `s3://my_bucket/lineitem/`, this -/// object store will be registered with key `s3://my_bucket` and returned. -/// -/// - Otherwise an error will be returned, indicating that no suitable [`ObjectStore`] could -/// be found -/// -/// This allows for two different use-cases: -/// -/// 1. Systems where object store buckets are explicitly created using DDL, can register these -/// buckets using [`ObjectStoreRegistry::register_store`] -/// -/// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`ObjectStore`] -/// lazily by providing a custom implementation of [`ObjectStoreRegistry`] -/// -/// -/// [`ListingTableUrl`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTableUrl.html -/// [`ObjectStore`]: object_store::ObjectStore -pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { - /// If a store with the same key existed before, it is replaced and returned - fn register_store( - &self, - url: &Url, - store: Arc, - ) -> Option>; - - /// Get a suitable store for the provided URL. For example: - /// - /// - URL with scheme `file:///` or no scheme will return the default LocalFS store - /// - URL with scheme `s3://bucket/` will return the S3 store - /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store - /// - /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be executed depending on - /// the `url` and [`ObjectStoreRegistry`] implementation. An [`ObjectStore`] may be lazily - /// created and registered. - fn get_store(&self, url: &Url) -> DeltaResult>; -} - -/// The default [`ObjectStoreRegistry`] -pub struct DefaultObjectStoreRegistry { - /// A map from scheme to object store that serve list / read operations for the store - object_stores: DashMap>, -} - -impl std::fmt::Debug for DefaultObjectStoreRegistry { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("DefaultObjectStoreRegistry") - .field( - "schemes", - &self - .object_stores - .iter() - .map(|o| o.key().clone()) - .collect::>(), - ) - .finish() - } -} - -impl Default for DefaultObjectStoreRegistry { - fn default() -> Self { - Self::new() - } -} - -impl DefaultObjectStoreRegistry { - /// This will register [`LocalFileSystem`] to handle `file://` paths - pub fn new() -> Self { - let object_stores: DashMap> = DashMap::new(); - object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); - Self { object_stores } - } -} - -/// -/// Stores are registered based on the scheme, host and port of the provided URL -/// with a [`LocalFileSystem::new`] automatically registered for `file://` -/// -/// For example: -/// -/// - `file:///my_path` will return the default LocalFS store -/// - `s3://bucket/path` will return a store registered with `s3://bucket` if any -/// - `hdfs://host:port/path` will return a store registered with `hdfs://host:port` if any -impl ObjectStoreRegistry for DefaultObjectStoreRegistry { - fn register_store( - &self, - url: &Url, - store: Arc, - ) -> Option> { - let s = get_url_key(url); - self.object_stores.insert(s, store) - } - - fn get_store(&self, url: &Url) -> DeltaResult> { - let s = get_url_key(url); - self.object_stores - .get(&s) - .map(|o| o.value().clone()) - .ok_or_else(|| { - DeltaTableError::Generic(format!("No suitable object store found for {url}")) - }) - } -} - -/// Get the key of a url for object store registration. -/// The credential info will be removed -fn get_url_key(url: &Url) -> String { - format!( - "{}://{}", - url.scheme(), - &url[url::Position::BeforeHost..url::Position::AfterPort], - ) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_object_store_url() { - let file = ObjectStoreUrl::parse("file://").unwrap(); - assert_eq!(file.as_str(), "file:///"); - - let url = ObjectStoreUrl::parse("s3://bucket").unwrap(); - assert_eq!(url.as_str(), "s3://bucket/"); - - let url = ObjectStoreUrl::parse("s3://username:password@host:123").unwrap(); - assert_eq!(url.as_str(), "s3://username:password@host:123/"); - } - - #[test] - fn test_get_url_key() { - let file = ObjectStoreUrl::parse("file://").unwrap(); - let key = get_url_key(&file.url); - assert_eq!(key.as_str(), "file://"); - - let url = ObjectStoreUrl::parse("s3://bucket").unwrap(); - let key = get_url_key(&url.url); - assert_eq!(key.as_str(), "s3://bucket"); - - let url = ObjectStoreUrl::parse("s3://username:password@host:123").unwrap(); - let key = get_url_key(&url.url); - assert_eq!(key.as_str(), "s3://host:123"); - } -}