diff --git a/crates/deltalake-aws/tests/integration_read.rs b/crates/deltalake-aws/tests/integration_read.rs index 007dc5fba4..0a610b0bf9 100644 --- a/crates/deltalake-aws/tests/integration_read.rs +++ b/crates/deltalake-aws/tests/integration_read.rs @@ -1,6 +1,6 @@ #![cfg(feature = "integration_test")] -use deltalake_core::{DeltaTableBuilder, Path}; +use deltalake_test::read::{read_table_paths, test_read_tables}; use deltalake_test::utils::*; use serial_test::serial; @@ -8,7 +8,6 @@ mod common; use common::*; static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"]; - /// TEST_PREFIXES as they should appear in object stores. static TEST_PREFIXES_ENCODED: &[&str] = &["my%20table", "%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A"]; @@ -17,7 +16,7 @@ static TEST_PREFIXES_ENCODED: &[&str] = &["my%20table", "%E4%BD%A0%E5%A5%BD/%F0% async fn test_read_tables_aws() -> TestResult { let context = IntegrationContext::new(Box::new(S3Integration::default()))?; - read_tables(&context).await?; + test_read_tables(&context).await?; for (prefix, prefix_encoded) in TEST_PREFIXES.iter().zip(TEST_PREFIXES_ENCODED.iter()) { read_table_paths(&context, prefix, prefix_encoded).await?; @@ -25,165 +24,3 @@ async fn test_read_tables_aws() -> TestResult { Ok(()) } - -async fn read_tables(context: &IntegrationContext) -> TestResult { - context.load_table(TestTables::Simple).await?; - context.load_table(TestTables::Golden).await?; - context - .load_table(TestTables::Delta0_8_0SpecialPartitioned) - .await?; - - read_simple_table(&context).await?; - read_simple_table_with_version(&context).await?; - read_golden(&context).await?; - - Ok(()) -} - -async fn read_table_paths( - context: &IntegrationContext, - table_root: &str, - upload_path: &str, -) -> TestResult { - context - .load_table_with_name(TestTables::Delta0_8_0SpecialPartitioned, upload_path) - .await?; - - println!("table_root: {}", table_root); - verify_store(&context, table_root).await?; - - read_encoded_table(&context, table_root).await?; - - Ok(()) -} - -async fn verify_store(integration: &IntegrationContext, root_path: &str) -> TestResult { - let table_uri = format!("{}/{}", integration.root_uri(), root_path); - println!("working with table_uri: {}", table_uri); - let storage = DeltaTableBuilder::from_uri(table_uri.clone()) - .with_allow_http(true) - .build_storage()? - .object_store(); - - let files = storage.list_with_delimiter(None).await?; - println!("files: {files:?}"); - assert_eq!( - vec![ - Path::parse("_delta_log").unwrap(), - Path::parse("x=A%2FA").unwrap(), - Path::parse("x=B%20B").unwrap(), - ], - files.common_prefixes - ); - - Ok(()) -} - -async fn read_encoded_table(integration: &IntegrationContext, root_path: &str) -> TestResult { - let table_uri = format!("{}/{}", integration.root_uri(), root_path); - - let table = DeltaTableBuilder::from_uri(table_uri) - .with_allow_http(true) - .load() - .await?; - - assert_eq!(table.version(), 0); - assert_eq!(table.get_files_iter().count(), 2); - - Ok(()) -} - -async fn read_simple_table(integration: &IntegrationContext) -> TestResult { - let table_uri = integration.uri_for_table(TestTables::Simple); - let table = DeltaTableBuilder::from_uri(table_uri) - .with_allow_http(true) - .load() - .await?; - - assert_eq!(table.version(), 4); - assert_eq!(table.protocol().min_writer_version, 2); - assert_eq!(table.protocol().min_reader_version, 1); - assert_eq!( - table.get_files_iter().collect::>(), - vec![ - Path::from("part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet"), - Path::from("part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet"), - Path::from("part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet"), - Path::from("part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet"), - Path::from("part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet"), - ] - ); - let tombstones = table.get_state().all_tombstones(); - assert_eq!(tombstones.len(), 31); - assert!(tombstones.contains(&deltalake_core::kernel::Remove { - path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(), - deletion_timestamp: Some(1587968596250), - data_change: true, - extended_file_metadata: None, - deletion_vector: None, - base_row_id: None, - default_row_commit_version: None, - size: None, - partition_values: None, - tags: None, - })); - - Ok(()) -} - -async fn read_simple_table_with_version(integration: &IntegrationContext) -> TestResult { - let table_uri = integration.uri_for_table(TestTables::Simple); - - let table = DeltaTableBuilder::from_uri(table_uri) - .with_allow_http(true) - .with_version(3) - .load() - .await?; - - assert_eq!(table.version(), 3); - assert_eq!(table.protocol().min_writer_version, 2); - assert_eq!(table.protocol().min_reader_version, 1); - assert_eq!( - table.get_files_iter().collect::>(), - vec![ - Path::from("part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet"), - Path::from("part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet"), - Path::from("part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet"), - Path::from("part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet"), - Path::from("part-00000-f17fcbf5-e0dc-40ba-adae-ce66d1fcaef6-c000.snappy.parquet"), - Path::from("part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet"), - ] - ); - let tombstones = table.get_state().all_tombstones(); - assert_eq!(tombstones.len(), 29); - assert!(tombstones.contains(&deltalake_core::kernel::Remove { - path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(), - deletion_timestamp: Some(1587968596250), - data_change: true, - tags: None, - partition_values: None, - base_row_id: None, - default_row_commit_version: None, - size: None, - deletion_vector: None, - extended_file_metadata: None, - })); - - Ok(()) -} - -async fn read_golden(integration: &IntegrationContext) -> TestResult { - let table_uri = integration.uri_for_table(TestTables::Golden); - - let table = DeltaTableBuilder::from_uri(table_uri) - .with_allow_http(true) - .load() - .await - .unwrap(); - - assert_eq!(table.version(), 0); - assert_eq!(table.protocol().min_writer_version, 2); - assert_eq!(table.protocol().min_reader_version, 1); - - Ok(()) -} diff --git a/crates/deltalake-core/Cargo.toml b/crates/deltalake-core/Cargo.toml index cb9e250e8f..1e41f8cbe6 100644 --- a/crates/deltalake-core/Cargo.toml +++ b/crates/deltalake-core/Cargo.toml @@ -137,8 +137,6 @@ datafusion = [ ] datafusion-ext = ["datafusion"] gcs = ["object_store/gcp"] -# used only for integration testing -integration_test = [] json = ["parquet/json"] python = ["arrow/pyarrow"] unity-experimental = ["reqwest", "hyper"] diff --git a/crates/deltalake-core/tests/command_filesystem_check.rs b/crates/deltalake-core/tests/command_filesystem_check.rs index 8d0eee6ac6..ac6142fb10 100644 --- a/crates/deltalake-core/tests/command_filesystem_check.rs +++ b/crates/deltalake-core/tests/command_filesystem_check.rs @@ -1,5 +1,3 @@ -#![cfg(feature = "integration_test")] - use deltalake_core::Path; use deltalake_core::{errors::DeltaTableError, DeltaOps}; use deltalake_test::utils::*; diff --git a/crates/deltalake-core/tests/integration.rs b/crates/deltalake-core/tests/integration.rs new file mode 100644 index 0000000000..f8536dfa9a --- /dev/null +++ b/crates/deltalake-core/tests/integration.rs @@ -0,0 +1,83 @@ +use deltalake_test::read::read_table_paths; +use deltalake_test::utils::*; +use deltalake_test::{test_concurrent_writes, test_read_tables}; +use object_store::path::Path; +use serial_test::serial; + +#[allow(dead_code)] +mod fs_common; + +static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"]; + +#[tokio::test] +#[serial] +async fn test_integration_local() -> TestResult { + let context = IntegrationContext::new(Box::new(LocalStorageIntegration::default()))?; + + test_read_tables(&context).await?; + + for prefix in TEST_PREFIXES { + read_table_paths(&context, prefix, prefix).await?; + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_concurrency_local() -> TestResult { + let context = IntegrationContext::new(Box::new(LocalStorageIntegration::default()))?; + + test_concurrent_writes(&context).await?; + + Ok(()) +} + +#[tokio::test] +async fn test_action_reconciliation() { + let path = "./tests/data/action_reconciliation"; + let mut table = fs_common::create_table(path, None).await; + + // Add a file. + let a = fs_common::add(3 * 60 * 1000); + assert_eq!(1, fs_common::commit_add(&mut table, &a).await); + assert_eq!( + table.get_files_iter().collect::>(), + vec![Path::from(a.path.clone())] + ); + + // Remove added file. + let r = deltalake_core::kernel::Remove { + path: a.path.clone(), + deletion_timestamp: Some(chrono::Utc::now().timestamp_millis()), + data_change: false, + extended_file_metadata: None, + partition_values: None, + size: None, + tags: None, + deletion_vector: None, + base_row_id: None, + default_row_commit_version: None, + }; + + assert_eq!(2, fs_common::commit_removes(&mut table, vec![&r]).await); + assert_eq!(table.get_files_iter().count(), 0); + assert_eq!( + table + .get_state() + .all_tombstones() + .iter() + .map(|r| r.path.as_str()) + .collect::>(), + vec![a.path.as_str()] + ); + + // Add removed file back. + assert_eq!(3, fs_common::commit_add(&mut table, &a).await); + assert_eq!( + table.get_files_iter().collect::>(), + vec![Path::from(a.path)] + ); + // tombstone is removed. + assert_eq!(table.get_state().all_tombstones().len(), 0); +} diff --git a/crates/deltalake-core/tests/integration_datafusion.rs b/crates/deltalake-core/tests/integration_datafusion.rs index 45e2a41f6d..a7f6c171f6 100644 --- a/crates/deltalake-core/tests/integration_datafusion.rs +++ b/crates/deltalake-core/tests/integration_datafusion.rs @@ -1,4 +1,4 @@ -#![cfg(all(feature = "integration_test", feature = "datafusion"))] +#![cfg(feature = "datafusion")] use arrow::array::Int64Array; use deltalake_test::datafusion::*; diff --git a/crates/deltalake-test/Cargo.toml b/crates/deltalake-test/Cargo.toml index e41ad6e3ce..20bab3bf8b 100644 --- a/crates/deltalake-test/Cargo.toml +++ b/crates/deltalake-test/Cargo.toml @@ -10,10 +10,12 @@ chrono = { workspace = true, default-features = false, features = ["clock"] } deltalake-core = { path = "../deltalake-core" } dotenvy = "0" fs_extra = "1.3.0" +object_store = { workspace = true } rand = "0.8" serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tempfile = "3" +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } [features] default = [] diff --git a/crates/deltalake-core/tests/integration_concurrent_writes.rs b/crates/deltalake-test/src/concurrent.rs similarity index 91% rename from crates/deltalake-core/tests/integration_concurrent_writes.rs rename to crates/deltalake-test/src/concurrent.rs index 2b362adf1b..10e486eb36 100644 --- a/crates/deltalake-core/tests/integration_concurrent_writes.rs +++ b/crates/deltalake-test/src/concurrent.rs @@ -1,5 +1,3 @@ -#![cfg(feature = "integration_test")] - use std::collections::HashMap; use std::future::Future; use std::iter::FromIterator; @@ -10,19 +8,10 @@ use deltalake_core::operations::transaction::commit; use deltalake_core::operations::DeltaOps; use deltalake_core::protocol::{DeltaOperation, SaveMode}; use deltalake_core::{DeltaTable, DeltaTableBuilder}; -use deltalake_test::utils::*; -use serial_test::serial; - -#[tokio::test] -#[serial] -async fn test_concurrent_writes_local() -> TestResult { - let storage = Box::new(LocalStorageIntegration::default()); - let context = IntegrationContext::new(storage)?; - test_concurrent_writes(&context).await?; - Ok(()) -} -async fn test_concurrent_writes(context: &IntegrationContext) -> TestResult { +use crate::utils::*; + +pub async fn test_concurrent_writes(context: &IntegrationContext) -> TestResult { let (_table, table_uri) = prepare_table(&context).await?; run_test(|name| Worker::new(&table_uri, name)).await; Ok(()) diff --git a/crates/deltalake-test/src/lib.rs b/crates/deltalake-test/src/lib.rs index 5a698305d6..430b90ec51 100644 --- a/crates/deltalake-test/src/lib.rs +++ b/crates/deltalake-test/src/lib.rs @@ -15,10 +15,15 @@ use std::sync::Arc; use tempfile::TempDir; pub mod clock; +pub mod concurrent; #[cfg(feature = "datafusion")] pub mod datafusion; +pub mod read; pub mod utils; +pub use concurrent::test_concurrent_writes; +pub use read::test_read_tables; + #[derive(Default)] pub struct TestContext { /// The main table under test diff --git a/crates/deltalake-core/tests/integration_read.rs b/crates/deltalake-test/src/read.rs similarity index 69% rename from crates/deltalake-core/tests/integration_read.rs rename to crates/deltalake-test/src/read.rs index c3b042c7df..5c2fd664af 100644 --- a/crates/deltalake-core/tests/integration_read.rs +++ b/crates/deltalake-test/src/read.rs @@ -1,82 +1,9 @@ -#![cfg(feature = "integration_test")] - -use deltalake_core::{DeltaTableBuilder, ObjectStore}; -use deltalake_test::utils::*; +use deltalake_core::DeltaTableBuilder; use object_store::path::Path; -use serial_test::serial; - -#[allow(dead_code)] -mod fs_common; - -static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"]; - -/// TEST_PREFIXES as they should appear in object stores. -static TEST_PREFIXES_ENCODED: &[&str] = &["my%20table", "%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A"]; -#[tokio::test] -#[serial] -async fn test_read_tables_local() -> TestResult { - let storage = Box::new(LocalStorageIntegration::default()); - let context = IntegrationContext::new(storage)?; - read_tables(&context).await?; - - for prefix in TEST_PREFIXES { - read_table_paths(&context, prefix, prefix).await?; - } - - Ok(()) -} - -#[tokio::test] -async fn test_action_reconciliation() { - let path = "./tests/data/action_reconciliation"; - let mut table = fs_common::create_table(path, None).await; - - // Add a file. - let a = fs_common::add(3 * 60 * 1000); - assert_eq!(1, fs_common::commit_add(&mut table, &a).await); - assert_eq!( - table.get_files_iter().collect::>(), - vec![Path::from(a.path.clone())] - ); +use crate::utils::{IntegrationContext, TestResult, TestTables}; - // Remove added file. - let r = deltalake_core::kernel::Remove { - path: a.path.clone(), - deletion_timestamp: Some(chrono::Utc::now().timestamp_millis()), - data_change: false, - extended_file_metadata: None, - partition_values: None, - size: None, - tags: None, - deletion_vector: None, - base_row_id: None, - default_row_commit_version: None, - }; - - assert_eq!(2, fs_common::commit_removes(&mut table, vec![&r]).await); - assert_eq!(table.get_files_iter().count(), 0); - assert_eq!( - table - .get_state() - .all_tombstones() - .iter() - .map(|r| r.path.as_str()) - .collect::>(), - vec![a.path.as_str()] - ); - - // Add removed file back. - assert_eq!(3, fs_common::commit_add(&mut table, &a).await); - assert_eq!( - table.get_files_iter().collect::>(), - vec![Path::from(a.path)] - ); - // tombstone is removed. - assert_eq!(table.get_state().all_tombstones().len(), 0); -} - -async fn read_tables(context: &IntegrationContext) -> TestResult { +pub async fn test_read_tables(context: &IntegrationContext) -> TestResult { context.load_table(TestTables::Simple).await?; context.load_table(TestTables::Golden).await?; context @@ -90,7 +17,7 @@ async fn read_tables(context: &IntegrationContext) -> TestResult { Ok(()) } -async fn read_table_paths( +pub async fn read_table_paths( context: &IntegrationContext, table_root: &str, upload_path: &str, @@ -106,40 +33,6 @@ async fn read_table_paths( Ok(()) } -async fn verify_store(integration: &IntegrationContext, root_path: &str) -> TestResult { - let table_uri = format!("{}/{}", integration.root_uri(), root_path); - let storage = DeltaTableBuilder::from_uri(table_uri.clone()) - .with_allow_http(true) - .build_storage()? - .object_store(); - - let files = storage.list_with_delimiter(None).await?; - assert_eq!( - vec![ - Path::parse("_delta_log").unwrap(), - Path::parse("x=A%2FA").unwrap(), - Path::parse("x=B%20B").unwrap(), - ], - files.common_prefixes - ); - - Ok(()) -} - -async fn read_encoded_table(integration: &IntegrationContext, root_path: &str) -> TestResult { - let table_uri = format!("{}/{}", integration.root_uri(), root_path); - - let table = DeltaTableBuilder::from_uri(table_uri) - .with_allow_http(true) - .load() - .await?; - - assert_eq!(table.version(), 0); - assert_eq!(table.get_files_iter().count(), 2); - - Ok(()) -} - async fn read_simple_table(integration: &IntegrationContext) -> TestResult { let table_uri = integration.uri_for_table(TestTables::Simple); let table = DeltaTableBuilder::from_uri(table_uri) @@ -234,3 +127,38 @@ async fn read_golden(integration: &IntegrationContext) -> TestResult { Ok(()) } + +async fn verify_store(integration: &IntegrationContext, root_path: &str) -> TestResult { + let table_uri = format!("{}/{}", integration.root_uri(), root_path); + + let storage = DeltaTableBuilder::from_uri(table_uri.clone()) + .with_allow_http(true) + .build_storage()? + .object_store(); + + let files = storage.list_with_delimiter(None).await?; + assert_eq!( + vec![ + Path::parse("_delta_log").unwrap(), + Path::parse("x=A%2FA").unwrap(), + Path::parse("x=B%20B").unwrap(), + ], + files.common_prefixes + ); + + Ok(()) +} + +async fn read_encoded_table(integration: &IntegrationContext, root_path: &str) -> TestResult { + let table_uri = format!("{}/{}", integration.root_uri(), root_path); + + let table = DeltaTableBuilder::from_uri(table_uri) + .with_allow_http(true) + .load() + .await?; + + assert_eq!(table.version(), 0); + assert_eq!(table.get_files_iter().count(), 2); + + Ok(()) +} diff --git a/crates/deltalake-test/src/utils.rs b/crates/deltalake-test/src/utils.rs index 0451d7c751..2fe788f5b6 100644 --- a/crates/deltalake-test/src/utils.rs +++ b/crates/deltalake-test/src/utils.rs @@ -7,7 +7,7 @@ use std::env; use std::process::ExitStatus; use tempfile::{tempdir, TempDir}; -pub type TestResult = Result<(), Box>; +pub type TestResult = Result>; pub trait StorageIntegration { fn create_bucket(&self) -> std::io::Result; diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index 9cd0789567..3d4f3eddc1 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -34,9 +34,6 @@ s3-native-tls = ["deltalake-aws/native-tls"] s3 = ["deltalake-aws/rustls"] unity-experimental = ["deltalake-core/unity-experimental"] -# used only for integration testing -integration_test = ["deltalake-core/integration_test"] - [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } chrono = { workspace = true, default-features = false, features = ["clock"] }