diff --git a/crates/deltalake-aws/src/logstore.rs b/crates/deltalake-aws/src/logstore.rs index a7feb3f44d..37ff26db17 100644 --- a/crates/deltalake-aws/src/logstore.rs +++ b/crates/deltalake-aws/src/logstore.rs @@ -189,7 +189,9 @@ impl LogStore for S3DynamoDbLogStore { warn!("LockClientError::VersionAlreadyExists({version})"); TransactionError::VersionAlreadyExists(version) } - LockClientError::ProvisionedThroughputExceeded => todo!("deltalake-aws does not yet handle DynamoDB providioned throughput errors"), + LockClientError::ProvisionedThroughputExceeded => todo!( + "deltalake-aws does not yet handle DynamoDB providioned throughput errors" + ), LockClientError::LockTableNotFound => { let table_name = self.lock_client.get_lock_table_name(); error!("Lock table '{table_name}' not found"); diff --git a/crates/deltalake-core/src/data_catalog/storage/mod.rs b/crates/deltalake-core/src/data_catalog/storage/mod.rs index 741db3cd0e..a332464952 100644 --- a/crates/deltalake-core/src/data_catalog/storage/mod.rs +++ b/crates/deltalake-core/src/data_catalog/storage/mod.rs @@ -163,7 +163,7 @@ mod tests { #[tokio::test] async fn test_table_names() { - let fs = ListingSchemaProvider::try_new("./tests/data/", None).unwrap(); + let fs = ListingSchemaProvider::try_new("../deltalake-test/tests/data/", None).unwrap(); fs.refresh().await.unwrap(); let table_names = fs.table_names(); assert!(table_names.len() > 20); @@ -172,7 +172,9 @@ mod tests { #[tokio::test] async fn test_query_table() { - let schema = Arc::new(ListingSchemaProvider::try_new("./tests/data/", None).unwrap()); + let schema = Arc::new( + ListingSchemaProvider::try_new("../deltalake-test/tests/data/", None).unwrap(), + ); schema.refresh().await.unwrap(); let ctx = SessionContext::new(); diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 17d04c692a..038828d31b 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -1907,7 +1907,7 @@ mod tests { #[tokio::test] async fn delta_table_provider_with_config() { - let table = crate::open_table("tests/data/delta-2.2.0-partitioned-types") + let table = crate::open_table("../deltalake-test/tests/data/delta-2.2.0-partitioned-types") .await .unwrap(); let config = DeltaScanConfigBuilder::new() diff --git a/crates/deltalake-core/src/logstore/default_logstore.rs b/crates/deltalake-core/src/logstore/default_logstore.rs index 78afc61a8e..ed463e9947 100644 --- a/crates/deltalake-core/src/logstore/default_logstore.rs +++ b/crates/deltalake-core/src/logstore/default_logstore.rs @@ -62,37 +62,3 @@ impl LogStore for DefaultLogStore { &self.config } } - -#[cfg(feature = "datafusion")] -#[cfg(test)] -mod datafusion_tests { - use super::*; - - use std::collections::HashMap; - use url::Url; - - #[tokio::test] - async fn test_unique_object_store_url() { - let location = Url::parse("memory://table").unwrap(); - let store = crate::logstore::logstore_for(location, HashMap::default()); - assert!(store.is_ok()); - let store = store.unwrap(); - - for (location_1, location_2) in [ - // Same scheme, no host, different path - ("file:///path/to/table_1", "file:///path/to/table_2"), - // Different scheme/host, same path - ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"), - // Same scheme, different host, same path - ("s3://bucket_1/table_1", "s3://bucket_2/table_1"), - ] { - let url_1 = Url::parse(location_1).unwrap(); - let url_2 = Url::parse(location_2).unwrap(); - - assert_ne!( - store.object_store_url(&url_1).as_str(), - store.object_store_url(&url_2).as_str(), - ); - } - } -} diff --git a/crates/deltalake-core/src/logstore/mod.rs b/crates/deltalake-core/src/logstore/mod.rs index 3bfa3ee835..891286749c 100644 --- a/crates/deltalake-core/src/logstore/mod.rs +++ b/crates/deltalake-core/src/logstore/mod.rs @@ -225,22 +225,25 @@ pub trait LogStore: Sync + Send { /// host we convert the location from this `LogStore` to a valid name, combining the /// original scheme, host and path with invalid characters replaced. fn object_store_url(&self) -> ObjectStoreUrl { - use object_store::path::DELIMITER; - let location = &self.config().location; - - ObjectStoreUrl::parse(format!( - "delta-rs://{}-{}{}", - location.scheme(), - location.host_str().unwrap_or("-"), - location.path().replace(DELIMITER, "-").replace(':', "-") - )) - .expect("Invalid object store url.") + crate::logstore::object_store_url(&self.config().location) } /// Get configuration representing configured log store. fn config(&self) -> &LogStoreConfig; } +#[cfg(feature = "datafusion")] +fn object_store_url(location: &Url) -> ObjectStoreUrl { + use object_store::path::DELIMITER; + ObjectStoreUrl::parse(format!( + "delta-rs://{}-{}{}", + location.scheme(), + location.host_str().unwrap_or("-"), + location.path().replace(DELIMITER, "-").replace(':', "-") + )) + .expect("Invalid object store url.") +} + /// TODO pub fn to_uri(root: &Url, location: &Path) -> String { match root.scheme() { @@ -461,3 +464,32 @@ mod tests { assert!(store.is_ok()); } } + +#[cfg(feature = "datafusion")] +#[cfg(test)] +mod datafusion_tests { + use super::*; + + use std::collections::HashMap; + use url::Url; + + #[tokio::test] + async fn test_unique_object_store_url() { + for (location_1, location_2) in [ + // Same scheme, no host, different path + ("file:///path/to/table_1", "file:///path/to/table_2"), + // Different scheme/host, same path + ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"), + // Same scheme, different host, same path + ("s3://bucket_1/table_1", "s3://bucket_2/table_1"), + ] { + let url_1 = Url::parse(location_1).unwrap(); + let url_2 = Url::parse(location_2).unwrap(); + + assert_ne!( + object_store_url(&url_1).as_str(), + object_store_url(&url_2).as_str(), + ); + } + } +} diff --git a/crates/deltalake-core/src/operations/load.rs b/crates/deltalake-core/src/operations/load.rs index 610f86dee6..0189381922 100644 --- a/crates/deltalake-core/src/operations/load.rs +++ b/crates/deltalake-core/src/operations/load.rs @@ -88,7 +88,7 @@ mod tests { #[tokio::test] async fn test_load_local() -> TestResult { - let table = DeltaTableBuilder::from_uri("./tests/data/delta-0.8.0") + let table = DeltaTableBuilder::from_uri("../deltalake-test/tests/data/delta-0.8.0") .load() .await .unwrap(); diff --git a/crates/deltalake-core/tests/fs_common/mod.rs b/crates/deltalake-core/tests/fs_common/mod.rs index abef90b05c..ebd4d50b88 100644 --- a/crates/deltalake-core/tests/fs_common/mod.rs +++ b/crates/deltalake-core/tests/fs_common/mod.rs @@ -36,7 +36,7 @@ pub async fn create_table_from_json( partition_columns: Vec<&str>, config: Value, ) -> DeltaTable { - assert!(path.starts_with("./tests/data")); + assert!(path.starts_with("../deltalake-test/tests/data")); std::fs::create_dir_all(path).unwrap(); std::fs::remove_dir_all(path).unwrap(); std::fs::create_dir_all(path).unwrap(); diff --git a/crates/deltalake-core/tests/read_delta_partitions_test.rs b/crates/deltalake-core/tests/read_delta_partitions_test.rs index 182502fb13..e7815af748 100644 --- a/crates/deltalake-core/tests/read_delta_partitions_test.rs +++ b/crates/deltalake-core/tests/read_delta_partitions_test.rs @@ -125,7 +125,7 @@ async fn read_null_partitions_from_checkpoint() { use serde_json::json; let mut table = fs_common::create_table_from_json( - "./tests/data/read_null_partitions_from_checkpoint", + "../deltalake-test/tests/data/read_null_partitions_from_checkpoint", json!({ "type": "struct", "fields": [ @@ -175,9 +175,10 @@ async fn load_from_delta_8_0_table_with_special_partition() { use deltalake_core::{DeltaOps, DeltaTable}; use futures::{future, StreamExt}; - let table = deltalake_core::open_table("./tests/data/delta-0.8.0-special-partition") - .await - .unwrap(); + let table = + deltalake_core::open_table("../deltalake-test/tests/data/delta-0.8.0-special-partition") + .await + .unwrap(); let (_, stream): (DeltaTable, SendableRecordBatchStream) = DeltaOps(table) .load()