Refactor tests such that non-integration datafusion tests all pass now
rtyler committed Dec 30, 2023
1 parent 74afe22 commit 6476de7
Showing 8 changed files with 57 additions and 54 deletions.
4 changes: 3 additions & 1 deletion crates/deltalake-aws/src/logstore.rs
@@ -189,7 +189,9 @@ impl LogStore for S3DynamoDbLogStore {
                     warn!("LockClientError::VersionAlreadyExists({version})");
                     TransactionError::VersionAlreadyExists(version)
                 }
-                LockClientError::ProvisionedThroughputExceeded => todo!("deltalake-aws does not yet handle DynamoDB provisioned throughput errors"),
+                LockClientError::ProvisionedThroughputExceeded => todo!(
+                    "deltalake-aws does not yet handle DynamoDB provisioned throughput errors"
+                ),
                 LockClientError::LockTableNotFound => {
                     let table_name = self.lock_client.get_lock_table_name();
                     error!("Lock table '{table_name}' not found");
6 changes: 4 additions & 2 deletions crates/deltalake-core/src/data_catalog/storage/mod.rs
@@ -163,7 +163,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_table_names() {
-        let fs = ListingSchemaProvider::try_new("./tests/data/", None).unwrap();
+        let fs = ListingSchemaProvider::try_new("../deltalake-test/tests/data/", None).unwrap();
         fs.refresh().await.unwrap();
         let table_names = fs.table_names();
         assert!(table_names.len() > 20);
@@ -172,7 +172,9 @@
 
     #[tokio::test]
     async fn test_query_table() {
-        let schema = Arc::new(ListingSchemaProvider::try_new("./tests/data/", None).unwrap());
+        let schema = Arc::new(
+            ListingSchemaProvider::try_new("../deltalake-test/tests/data/", None).unwrap(),
+        );
         schema.refresh().await.unwrap();
 
         let ctx = SessionContext::new();
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -1907,7 +1907,7 @@ mod tests {
 
     #[tokio::test]
     async fn delta_table_provider_with_config() {
-        let table = crate::open_table("tests/data/delta-2.2.0-partitioned-types")
+        let table = crate::open_table("../deltalake-test/tests/data/delta-2.2.0-partitioned-types")
             .await
             .unwrap();
         let config = DeltaScanConfigBuilder::new()
34 changes: 0 additions & 34 deletions crates/deltalake-core/src/logstore/default_logstore.rs
@@ -62,37 +62,3 @@ impl LogStore for DefaultLogStore {
         &self.config
     }
 }
-
-#[cfg(feature = "datafusion")]
-#[cfg(test)]
-mod datafusion_tests {
-    use super::*;
-
-    use std::collections::HashMap;
-    use url::Url;
-
-    #[tokio::test]
-    async fn test_unique_object_store_url() {
-        let location = Url::parse("memory://table").unwrap();
-        let store = crate::logstore::logstore_for(location, HashMap::default());
-        assert!(store.is_ok());
-        let store = store.unwrap();
-
-        for (location_1, location_2) in [
-            // Same scheme, no host, different path
-            ("file:///path/to/table_1", "file:///path/to/table_2"),
-            // Different scheme/host, same path
-            ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"),
-            // Same scheme, different host, same path
-            ("s3://bucket_1/table_1", "s3://bucket_2/table_1"),
-        ] {
-            let url_1 = Url::parse(location_1).unwrap();
-            let url_2 = Url::parse(location_2).unwrap();
-
-            assert_ne!(
-                store.object_store_url(&url_1).as_str(),
-                store.object_store_url(&url_2).as_str(),
-            );
-        }
-    }
-}
52 changes: 42 additions & 10 deletions crates/deltalake-core/src/logstore/mod.rs
@@ -225,22 +225,25 @@ pub trait LogStore: Sync + Send {
     /// host we convert the location from this `LogStore` to a valid name, combining the
     /// original scheme, host and path with invalid characters replaced.
     fn object_store_url(&self) -> ObjectStoreUrl {
-        use object_store::path::DELIMITER;
-        let location = &self.config().location;
-
-        ObjectStoreUrl::parse(format!(
-            "delta-rs://{}-{}{}",
-            location.scheme(),
-            location.host_str().unwrap_or("-"),
-            location.path().replace(DELIMITER, "-").replace(':', "-")
-        ))
-        .expect("Invalid object store url.")
+        crate::logstore::object_store_url(&self.config().location)
     }
 
     /// Get configuration representing configured log store.
     fn config(&self) -> &LogStoreConfig;
 }
 
+#[cfg(feature = "datafusion")]
+fn object_store_url(location: &Url) -> ObjectStoreUrl {
+    use object_store::path::DELIMITER;
+    ObjectStoreUrl::parse(format!(
+        "delta-rs://{}-{}{}",
+        location.scheme(),
+        location.host_str().unwrap_or("-"),
+        location.path().replace(DELIMITER, "-").replace(':', "-")
+    ))
+    .expect("Invalid object store url.")
+}
+
 /// TODO
 pub fn to_uri(root: &Url, location: &Path) -> String {
     match root.scheme() {
@@ -461,3 +464,32 @@ mod tests {
         assert!(store.is_ok());
     }
 }
+
+#[cfg(feature = "datafusion")]
+#[cfg(test)]
+mod datafusion_tests {
+    use super::*;
+
+    use std::collections::HashMap;
+    use url::Url;
+
+    #[tokio::test]
+    async fn test_unique_object_store_url() {
+        for (location_1, location_2) in [
+            // Same scheme, no host, different path
+            ("file:///path/to/table_1", "file:///path/to/table_2"),
+            // Different scheme/host, same path
+            ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"),
+            // Same scheme, different host, same path
+            ("s3://bucket_1/table_1", "s3://bucket_2/table_1"),
+        ] {
+            let url_1 = Url::parse(location_1).unwrap();
+            let url_2 = Url::parse(location_2).unwrap();
+
+            assert_ne!(
+                object_store_url(&url_1).as_str(),
+                object_store_url(&url_2).as_str(),
+            );
+        }
+    }
+}
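For reference, a minimal standalone sketch (not part of this commit) of what the relocated object_store_url helper produces for the locations exercised in test_unique_object_store_url; since the helper is crate-private, the sketch re-implements the same replacement rules shown in the hunk above:

use url::Url;

// Sketch only: mirrors the scheme/host/path mangling from `object_store_url` above,
// with '/' standing in for `object_store::path::DELIMITER`.
fn mangled(location: &Url) -> String {
    format!(
        "delta-rs://{}-{}{}",
        location.scheme(),
        location.host_str().unwrap_or("-"),
        location.path().replace('/', "-").replace(':', "-")
    )
}

fn main() {
    // file:///path/to/table_1 has no host, so the host slot falls back to "-".
    assert_eq!(
        mangled(&Url::parse("file:///path/to/table_1").unwrap()),
        "delta-rs://file---path-to-table_1"
    );
    // A different scheme/host with the same table name still yields a distinct URL.
    assert_eq!(
        mangled(&Url::parse("s3://bucket_1/table_1").unwrap()),
        "delta-rs://s3-bucket_1-table_1"
    );
}

Because scheme, host, and path are all folded into the generated URL, distinct table locations map to distinct DataFusion object store registrations, which is what the assert_ne! cases in the moved test check.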
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/operations/load.rs
@@ -88,7 +88,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_load_local() -> TestResult {
-        let table = DeltaTableBuilder::from_uri("./tests/data/delta-0.8.0")
+        let table = DeltaTableBuilder::from_uri("../deltalake-test/tests/data/delta-0.8.0")
             .load()
             .await
             .unwrap();
2 changes: 1 addition & 1 deletion crates/deltalake-core/tests/fs_common/mod.rs
@@ -36,7 +36,7 @@ pub async fn create_table_from_json(
     partition_columns: Vec<&str>,
     config: Value,
 ) -> DeltaTable {
-    assert!(path.starts_with("./tests/data"));
+    assert!(path.starts_with("../deltalake-test/tests/data"));
     std::fs::create_dir_all(path).unwrap();
     std::fs::remove_dir_all(path).unwrap();
     std::fs::create_dir_all(path).unwrap();
9 changes: 5 additions & 4 deletions crates/deltalake-core/tests/read_delta_partitions_test.rs
@@ -125,7 +125,7 @@ async fn read_null_partitions_from_checkpoint() {
     use serde_json::json;
 
     let mut table = fs_common::create_table_from_json(
-        "./tests/data/read_null_partitions_from_checkpoint",
+        "../deltalake-test/tests/data/read_null_partitions_from_checkpoint",
         json!({
             "type": "struct",
             "fields": [
@@ -175,9 +175,10 @@ async fn load_from_delta_8_0_table_with_special_partition() {
     use deltalake_core::{DeltaOps, DeltaTable};
     use futures::{future, StreamExt};
 
-    let table = deltalake_core::open_table("./tests/data/delta-0.8.0-special-partition")
-        .await
-        .unwrap();
+    let table =
+        deltalake_core::open_table("../deltalake-test/tests/data/delta-0.8.0-special-partition")
+            .await
+            .unwrap();
 
     let (_, stream): (DeltaTable, SendableRecordBatchStream) = DeltaOps(table)
         .load()
