Major refactoring of the test utilities to allow storage providers to plug and play
rtyler committed Dec 29, 2023
1 parent ff4c4d6 commit 43c822c
Showing 10 changed files with 325 additions and 139 deletions.
5 changes: 2 additions & 3 deletions crates/deltalake-aws/Cargo.toml
@@ -9,7 +9,7 @@ rusoto_core = { version = "0.47", default-features = false, optional = true }
rusoto_credential = { version = "0.47" }
rusoto_sts = { version = "0.47", default-features = false, optional = true }
rusoto_dynamodb = { version = "0.47", default-features = false, optional = true }
object_store = "0.7"
object_store = { version = "0.7.1", features = ["aws"]}
lazy_static = "1"
maplit = "1"
async-trait = { workspace = true }
@@ -23,6 +23,7 @@ uuid = { workspace = true, features = ["serde", "v4"] }
url = { workspace = true }

[dev-dependencies]
chrono = { workspace = true }
serial_test = "2"
deltalake-test = { path = "../deltalake-test" }

@@ -33,11 +34,9 @@ native-tls = [
"rusoto_core/native-tls",
"rusoto_sts/native-tls",
"rusoto_dynamodb/native-tls",
"object_store/aws",
]
rustls = [
"rusoto_core/rustls",
"rusoto_sts/rustls",
"rusoto_dynamodb/rustls",
"object_store/aws",
]
2 changes: 1 addition & 1 deletion crates/deltalake-aws/src/lib.rs
@@ -12,7 +12,7 @@ use std::{
time::{Duration, SystemTime},
};

use object_store::path::Path;
use deltalake_core::Path;
use rusoto_core::{HttpClient, Region, RusotoError};
use rusoto_credential::AutoRefreshingProvider;
use rusoto_dynamodb::{
5 changes: 2 additions & 3 deletions crates/deltalake-aws/src/logstore.rs
@@ -8,8 +8,7 @@ use crate::storage::S3StorageOptions;
use crate::{constants, CommitEntry, DynamoDbLockClient, UpdateLogEntryResult};

use bytes::Bytes;
use object_store::path::Path;
use object_store::Error as ObjectStoreError;
use deltalake_core::{ObjectStoreError, Path};
use url::Url;

use deltalake_core::logstore::*;
@@ -45,7 +44,7 @@ impl S3DynamoDbLogStore {
s3_options.use_web_identity,
)
.map_err(|err| DeltaTableError::ObjectStore {
source: object_store::Error::Generic {
source: ObjectStoreError::Generic {
store: STORE_NAME,
source: err.into(),
},
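Both hunks above swap direct object_store imports for re-exports from deltalake_core. A minimal sketch of the re-exports this relies on, inferred from the new import paths in this commit (the exact re-export list inside deltalake-core is an assumption):

// Sketch of re-exports assumed to live in deltalake-core, inferred from the
// `use deltalake_core::{ObjectStoreError, Path}` imports above. Plugin crates
// such as deltalake-aws then compile against whatever object_store version
// deltalake-core itself pins, rather than declaring their own.
pub use object_store::path::Path;
pub use object_store::Error as ObjectStoreError;
pub use object_store::ObjectStore;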
61 changes: 50 additions & 11 deletions crates/deltalake-aws/src/storage.rs
@@ -1,23 +1,65 @@
//! AWS S3 storage backend.
use bytes::Bytes;
use deltalake_core::storage::str_is_truthy;
use futures::stream::BoxStream;
use object_store::{path::Path, Error as ObjectStoreError};
use object_store::{
DynObjectStore, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
Result as ObjectStoreResult,
use deltalake_core::storage::object_store::{
aws::AmazonS3ConfigKey, parse_url_opts, GetOptions, GetResult, ListResult, MultipartId,
ObjectMeta, ObjectStore, Result as ObjectStoreResult,
};
use deltalake_core::storage::{str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions};
use deltalake_core::{DeltaResult, ObjectStoreError, Path};
use futures::stream::BoxStream;
use rusoto_core::Region;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWrite;
use url::Url;

const STORE_NAME: &str = "DeltaS3ObjectStore";

#[derive(Clone, Default, Debug)]
pub struct S3ObjectStoreFactory {}

impl S3ObjectStoreFactory {
fn with_env_s3(&self, options: &StorageOptions) -> StorageOptions {
let mut options = options.clone();
for (os_key, os_value) in std::env::vars_os() {
if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
if let Ok(config_key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) {
if !options.0.contains_key(config_key.as_ref()) {
options
.0
.insert(config_key.as_ref().to_string(), value.to_string());
}
}
}
}
options
}
}

impl ObjectStoreFactory for S3ObjectStoreFactory {
fn parse_url_opts(
&self,
url: &Url,
options: &StorageOptions,
) -> DeltaResult<(ObjectStoreRef, Path)> {
let options = self.with_env_s3(options);
let (store, prefix) = parse_url_opts(
url,
options.0.iter().filter_map(|(key, value)| {
let s3_key = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
Some((s3_key, value.clone()))
}),
)?;

Ok((Arc::new(store), prefix))
}
}

/// Options used to configure the [S3StorageBackend].
///
/// Available options are described in [s3_constants].
@@ -141,7 +183,7 @@ impl Default for S3StorageOptions {

/// An S3 implementation of the [ObjectStore] trait
pub struct S3StorageBackend {
inner: Arc<DynObjectStore>,
inner: ObjectStoreRef,
/// Whether it is allowed to perform rename_if_not_exists as a plain rename
allow_unsafe_rename: bool,
}
@@ -156,10 +198,7 @@ impl S3StorageBackend {
/// Creates a new S3StorageBackend.
///
/// Options are described in [s3_constants].
pub fn try_new(
storage: Arc<DynObjectStore>,
allow_unsafe_rename: bool,
) -> ObjectStoreResult<Self> {
pub fn try_new(storage: ObjectStoreRef, allow_unsafe_rename: bool) -> ObjectStoreResult<Self> {
Ok(Self {
inner: storage,
allow_unsafe_rename,
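The new S3ObjectStoreFactory only takes effect once it is registered for the s3:// scheme, which is what the integration test further down does. A condensed sketch of that registration, mirroring the call in integration_read.rs (only the wrapper function name is hypothetical):

use std::sync::Arc;

use deltalake_aws::storage::S3ObjectStoreFactory;
use deltalake_core::storage::factories;
use url::Url;

/// Hypothetical helper: map the "s3://" scheme to the AWS factory so that
/// deltalake-core resolves S3 URLs through deltalake-aws.
fn register_s3_factory() {
    factories().insert(
        Url::parse("s3://").unwrap(),
        Arc::new(S3ObjectStoreFactory::default()),
    );
}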
161 changes: 161 additions & 0 deletions crates/deltalake-aws/tests/common.rs
@@ -0,0 +1,161 @@
use chrono::Utc;
use deltalake_aws::storage::*;
use deltalake_test::utils::*;
use std::process::{Command, ExitStatus, Stdio};

#[derive(Clone, Debug)]
pub struct S3Integration {
bucket_name: String,
}

impl Default for S3Integration {
fn default() -> Self {
Self {
bucket_name: format!("test-delta-table-{}", Utc::now().timestamp()),
}
}
}

impl StorageIntegration for S3Integration {
/// Create a new bucket
fn create_bucket(&self) -> std::io::Result<ExitStatus> {
let mut child = Command::new("aws")
.args(["s3", "mb", &self.root_uri()])
.spawn()
.expect("aws command is installed");
child.wait()
}

fn bucket_name(&self) -> String {
self.bucket_name.clone()
}

fn root_uri(&self) -> String {
format!("s3://{}", &self.bucket_name())
}

/// Prepare the test environment, only setting variables that are not already set
fn prepare_env(&self) {
match std::env::var(s3_constants::AWS_ENDPOINT_URL).ok() {
Some(endpoint_url) if endpoint_url.to_lowercase() == "none" => {
std::env::remove_var(s3_constants::AWS_ENDPOINT_URL)
}
Some(_) => (),
None => std::env::set_var(s3_constants::AWS_ENDPOINT_URL, "http://localhost:4566"),
}
set_env_if_not_set(s3_constants::AWS_ACCESS_KEY_ID, "deltalake");
set_env_if_not_set(s3_constants::AWS_SECRET_ACCESS_KEY, "weloverust");
set_env_if_not_set(s3_constants::AWS_REGION, "us-east-1");
set_env_if_not_set(s3_constants::AWS_S3_LOCKING_PROVIDER, "dynamodb");
set_env_if_not_set("DYNAMO_LOCK_TABLE_NAME", "test_table");
set_env_if_not_set("DYNAMO_LOCK_REFRESH_PERIOD_MILLIS", "100");
set_env_if_not_set("DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS", "100");
}

/// Copy a local directory into the bucket using the AWS CLI
fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result<ExitStatus> {
let destination = format!("{}/{destination}", self.root_uri());
let mut child = Command::new("aws")
.args(["s3", "cp", source.as_ref(), &destination, "--recursive"])
.spawn()
.expect("aws command is installed");
child.wait()
}
}

impl S3Integration {
/// Delete the bucket using the AWS CLI
fn delete_bucket(bucket_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
let mut child = Command::new("aws")
.args(["s3", "rb", bucket_name.as_ref(), "--force"])
.spawn()
.expect("aws command is installed");
child.wait()
}
fn create_dynamodb_table(
table_name: &str,
attr_definitions: &[&str],
key_schema: &[&str],
) -> std::io::Result<ExitStatus> {
let args = [
"dynamodb",
"create-table",
"--table-name",
&table_name,
"--provisioned-throughput",
"ReadCapacityUnits=10,WriteCapacityUnits=10",
"--attribute-definitions",
];
let mut child = Command::new("aws")
.args(args)
.args(attr_definitions.iter())
.arg("--key-schema")
.args(key_schema)
.stdout(Stdio::null())
.spawn()
.expect("aws command is installed");
let status = child.wait()?;
Self::wait_for_table(table_name)?;
Ok(status)
}

fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
haystack
.windows(needle.len())
.position(|window| window == needle)
}

fn wait_for_table(table_name: &str) -> std::io::Result<()> {
let args = ["dynamodb", "describe-table", "--table-name", &table_name];
loop {
let output = Command::new("aws")
.args(args)
.output()
.expect("aws command is installed");
if Self::find_subsequence(&output.stdout, "CREATING".as_bytes()).is_some() {
std::thread::sleep(std::time::Duration::from_millis(200));
continue;
} else {
return Ok(());
}
}
}

pub fn create_lock_table() -> std::io::Result<ExitStatus> {
let table_name =
std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into());
Self::create_dynamodb_table(
&table_name,
&[
"AttributeName=tablePath,AttributeType=S",
"AttributeName=fileName,AttributeType=S",
],
&[
"AttributeName=tablePath,KeyType=HASH",
"AttributeName=fileName,KeyType=RANGE",
],
)
}

fn delete_dynamodb_table(table_name: &str) -> std::io::Result<ExitStatus> {
let mut child = Command::new("aws")
.args(["dynamodb", "delete-table", "--table-name", &table_name])
.stdout(Stdio::null())
.spawn()
.expect("aws command is installed");
child.wait()
}

pub fn delete_lock_table() -> std::io::Result<ExitStatus> {
let table_name =
std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into());
Self::delete_dynamodb_table(&table_name)
}
}

impl Drop for S3Integration {
fn drop(&mut self) {
Self::delete_bucket(self.root_uri()).expect("Failed to drop bucket");
Self::delete_lock_table().expect("Failed to delete lock table");
}
}
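S3Integration is the first provider to implement the refactored test trait; the point of the change is that other backends can plug in the same way. A hypothetical local-filesystem integration, assuming the trait requires exactly the methods implemented above (the real StorageIntegration trait in deltalake-test may require more or provide defaults):

use std::process::{Command, ExitStatus};

use deltalake_test::utils::StorageIntegration;

/// Hypothetical integration that keeps test tables on the local filesystem.
pub struct FsIntegration {
    root: String,
}

impl Default for FsIntegration {
    fn default() -> Self {
        Self { root: "/tmp/delta-test".to_string() }
    }
}

impl StorageIntegration for FsIntegration {
    fn create_bucket(&self) -> std::io::Result<ExitStatus> {
        // "Creating the bucket" is just creating the root directory.
        Command::new("mkdir").args(["-p", &self.root]).status()
    }

    fn bucket_name(&self) -> String {
        self.root.clone()
    }

    fn root_uri(&self) -> String {
        format!("file://{}", self.root)
    }

    fn prepare_env(&self) {
        // Local paths need no credentials or endpoint configuration.
    }

    fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result<ExitStatus> {
        let destination = format!("{}/{destination}", self.root);
        Command::new("cp").args(["-r", source, &destination]).status()
    }
}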
24 changes: 17 additions & 7 deletions crates/deltalake-aws/tests/integration_read.rs
@@ -1,8 +1,14 @@
#![cfg(feature="integration_test")]
#![cfg(feature = "integration_test")]

use deltalake_aws::storage::*;
use deltalake_core::storage::factories;
use deltalake_core::{DeltaTableBuilder, Path};
use deltalake_test::utils::*;
use serial_test::serial;
use url::Url;

mod common;
use common::*;

static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"];

@@ -12,17 +18,22 @@ static TEST_PREFIXES_ENCODED: &[&str] = &["my%20table", "%E4%BD%A0%E5%A5%BD/%F0%
#[tokio::test]
#[serial]
async fn test_read_tables_aws() -> TestResult {
read_tables(StorageIntegration::Amazon).await?;
factories().insert(
Url::parse("s3://").unwrap(),
std::sync::Arc::new(S3ObjectStoreFactory::default()),
);
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;

read_tables(&context).await?;

for (prefix, prefix_encoded) in TEST_PREFIXES.iter().zip(TEST_PREFIXES_ENCODED.iter()) {
read_table_paths(StorageIntegration::Amazon, prefix, prefix_encoded).await?;
read_table_paths(&context, prefix, prefix_encoded).await?;
}

Ok(())
}

async fn read_tables(storage: StorageIntegration) -> TestResult {
let context = IntegrationContext::new(storage)?;
async fn read_tables(context: &IntegrationContext) -> TestResult {
context.load_table(TestTables::Simple).await?;
context.load_table(TestTables::Golden).await?;
context
@@ -37,11 +48,10 @@ async fn read_tables(storage: StorageIntegration) -> TestResult {
}

async fn read_table_paths(
storage: StorageIntegration,
context: &IntegrationContext,
table_root: &str,
upload_path: &str,
) -> TestResult {
let context = IntegrationContext::new(storage)?;
context
.load_table_with_name(TestTables::Delta0_8_0SpecialPartitioned, upload_path)
.await?;
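Because the test helpers now take an &IntegrationContext instead of a hard-coded StorageIntegration::Amazon variant, the same bodies can be reused by any provider. A sketch of that reuse with the hypothetical FsIntegration from the previous sketch (read_tables is the helper defined in this file; everything else as shown above):

use deltalake_test::utils::{IntegrationContext, TestResult};
use serial_test::serial;

#[tokio::test]
#[serial]
async fn test_read_tables_fs() -> TestResult {
    // Assumes deltalake-core can already resolve file:// URLs; otherwise a
    // matching ObjectStoreFactory would be registered first, as done for S3.
    let context = IntegrationContext::new(Box::new(FsIntegration::default()))?;
    read_tables(&context).await?;
    Ok(())
}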
7 changes: 5 additions & 2 deletions crates/deltalake-aws/tests/repair_s3_rename_test.rs
@@ -2,11 +2,11 @@

use bytes::Bytes;
use deltalake_aws::storage::S3StorageBackend;
use deltalake_core::{DeltaTableBuilder, ObjectStore, Path};
use deltalake_core::storage::object_store::{
DynObjectStore, Error as ObjectStoreError, GetOptions, GetResult, ListResult, MultipartId,
ObjectMeta, Result as ObjectStoreResult,
};
use deltalake_core::{DeltaTableBuilder, ObjectStore, Path};
use deltalake_test::utils::{IntegrationContext, StorageIntegration};
use futures::stream::BoxStream;
use serial_test::serial;
@@ -16,6 +16,9 @@ use tokio::io::AsyncWrite;
use tokio::task::JoinHandle;
use tokio::time::Duration;

mod common;
use common::*;

#[tokio::test(flavor = "multi_thread")]
#[serial]
#[ignore = "currently tests are hanging"]
@@ -41,7 +44,7 @@ async fn repair_when_worker_pauses_after_rename_test() {
async fn run_repair_test_case(path: &str, pause_copy: bool) -> Result<(), ObjectStoreError> {
std::env::set_var("AWS_S3_LOCKING_PROVIDER", "dynamodb");
std::env::set_var("DYNAMO_LOCK_LEASE_DURATION", "2");
let context = IntegrationContext::new(StorageIntegration::Amazon).unwrap();
let context = IntegrationContext::new(Box::new(S3Integration::default())).unwrap();

let root_path = Path::from(path);
let src1 = root_path.child("src1");