Skip to content

Commit

Permalink
temporarily copy some code into deltalake-aws for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler committed Dec 28, 2023
1 parent 06cbbd8 commit ff4c4d6
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 6 deletions.
2 changes: 1 addition & 1 deletion crates/deltalake-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
pub mod errors;
pub mod logstore;
mod storage;
pub mod storage;

use lazy_static::lazy_static;
use regex::Regex;
Expand Down
180 changes: 180 additions & 0 deletions crates/deltalake-aws/tests/integration_read.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
#![cfg(feature="integration_test")]

use deltalake_core::{DeltaTableBuilder, Path};
use deltalake_test::utils::*;
use serial_test::serial;

/// Raw (unencoded) prefixes: one containing a space and one made of non-ASCII
/// characters (CJK text and an emoji) separated by a `/`.
// NOTE(review): presumably used as table path prefixes by tests outside this view — confirm.
static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"];

/// The entries of `TEST_PREFIXES`, in the same order, in the percent-encoded form
/// they should have when they appear in object stores.
static TEST_PREFIXES_ENCODED: &[&str] = &["my%20table", "%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A"];

#[tokio::test]
#[serial]
Expand All @@ -11,3 +20,174 @@ async fn test_read_tables_aws() -> TestResult {

Ok(())
}

/// Uploads the `Simple`, `Golden` and `Delta0_8_0SpecialPartitioned` fixtures into a fresh
/// [`IntegrationContext`] built from `storage`, then runs the simple, versioned and golden
/// read checks against that context.
///
/// Any error from creating the context, uploading a fixture, or running a check is
/// propagated to the caller.
async fn read_tables(storage: StorageIntegration) -> TestResult {
    let ctx = IntegrationContext::new(storage)?;

    // Upload every fixture, in order, before any read check runs.
    for fixture in [
        TestTables::Simple,
        TestTables::Golden,
        TestTables::Delta0_8_0SpecialPartitioned,
    ] {
        ctx.load_table(fixture).await?;
    }

    read_simple_table(&ctx).await?;
    read_simple_table_with_version(&ctx).await?;
    read_golden(&ctx).await
}

/// Uploads the `Delta0_8_0SpecialPartitioned` fixture under `upload_path`, then verifies
/// the store listing and reads the table back through `table_root`.
///
/// * `storage` - integration backend used to build the [`IntegrationContext`].
/// * `table_root` - path, relative to the context's root URI, used to address the table.
/// * `upload_path` - name under which the fixture is uploaded.
async fn read_table_paths(
    storage: StorageIntegration,
    table_root: &str,
    upload_path: &str,
) -> TestResult {
    let ctx = IntegrationContext::new(storage)?;

    let upload = ctx.load_table_with_name(TestTables::Delta0_8_0SpecialPartitioned, upload_path);
    upload.await?;

    println!("table_root: {}", table_root);

    // First check the raw listing, then load the table through the same root.
    verify_store(&ctx, table_root).await?;
    read_encoded_table(&ctx, table_root).await
}

/// Lists the top level of the object store rooted at `<root_uri>/<root_path>` and asserts
/// that its common prefixes are exactly `_delta_log`, `x=A%2FA` and `x=B%20B`, in that order.
///
/// * `integration` - context supplying the root URI of the store under test.
/// * `root_path` - table location relative to that root URI.
///
/// Errors from building the storage or from listing it are propagated; a listing that
/// differs from the expected prefixes panics via `assert_eq!`.
async fn verify_store(integration: &IntegrationContext, root_path: &str) -> TestResult {
    let table_uri = format!("{}/{}", integration.root_uri(), root_path);
    println!("working with table_uri: {}", table_uri);

    // `table_uri` is not used again below, so it is moved into the builder rather than cloned.
    let storage = DeltaTableBuilder::from_uri(table_uri)
        .with_allow_http(true)
        .build_storage()?
        .object_store();

    // `None` lists relative to the root of the store that was just built.
    let files = storage.list_with_delimiter(None).await?;
    println!("files: {files:?}");
    assert_eq!(
        vec![
            Path::parse("_delta_log").unwrap(),
            Path::parse("x=A%2FA").unwrap(),
            Path::parse("x=B%20B").unwrap(),
        ],
        files.common_prefixes
    );

    Ok(())
}

/// Loads the table located at `<root_uri>/<root_path>` and asserts that it is at
/// version 0 and exposes exactly two files.
///
/// Load failures are propagated to the caller; mismatched expectations panic.
async fn read_encoded_table(integration: &IntegrationContext, root_path: &str) -> TestResult {
    let builder = DeltaTableBuilder::from_uri(format!("{}/{}", integration.root_uri(), root_path))
        .with_allow_http(true);
    let table = builder.load().await?;

    assert_eq!(table.version(), 0);

    let file_count = table.get_files().len();
    assert_eq!(file_count, 2);

    Ok(())
}

/// Loads the latest version of the `Simple` fixture table and checks its version,
/// protocol versions, active files and tombstones.
///
/// Load failures are propagated to the caller; any mismatched expectation panics.
async fn read_simple_table(integration: &IntegrationContext) -> TestResult {
    let table_uri = integration.uri_for_table(TestTables::Simple);

    // The load used to be written twice behind complementary `#[cfg]` predicates with
    // identical bodies; a single unconditional statement is equivalent.
    let table = DeltaTableBuilder::from_uri(table_uri)
        .with_allow_http(true)
        .load()
        .await?;

    assert_eq!(table.version(), 4);
    assert_eq!(table.protocol().min_writer_version, 2);
    assert_eq!(table.protocol().min_reader_version, 1);
    assert_eq!(
        table.get_files(),
        vec![
            Path::from("part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet"),
            Path::from("part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet"),
            Path::from("part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet"),
            Path::from("part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet"),
            Path::from("part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet"),
        ]
    );

    let tombstones = table.get_state().all_tombstones();
    assert_eq!(tombstones.len(), 31);
    // Spot-check one specific remove action among the tombstones.
    assert!(tombstones.contains(&deltalake_core::kernel::Remove {
        path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(),
        deletion_timestamp: Some(1587968596250),
        data_change: true,
        extended_file_metadata: None,
        deletion_vector: None,
        base_row_id: None,
        default_row_commit_version: None,
        size: None,
        partition_values: None,
        tags: None,
    }));

    Ok(())
}

/// Loads the `Simple` fixture table pinned to version 3 and checks its version,
/// protocol versions, active files and tombstones at that version.
///
/// Load failures are propagated to the caller; any mismatched expectation panics.
async fn read_simple_table_with_version(integration: &IntegrationContext) -> TestResult {
    let builder = DeltaTableBuilder::from_uri(integration.uri_for_table(TestTables::Simple))
        .with_allow_http(true)
        .with_version(3);
    let table = builder.load().await?;

    assert_eq!(table.version(), 3);

    let protocol = table.protocol();
    assert_eq!(protocol.min_writer_version, 2);
    assert_eq!(protocol.min_reader_version, 1);

    // Files expected to be active at version 3, in the order the table reports them.
    let expected_files = vec![
        Path::from("part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet"),
        Path::from("part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet"),
        Path::from("part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet"),
        Path::from("part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet"),
        Path::from("part-00000-f17fcbf5-e0dc-40ba-adae-ce66d1fcaef6-c000.snappy.parquet"),
        Path::from("part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet"),
    ];
    assert_eq!(table.get_files(), expected_files);

    let tombstones = table.get_state().all_tombstones();
    assert_eq!(tombstones.len(), 29);
    // Spot-check one specific remove action among the tombstones.
    assert!(tombstones.contains(&deltalake_core::kernel::Remove {
        path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(),
        deletion_timestamp: Some(1587968596250),
        data_change: true,
        tags: None,
        partition_values: None,
        base_row_id: None,
        default_row_commit_version: None,
        size: None,
        deletion_vector: None,
        extended_file_metadata: None,
    }));

    Ok(())
}

/// Loads the `Golden` fixture table and checks that it is at version 0 with
/// minimum writer version 2 and minimum reader version 1.
///
/// Load failures are propagated to the caller as an `Err`, matching the other read
/// helpers in this file, rather than panicking; mismatched expectations still panic.
async fn read_golden(integration: &IntegrationContext) -> TestResult {
    let table_uri = integration.uri_for_table(TestTables::Golden);

    let table = DeltaTableBuilder::from_uri(table_uri)
        .with_allow_http(true)
        .load()
        .await?;

    assert_eq!(table.version(), 0);
    assert_eq!(table.protocol().min_writer_version, 2);
    assert_eq!(table.protocol().min_reader_version, 1);

    Ok(())
}
11 changes: 6 additions & 5 deletions crates/deltalake-aws/tests/repair_s3_rename_test.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#![cfg(feature = "integration_test")]

use bytes::Bytes;
use deltalake_core::test_utils::{IntegrationContext, StorageIntegration};
use deltalake_core::{storage::s3::S3StorageBackend, DeltaTableBuilder, ObjectStore};
use futures::stream::BoxStream;
use object_store::path::Path;
use object_store::{
use deltalake_aws::storage::S3StorageBackend;
use deltalake_core::{DeltaTableBuilder, ObjectStore, Path};
use deltalake_core::storage::object_store::{
DynObjectStore, Error as ObjectStoreError, GetOptions, GetResult, ListResult, MultipartId,
ObjectMeta, Result as ObjectStoreResult,
};
use deltalake_test::utils::{IntegrationContext, StorageIntegration};
use futures::stream::BoxStream;
use serial_test::serial;
use std::ops::Range;
use std::sync::{Arc, Mutex};
Expand Down

0 comments on commit ff4c4d6

Please sign in to comment.