Skip to content

Commit

Permalink
Fix integration test to create a lock table
Browse files Browse the repository at this point in the history
  • Loading branch information
dispanser committed Dec 6, 2023
1 parent 40bc81f commit ede3540
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 52 deletions.
26 changes: 20 additions & 6 deletions crates/deltalake-core/src/logstore/s3/lock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,26 @@ impl DynamoDbLockClient {
let lock_table_name = options
.extra_opts
.get(constants::LOCK_TABLE_KEY_NAME)
.map_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned(), Clone::clone);
let billing_mode = options
.map_or_else(
|| {
std::env::var(constants::LOCK_TABLE_KEY_NAME)
.unwrap_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned())
},
Clone::clone,
);

let billing_mode: BillingMode = options
.extra_opts
.get(constants::BILLING_MODE_KEY_NAME)
.map(|bm| BillingMode::from_str(bm))
.unwrap_or(Ok(BillingMode::PayPerRequest))?;
.map_or_else(
|| {
std::env::var(constants::BILLING_MODE_KEY_NAME).map_or_else(
|_| Ok(BillingMode::PayPerRequest),
|bm| BillingMode::from_str(&bm),
)
},
|bm| BillingMode::from_str(bm),
)?;
Ok(Self {
dynamodb_client,
config: DynamoDbConfig {
Expand Down Expand Up @@ -341,8 +355,8 @@ mod constants {
use lazy_static::lazy_static;

pub const DEFAULT_LOCK_TABLE_NAME: &str = "delta_log";
pub const LOCK_TABLE_KEY_NAME: &str = "table_name";
pub const BILLING_MODE_KEY_NAME: &str = "billing_mode";
pub const LOCK_TABLE_KEY_NAME: &str = "DELTA_DYNAMO_TABLE_NAME";
pub const BILLING_MODE_KEY_NAME: &str = "DELTA_DYNAMO_BILLING_MODE";

pub const ATTR_TABLE_PATH: &str = "tablePath";
pub const ATTR_FILE_NAME: &str = "fileName";
Expand Down
7 changes: 7 additions & 0 deletions crates/deltalake-core/src/logstore/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ impl LogStore for S3DynamoDbLogStore {
TransactionError::VersionAlreadyExists(version)
}
LockClientError::ProvisionedThroughputExceeded => todo!(),
LockClientError::LockTableNotFound => TransactionError::LogStoreError {
msg: format!(
"lock table '{}' not found",
self.lock_client.get_lock_table_name()
),
source: Box::new(err),
},
err => TransactionError::LogStoreError {
msg: "dynamodb client failed to write log entry".to_owned(),
source: Box::new(err),
Expand Down
101 changes: 79 additions & 22 deletions crates/deltalake-core/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::DeltaTableBuilder;
use chrono::Utc;
use fs_extra::dir::{copy, CopyOptions};
use object_store::DynObjectStore;
use rand::Rng;
use serde_json::json;
use std::env;
use std::sync::Arc;
Expand Down Expand Up @@ -169,6 +170,7 @@ impl Drop for IntegrationContext {
StorageIntegration::Amazon => {
s3_cli::delete_bucket(self.root_uri()).unwrap();
s3_cli::delete_lock_table().unwrap();
s3_cli::delete_new_lock_table().unwrap();
}
StorageIntegration::Microsoft => {
az_cli::delete_container(&self.bucket).unwrap();
Expand Down Expand Up @@ -219,12 +221,17 @@ impl StorageIntegration {
Self::Onelake => Ok(()),
Self::OnelakeAbfs => Ok(()),
Self::Amazon => {
std::env::set_var(
"DELTA_DYNAMO_TABLE_NAME",
format!("delta_log_it_{}", rand::thread_rng().gen::<u16>()),
);
s3_cli::create_bucket(format!("s3://{}", name.as_ref()))?;
set_env_if_not_set(
"DYNAMO_LOCK_PARTITION_KEY_VALUE",
format!("s3://{}", name.as_ref()),
);
s3_cli::create_lock_table()?;
s3_cli::create_new_lock_table()?;
Ok(())
}
Self::Google => {
Expand Down Expand Up @@ -465,51 +472,101 @@ pub mod s3_cli {
set_env_if_not_set("DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS", "100");
}

pub fn create_lock_table() -> std::io::Result<ExitStatus> {
let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
.expect("variable AWS_ENDPOINT_URL must be set to connect to S3 emulator");
let table_name =
std::env::var("DYNAMO_LOCK_TABLE_NAME").unwrap_or_else(|_| "test_table".into());
/// Creates a DynamoDB table by invoking `aws dynamodb create-table`.
///
/// * `table_name` is passed as the value of `--table-name`.
/// * `endpoint_url` is passed as the value of `--endpoint-url`.
/// * `attr_definitions` are appended after `--attribute-definitions`.
/// * `key_schema` entries are appended after `--key-schema`.
///
/// The table is requested with a provisioned throughput of 10 read and 10 write
/// capacity units. Standard output of the `aws` process is discarded.
///
/// Returns the result of waiting on the child process.
///
/// # Panics
///
/// Panics if the `aws` command cannot be spawned.
fn create_dynamodb_table(
table_name: &str,
endpoint_url: &str,
attr_definitions: &[&str],
key_schema: &[&str],
) -> std::io::Result<ExitStatus> {
println!("creating table {}", table_name);
// Fixed leading arguments; the trailing `--attribute-definitions` flag is
// followed by the caller-supplied attribute definitions below.
let args01 = [
"dynamodb",
"create-table",
"--table-name",
&table_name,
"--endpoint-url",
&endpoint_url,
"--provisioned-throughput",
"ReadCapacityUnits=10,WriteCapacityUnits=10",
"--attribute-definitions",
];
// Full argument list: fixed prefix, attribute definitions, `--key-schema`, key schema.
let args: Vec<_> = args01
.iter()
.chain(attr_definitions.iter())
.chain(["--key-schema"].iter())
.chain(key_schema)
.collect();
let mut child = Command::new("aws")
// NOTE(review): this literal argument list references `endpoint`, which is not a
// parameter or local of this function, and it is followed by a second
// `.args(args)` call. It looks like the pre-refactor argument list was kept
// next to its replacement — confirm which one is intended.
.args([
"dynamodb",
"create-table",
"--table-name",
&table_name,
"--endpoint-url",
&endpoint,
"--attribute-definitions",
"AttributeName=key,AttributeType=S",
"--key-schema",
"AttributeName=key,KeyType=HASH",
"--provisioned-throughput",
"ReadCapacityUnits=10,WriteCapacityUnits=10",
])
.args(args)
.stdout(Stdio::null())
.spawn()
.expect("aws command is installed");
child.wait()
}

pub fn delete_lock_table() -> std::io::Result<ExitStatus> {
let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
/// Creates the lock table whose only attribute is the string hash key `key`.
///
/// The table name is read from the `DYNAMO_LOCK_TABLE_NAME` environment variable,
/// falling back to `test_table` when that variable cannot be read. The actual
/// creation is delegated to `create_dynamodb_table`, whose result is returned.
///
/// # Panics
///
/// Panics if the environment variable named by
/// `s3_storage_options::AWS_ENDPOINT_URL` cannot be read.
pub fn create_lock_table() -> std::io::Result<ExitStatus> {
    const ATTRIBUTE_DEFINITIONS: [&str; 1] = ["AttributeName=key,AttributeType=S"];
    const KEY_SCHEMA: [&str; 1] = ["AttributeName=key,KeyType=HASH"];

    // The endpoint is resolved first so a missing emulator configuration fails early.
    let emulator_endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
        .expect("variable AWS_ENDPOINT_URL must be set to connect to S3 emulator");

    let lock_table = match std::env::var("DYNAMO_LOCK_TABLE_NAME") {
        Ok(name) => name,
        Err(_) => String::from("test_table"),
    };

    create_dynamodb_table(
        &lock_table,
        &emulator_endpoint,
        &ATTRIBUTE_DEFINITIONS,
        &KEY_SCHEMA,
    )
}

/// Creates the lock table keyed by `tablePath` (hash key) and `fileName` (range key).
///
/// The table name is read from the `DELTA_DYNAMO_TABLE_NAME` environment variable,
/// falling back to `delta_log` when that variable cannot be read. The actual
/// creation is delegated to `create_dynamodb_table`, whose result is returned.
///
/// # Panics
///
/// Panics if the environment variable named by
/// `s3_storage_options::AWS_ENDPOINT_URL` cannot be read.
pub fn create_new_lock_table() -> std::io::Result<ExitStatus> {
    // Both attributes are strings.
    const ATTRIBUTE_DEFINITIONS: [&str; 2] = [
        "AttributeName=tablePath,AttributeType=S",
        "AttributeName=fileName,AttributeType=S",
    ];
    const KEY_SCHEMA: [&str; 2] = [
        "AttributeName=tablePath,KeyType=HASH",
        "AttributeName=fileName,KeyType=RANGE",
    ];

    let emulator_endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
        .expect("variable AWS_ENDPOINT_URL must be set to connect to S3 emulator");

    let lock_table = match std::env::var("DELTA_DYNAMO_TABLE_NAME") {
        Ok(name) => name,
        Err(_) => String::from("delta_log"),
    };

    create_dynamodb_table(
        &lock_table,
        &emulator_endpoint,
        &ATTRIBUTE_DEFINITIONS,
        &KEY_SCHEMA,
    )
}

/// Deletes a DynamoDB table by invoking `aws dynamodb delete-table`.
///
/// `table_name` is passed as the value of `--table-name` and `endpoint_url` as the
/// value of `--endpoint-url`. Standard output of the `aws` process is discarded.
///
/// Returns the result of waiting on the child process.
///
/// # Panics
///
/// Panics if the `aws` command cannot be spawned.
fn delete_dynamodb_table(table_name: &str, endpoint_url: &str) -> std::io::Result<ExitStatus> {
let mut child = Command::new("aws")
.args([
"dynamodb",
"delete-table",
"--table-name",
&table_name,
"--endpoint-url",
// NOTE(review): `endpoint` is neither a parameter nor a local of this function;
// this line looks like a leftover of the pre-refactor code next to its
// replacement `&endpoint_url` — confirm and keep only one.
&endpoint,
&endpoint_url,
])
.stdout(Stdio::null())
.spawn()
.expect("aws command is installed");
child.wait()
}

/// Deletes the lock table named by the `DYNAMO_LOCK_TABLE_NAME` environment variable.
///
/// Falls back to the name `test_table` when that variable cannot be read. The
/// deletion itself is delegated to `delete_dynamodb_table`, whose result is returned.
///
/// # Panics
///
/// Panics if the environment variable named by
/// `s3_storage_options::AWS_ENDPOINT_URL` cannot be read.
pub fn delete_lock_table() -> std::io::Result<ExitStatus> {
    let emulator_endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
        .expect("variable AWS_ENDPOINT_URL must be set to connect to S3 emulator");

    let lock_table = match std::env::var("DYNAMO_LOCK_TABLE_NAME") {
        Ok(name) => name,
        Err(_) => String::from("test_table"),
    };

    delete_dynamodb_table(&lock_table, &emulator_endpoint)
}

/// Deletes the lock table named by the `DELTA_DYNAMO_TABLE_NAME` environment variable.
///
/// Falls back to the name `delta_log` when that variable cannot be read. The
/// deletion itself is delegated to `delete_dynamodb_table`, whose result is returned.
///
/// # Panics
///
/// Panics if the environment variable named by
/// `s3_storage_options::AWS_ENDPOINT_URL` cannot be read.
pub fn delete_new_lock_table() -> std::io::Result<ExitStatus> {
    let emulator_endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
        .expect("variable AWS_ENDPOINT_URL must be set to connect to S3 emulator");

    let lock_table = match std::env::var("DELTA_DYNAMO_TABLE_NAME") {
        Ok(name) => name,
        Err(_) => String::from("delta_log"),
    };

    delete_dynamodb_table(&lock_table, &emulator_endpoint)
}
}

/// small wrapper around google api
Expand Down
46 changes: 22 additions & 24 deletions crates/deltalake-core/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,38 +32,34 @@ mod fs_common;
pub type TestResult<T> = Result<T, Box<dyn std::error::Error + 'static>>;

lazy_static! {
static ref LOCK_TABLE_NAME: String = format!("delta_log_tests_constant");
static ref OPTIONS: HashMap<String, String> = maplit::hashmap! {
"allow_http".to_owned() => "true".to_owned(),
"table_name".to_owned() => LOCK_TABLE_NAME.to_owned(),
};
static ref S3_OPTIONS: S3StorageOptions = S3StorageOptions::from_map(&OPTIONS);
static ref CLIENT: DynamoDbLockClient =
DynamoDbLockClient::try_new(&S3StorageOptions::from_map(&OPTIONS))
.expect("failure initializing dynamodb lock client");
static ref TABLE_PATH: String = format!("s3://my_delta_table_{}", uuid::Uuid::new_v4());
}

#[ctor::ctor]
fn prepare_dynamodb() {
let _context = IntegrationContext::new(StorageIntegration::Amazon).unwrap();
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let _create_table_result = CLIENT.try_create_lock_table().await.unwrap();
});
/// Builds a fresh `DynamoDbLockClient` from the shared test `OPTIONS`.
///
/// The storage options are derived from `OPTIONS` on every call, and any error
/// returned by `DynamoDbLockClient::try_new` is propagated to the caller.
fn make_client() -> TestResult<DynamoDbLockClient> {
    let storage_options = S3StorageOptions::from_map(&OPTIONS);
    let lock_client = DynamoDbLockClient::try_new(&storage_options)?;
    Ok(lock_client)
}

#[test]
fn client_config_picks_up_lock_table_name() {
assert_eq!(CLIENT.get_lock_table_name(), LOCK_TABLE_NAME.clone());
#[serial]
fn client_config_picks_up_lock_table_name() -> TestResult<()> {
let _context = IntegrationContext::new(StorageIntegration::Amazon)?;
assert!(make_client()?
.get_lock_table_name()
.starts_with("delta_log_it_"));
Ok(())
}

#[tokio::test]
#[serial]
async fn get_missing_item() -> TestResult<()> {
let client = &CLIENT;
let _context = IntegrationContext::new(StorageIntegration::Amazon)?;
let client = make_client()?;
let version = i64::MAX;
let result = client.get_commit_entry(&TABLE_PATH, version).await;
assert_eq!(result.unwrap(), None);
Expand All @@ -85,6 +81,7 @@ async fn test_append() -> TestResult<()> {
#[serial]
async fn test_repair() -> TestResult<()> {
let context = IntegrationContext::new(StorageIntegration::Amazon)?;
let client = make_client()?;
let table = prepare_table(&context, "repair_needed").await?;
let options: StorageOptions = OPTIONS.clone().into();
let log_store: S3DynamoDbLogStore = S3DynamoDbLogStore::try_new(
Expand All @@ -96,7 +93,7 @@ async fn test_repair() -> TestResult<()> {

// create an incomplete log entry, commit file not yet moved from its temporary location
let entry = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?;
let read_entry = CLIENT
let read_entry = client
.get_latest_entry(&table.table_uri())
.await?
.expect("no latest entry!");
Expand All @@ -112,7 +109,7 @@ async fn test_repair() -> TestResult<()> {
.rename_if_not_exists(&entry.temp_path, &commit_uri_from_version(entry.version))
.await?;

let read_entry = CLIENT
let read_entry = client
.get_latest_entry(&table.table_uri())
.await?
.expect("no latest entry!");
Expand Down Expand Up @@ -228,7 +225,7 @@ async fn create_incomplete_commit_entry(
)
.await?;
let commit_entry = CommitEntry::new(version, temp_path);
CLIENT
make_client()?
.put_commit_entry(&table.table_uri(), &commit_entry)
.await?;
Ok(commit_entry)
Expand Down Expand Up @@ -256,7 +253,7 @@ fn add_action(name: &str) -> Action {
}

async fn prepare_table(context: &IntegrationContext, table_name: &str) -> TestResult<DeltaTable> {
CLIENT.try_create_lock_table().await?;
make_client()?.try_create_lock_table().await?;
let table_name = format!("{}_{}", table_name, uuid::Uuid::new_v4());
let table_uri = context.uri_for_table(TestTables::Custom(table_name.to_owned()));
let schema = StructType::new(vec![StructField::new(
Expand Down Expand Up @@ -303,14 +300,15 @@ async fn append_to_table(
/// version and expiration time is around 24h in the future. Commits should cover a consecutive range
/// of versions, with monotonically non-decreasing expiration timestamps.
async fn validate_lock_table_state(table: &DeltaTable, expected_version: i64) -> TestResult<()> {
let lock_entry = CLIENT.get_latest_entry(&table.table_uri()).await?.unwrap();
let client = make_client()?;
let lock_entry = client.get_latest_entry(&table.table_uri()).await?.unwrap();
assert!(lock_entry.complete);
assert_eq!(lock_entry.version, expected_version);
assert_eq!(lock_entry.version, table.get_latest_version().await?);

validate_commit_entry(&lock_entry)?;

let latest = CLIENT
let latest = client
.get_latest_entries(&table.table_uri(), WORKERS * COMMITS)
.await?;
let max_version = latest.get(0).unwrap().version;
Expand Down

0 comments on commit ede3540

Please sign in to comment.