refactor: consolidate integration test code
roeap committed Jan 3, 2024
1 parent 3aa6dc6 commit ea52d83
Showing 11 changed files with 136 additions and 299 deletions.
167 changes: 2 additions & 165 deletions crates/deltalake-aws/tests/integration_read.rs
@@ -1,14 +1,13 @@
#![cfg(feature = "integration_test")]

use deltalake_core::{DeltaTableBuilder, Path};
use deltalake_test::read::{read_table_paths, test_read_tables};
use deltalake_test::utils::*;
use serial_test::serial;

mod common;
use common::*;

static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"];

/// TEST_PREFIXES as they should appear in object stores.
static TEST_PREFIXES_ENCODED: &[&str] = &["my%20table", "%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A"];

@@ -17,173 +16,11 @@ static TEST_PREFIXES_ENCODED: &[&str] = &["my%20table", "%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A"];
async fn test_read_tables_aws() -> TestResult {
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;

read_tables(&context).await?;
test_read_tables(&context).await?;

for (prefix, prefix_encoded) in TEST_PREFIXES.iter().zip(TEST_PREFIXES_ENCODED.iter()) {
read_table_paths(&context, prefix, prefix_encoded).await?;
}

Ok(())
}

async fn read_tables(context: &IntegrationContext) -> TestResult {
context.load_table(TestTables::Simple).await?;
context.load_table(TestTables::Golden).await?;
context
.load_table(TestTables::Delta0_8_0SpecialPartitioned)
.await?;

read_simple_table(&context).await?;
read_simple_table_with_version(&context).await?;
read_golden(&context).await?;

Ok(())
}

async fn read_table_paths(
context: &IntegrationContext,
table_root: &str,
upload_path: &str,
) -> TestResult {
context
.load_table_with_name(TestTables::Delta0_8_0SpecialPartitioned, upload_path)
.await?;

println!("table_root: {}", table_root);
verify_store(&context, table_root).await?;

read_encoded_table(&context, table_root).await?;

Ok(())
}

async fn verify_store(integration: &IntegrationContext, root_path: &str) -> TestResult {
let table_uri = format!("{}/{}", integration.root_uri(), root_path);
println!("working with table_uri: {}", table_uri);
let storage = DeltaTableBuilder::from_uri(table_uri.clone())
.with_allow_http(true)
.build_storage()?
.object_store();

let files = storage.list_with_delimiter(None).await?;
println!("files: {files:?}");
assert_eq!(
vec![
Path::parse("_delta_log").unwrap(),
Path::parse("x=A%2FA").unwrap(),
Path::parse("x=B%20B").unwrap(),
],
files.common_prefixes
);

Ok(())
}

async fn read_encoded_table(integration: &IntegrationContext, root_path: &str) -> TestResult {
let table_uri = format!("{}/{}", integration.root_uri(), root_path);

let table = DeltaTableBuilder::from_uri(table_uri)
.with_allow_http(true)
.load()
.await?;

assert_eq!(table.version(), 0);
assert_eq!(table.get_files_iter().count(), 2);

Ok(())
}

async fn read_simple_table(integration: &IntegrationContext) -> TestResult {
let table_uri = integration.uri_for_table(TestTables::Simple);
let table = DeltaTableBuilder::from_uri(table_uri)
.with_allow_http(true)
.load()
.await?;

assert_eq!(table.version(), 4);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files_iter().collect::<Vec<_>>(),
vec![
Path::from("part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet"),
Path::from("part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet"),
Path::from("part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet"),
Path::from("part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet"),
Path::from("part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet"),
]
);
let tombstones = table.get_state().all_tombstones();
assert_eq!(tombstones.len(), 31);
assert!(tombstones.contains(&deltalake_core::kernel::Remove {
path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(),
deletion_timestamp: Some(1587968596250),
data_change: true,
extended_file_metadata: None,
deletion_vector: None,
base_row_id: None,
default_row_commit_version: None,
size: None,
partition_values: None,
tags: None,
}));

Ok(())
}

async fn read_simple_table_with_version(integration: &IntegrationContext) -> TestResult {
let table_uri = integration.uri_for_table(TestTables::Simple);

let table = DeltaTableBuilder::from_uri(table_uri)
.with_allow_http(true)
.with_version(3)
.load()
.await?;

assert_eq!(table.version(), 3);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files_iter().collect::<Vec<_>>(),
vec![
Path::from("part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet"),
Path::from("part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet"),
Path::from("part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet"),
Path::from("part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet"),
Path::from("part-00000-f17fcbf5-e0dc-40ba-adae-ce66d1fcaef6-c000.snappy.parquet"),
Path::from("part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet"),
]
);
let tombstones = table.get_state().all_tombstones();
assert_eq!(tombstones.len(), 29);
assert!(tombstones.contains(&deltalake_core::kernel::Remove {
path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(),
deletion_timestamp: Some(1587968596250),
data_change: true,
tags: None,
partition_values: None,
base_row_id: None,
default_row_commit_version: None,
size: None,
deletion_vector: None,
extended_file_metadata: None,
}));

Ok(())
}

async fn read_golden(integration: &IntegrationContext) -> TestResult {
let table_uri = integration.uri_for_table(TestTables::Golden);

let table = DeltaTableBuilder::from_uri(table_uri)
.with_allow_http(true)
.load()
.await
.unwrap();

assert_eq!(table.version(), 0);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);

Ok(())
}
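
The helpers deleted above now live in the shared deltalake-test crate, so each backend crate only supplies its own storage integration. A minimal sketch of the resulting pattern, assuming StorageIntegration is the trait behind the boxed integrations seen in this diff and that IntegrationContext::new accepts it boxed:

use deltalake_test::read::{read_table_paths, test_read_tables};
use deltalake_test::utils::{IntegrationContext, StorageIntegration, TestResult};

// Hypothetical wrapper: any backend's integration (S3, local, ...) plugs in here.
async fn run_read_suite(storage: Box<dyn StorageIntegration>) -> TestResult {
    let context = IntegrationContext::new(storage)?;

    // Shared read checks over the bundled test tables.
    test_read_tables(&context).await?;

    // Shared prefix round-trip check: the second argument is how the
    // prefix should appear, percent-encoded, in the object store.
    read_table_paths(&context, "my table", "my%20table").await?;

    Ok(())
}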
2 changes: 0 additions & 2 deletions crates/deltalake-core/Cargo.toml
@@ -137,8 +137,6 @@ datafusion = [
]
datafusion-ext = ["datafusion"]
gcs = ["object_store/gcp"]
# used only for integration testing
integration_test = []
json = ["parquet/json"]
python = ["arrow/pyarrow"]
unity-experimental = ["reqwest", "hyper"]
2 changes: 0 additions & 2 deletions crates/deltalake-core/tests/command_filesystem_check.rs
@@ -1,5 +1,3 @@
#![cfg(feature = "integration_test")]

use deltalake_core::Path;
use deltalake_core::{errors::DeltaTableError, DeltaOps};
use deltalake_test::utils::*;
83 changes: 83 additions & 0 deletions crates/deltalake-core/tests/integration.rs
@@ -0,0 +1,83 @@
use deltalake_test::read::read_table_paths;
use deltalake_test::utils::*;
use deltalake_test::{test_concurrent_writes, test_read_tables};
use object_store::path::Path;
use serial_test::serial;

#[allow(dead_code)]
mod fs_common;

static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"];

#[tokio::test]
#[serial]
async fn test_integration_local() -> TestResult {
let context = IntegrationContext::new(Box::new(LocalStorageIntegration::default()))?;

test_read_tables(&context).await?;

for prefix in TEST_PREFIXES {
read_table_paths(&context, prefix, prefix).await?;
}

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_concurrency_local() -> TestResult {
let context = IntegrationContext::new(Box::new(LocalStorageIntegration::default()))?;

test_concurrent_writes(&context).await?;

Ok(())
}

#[tokio::test]
async fn test_action_reconciliation() {
let path = "./tests/data/action_reconciliation";
let mut table = fs_common::create_table(path, None).await;

// Add a file.
let a = fs_common::add(3 * 60 * 1000);
assert_eq!(1, fs_common::commit_add(&mut table, &a).await);
assert_eq!(
table.get_files_iter().collect::<Vec<_>>(),
vec![Path::from(a.path.clone())]
);

// Remove added file.
let r = deltalake_core::kernel::Remove {
path: a.path.clone(),
deletion_timestamp: Some(chrono::Utc::now().timestamp_millis()),
data_change: false,
extended_file_metadata: None,
partition_values: None,
size: None,
tags: None,
deletion_vector: None,
base_row_id: None,
default_row_commit_version: None,
};

assert_eq!(2, fs_common::commit_removes(&mut table, vec![&r]).await);
assert_eq!(table.get_files_iter().count(), 0);
assert_eq!(
table
.get_state()
.all_tombstones()
.iter()
.map(|r| r.path.as_str())
.collect::<Vec<_>>(),
vec![a.path.as_str()]
);

// Add removed file back.
assert_eq!(3, fs_common::commit_add(&mut table, &a).await);
assert_eq!(
table.get_files_iter().collect::<Vec<_>>(),
vec![Path::from(a.path)]
);
// tombstone is removed.
assert_eq!(table.get_state().all_tombstones().len(), 0);
}
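
For context, the rule this test exercises: during log replay, the latest action for a given path wins, so an add that follows a remove cancels the tombstone. A simplified, hypothetical model of that reconciliation (not the deltalake-core implementation, which also handles tombstone expiry and richer action types):

use std::collections::HashMap;

#[derive(Clone, Copy)]
enum Action {
    Add,
    Remove,
}

/// Replay (path, action) pairs in log order; the latest action per path wins.
fn reconcile(log: &[(&str, Action)]) -> (Vec<String>, Vec<String>) {
    let mut latest: HashMap<&str, Action> = HashMap::new();
    for &(path, action) in log {
        latest.insert(path, action);
    }
    let (mut files, mut tombstones) = (Vec::new(), Vec::new());
    for (path, action) in latest {
        match action {
            Action::Add => files.push(path.to_string()),
            Action::Remove => tombstones.push(path.to_string()),
        }
    }
    (files, tombstones)
}

fn main() {
    // Add, remove, then re-add the same file, as in the test above.
    let log = [
        ("part-0.parquet", Action::Add),
        ("part-0.parquet", Action::Remove),
        ("part-0.parquet", Action::Add),
    ];
    let (files, tombstones) = reconcile(&log);
    assert_eq!(files, vec!["part-0.parquet".to_string()]);
    assert!(tombstones.is_empty()); // the tombstone was reconciled away
}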
2 changes: 1 addition & 1 deletion crates/deltalake-core/tests/integration_datafusion.rs
@@ -1,4 +1,4 @@
#![cfg(all(feature = "integration_test", feature = "datafusion"))]
#![cfg(feature = "datafusion")]

use arrow::array::Int64Array;
use deltalake_test::datafusion::*;
2 changes: 2 additions & 0 deletions crates/deltalake-test/Cargo.toml
@@ -10,10 +10,12 @@ chrono = { workspace = true, default-features = false, features = ["clock"] }
deltalake-core = { path = "../deltalake-core" }
dotenvy = "0"
fs_extra = "1.3.0"
object_store = { workspace = true }
rand = "0.8"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tempfile = "3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

[features]
default = []
@@ -1,5 +1,3 @@
#![cfg(feature = "integration_test")]

use std::collections::HashMap;
use std::future::Future;
use std::iter::FromIterator;
@@ -10,19 +8,10 @@ use deltalake_core::operations::transaction::commit;
use deltalake_core::operations::DeltaOps;
use deltalake_core::protocol::{DeltaOperation, SaveMode};
use deltalake_core::{DeltaTable, DeltaTableBuilder};
use deltalake_test::utils::*;
use serial_test::serial;

#[tokio::test]
#[serial]
async fn test_concurrent_writes_local() -> TestResult {
let storage = Box::new(LocalStorageIntegration::default());
let context = IntegrationContext::new(storage)?;
test_concurrent_writes(&context).await?;
Ok(())
}

async fn test_concurrent_writes(context: &IntegrationContext) -> TestResult {
use crate::utils::*;

pub async fn test_concurrent_writes(context: &IntegrationContext) -> TestResult {
let (_table, table_uri) = prepare_table(&context).await?;
run_test(|name| Worker::new(&table_uri, name)).await;
Ok(())
5 changes: 5 additions & 0 deletions crates/deltalake-test/src/lib.rs
@@ -15,10 +15,15 @@ use std::sync::Arc;
use tempfile::TempDir;

pub mod clock;
pub mod concurrent;
#[cfg(feature = "datafusion")]
pub mod datafusion;
pub mod read;
pub mod utils;

pub use concurrent::test_concurrent_writes;
pub use read::test_read_tables;

#[derive(Default)]
pub struct TestContext {
/// The main table under test