Skip to content

Commit

Permalink
checkpoint WIP, not seeming like we can create tables
Browse files Browse the repository at this point in the history
cargo test --test integration_s3_dynamodb -- test_concurrent_writers --nocapture

running 1 test
make_bucket: test-delta-table-1703877212
 DEBUG deltalake_core::logstore > Found a storage provider for s3://
STORE: AmazonS3 { client: S3Client { config: S3Config { region: "us-east-2", endpoint: "http://localhost:4566", bucket: "test-delta-table-1703877212", bucket_endpoint: "http://localhost:4566/test-delta-table-1703877212", credentials: StaticCredentialProvider { credential: AwsCredential { key_id: "deltalake", secret_key: "weloverust", token: None } }, retry_config: RetryConfig { backoff: BackoffConfig { init_backoff: 100ms, max_backoff: 15s, base: 2.0 }, max_retries: 10, retry_timeout: 180s }, client_options: ClientOptions { user_agent: None, content_type_map: {}, default_content_type: None, default_headers: None, proxy_url: None, proxy_ca_certificate: None, proxy_excludes: None, allow_http: Deferred("true"), allow_insecure: Parsed(false), timeout: None, connect_timeout: None, pool_idle_timeout: None, pool_max_idle_per_host: None, http2_keep_alive_interval: None, http2_keep_alive_timeout: None, http2_keep_alive_while_idle: Parsed(false), http1_only: Parsed(false), http2_only: Parsed(false) }, sign_payload: true, checksum: None, copy_if_not_exists: None }, client: Client { accepts: Accepts, proxies: [Proxy(System({}), None)], referer: true, default_headers: {"accept": "*/*", "user-agent": "object_store/0.7.1"} } } }, options StorageOptions({"allow_http": "true", "aws_secret_access_key": "weloverust", "aws_region": "us-east-2", "aws_access_key_id": "deltalake", "aws_endpoint": "http://localhost:4566"})
WITH! S3StorageBackend
 DEBUG deltalake_core::logstore > Found a logstore provider for s3://
RETURNING AN S3 DYNAMODB LOGSTORE S3StorageOptions { endpoint_url: Some("http://localhost:4566"), region: Custom { name: "us-east-2", endpoint: "http://localhost:4566" }, profile: None, aws_access_key_id: Some("deltalake"), aws_secret_access_key: Some("weloverust"), aws_session_token: None, virtual_hosted_style_request: false, locking_provider: Some("dynamodb"), assume_role_arn: None, assume_role_session_name: None, use_web_identity: false, s3_pool_idle_timeout: 15s, sts_pool_idle_timeout: 10s, s3_get_internal_server_error_retries: 10, allow_unsafe_rename: false, extra_opts: {"allow_http": "true"} }
 DEBUG deltalake_aws::logstore  > S3DynamoDbLogStore configured with lock client: DynamoDbLockClient(config: DynamoDbConfig { billing_mode: PayPerRequest, lock_table_name: "delta_log_it_47993", use_web_identity: false, region: Custom { name: "us-east-2", endpoint: "http://localhost:4566" } })
>>> preparing table
 DEBUG deltalake_core::logstore > Found a storage provider for s3://
STORE: AmazonS3 { client: S3Client { config: S3Config { region: "us-east-2", endpoint: "http://localhost:4566", bucket: "test-delta-table-1703877212", bucket_endpoint: "http://localhost:4566/test-delta-table-1703877212", credentials: StaticCredentialProvider { credential: AwsCredential { key_id: "deltalake", secret_key: "weloverust", token: None } }, retry_config: RetryConfig { backoff: BackoffConfig { init_backoff: 100ms, max_backoff: 15s, base: 2.0 }, max_retries: 10, retry_timeout: 180s }, client_options: ClientOptions { user_agent: None, content_type_map: {}, default_content_type: None, default_headers: None, proxy_url: None, proxy_ca_certificate: None, proxy_excludes: None, allow_http: Deferred("true"), allow_insecure: Parsed(false), timeout: None, connect_timeout: None, pool_idle_timeout: None, pool_max_idle_per_host: None, http2_keep_alive_interval: None, http2_keep_alive_timeout: None, http2_keep_alive_while_idle: Parsed(false), http1_only: Parsed(false), http2_only: Parsed(false) }, sign_payload: true, checksum: None, copy_if_not_exists: None }, client: Client { accepts: Accepts, proxies: [Proxy(System({}), None)], referer: true, default_headers: {"accept": "*/*", "user-agent": "object_store/0.7.1"} } } }, options StorageOptions({"allow_http": "true", "aws_secret_access_key": "weloverust", "aws_region": "us-east-2", "aws_access_key_id": "deltalake", "aws_endpoint": "http://localhost:4566"})
WITH! S3StorageBackend
 DEBUG deltalake_core::logstore > Found a logstore provider for s3://
returning prefix store for Path { raw: "concurrent_writes_d264d152-fbad-4bea-acf5-976700b8d81a" }
RETURNING AN S3 DYNAMODB LOGSTORE S3StorageOptions { endpoint_url: Some("http://localhost:4566"), region: Custom { name: "us-east-2", endpoint: "http://localhost:4566" }, profile: None, aws_access_key_id: Some("deltalake"), aws_secret_access_key: Some("weloverust"), aws_session_token: None, virtual_hosted_style_request: false, locking_provider: Some("dynamodb"), assume_role_arn: None, assume_role_session_name: None, use_web_identity: false, s3_pool_idle_timeout: 15s, sts_pool_idle_timeout: 10s, s3_get_internal_server_error_retries: 10, allow_unsafe_rename: false, extra_opts: {"allow_http": "true"} }
 DEBUG deltalake_aws::logstore  > S3DynamoDbLogStore configured with lock client: DynamoDbLockClient(config: DynamoDbConfig { billing_mode: PayPerRequest, lock_table_name: "delta_log_it_47993", use_web_identity: false, region: Custom { name: "us-east-2", endpoint: "http://localhost:4566" } })
table built: DeltaTable <s3://test-delta-table-1703877212/concurrent_writes_d264d152-fbad-4bea-acf5-976700b8d81a>
 DEBUG deltalake_aws::logstore  > Writing commit entry for S3DynamoDbLogStore(s3://test-delta-table-1703877212/concurrent_writes_d264d152-fbad-4bea-acf5-976700b8d81a): CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None }
 DEBUG deltalake_aws::logstore  > Writing commit entry for S3DynamoDbLogStore(s3://test-delta-table-1703877212/concurrent_writes_d264d152-fbad-4bea-acf5-976700b8d81a): CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None }
 DEBUG deltalake_aws::logstore  > retry #0 on log entry CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None } failed to move commit: 'Tried committing existing table version: 0'
 DEBUG deltalake_aws::logstore  > Writing commit entry for S3DynamoDbLogStore(s3://test-delta-table-1703877212/concurrent_writes_d264d152-fbad-4bea-acf5-976700b8d81a): CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None }
 DEBUG deltalake_aws::logstore  > retry #1 on log entry CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None } failed to move commit: 'Tried committing existing table version: 0'
 DEBUG deltalake_aws::logstore  > Writing commit entry for S3DynamoDbLogStore(s3://test-delta-table-1703877212/concurrent_writes_d264d152-fbad-4bea-acf5-976700b8d81a): CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None }
 DEBUG deltalake_aws::logstore  > retry #2 on log entry CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None } failed to move commit: 'Tried committing existing table version: 0'
 DEBUG deltalake_aws::logstore  > Writing commit entry for S3DynamoDbLogStore(s3://test-delta-table-1703877212/concurrent_writes_d264d152-fbad-4bea-acf5-976700b8d81a): CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None }
 DEBUG deltalake_aws::logstore  > Writing commit entry for S3DynamoDbLogStore(s3://test-delta-table-1703877212/concurrent_writes_d264d152-fbad-4bea-acf5-976700b8d81a): CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None }
 DEBUG deltalake_aws::logstore  > retry #0 on log entry CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None } failed to move commit: 'Tried committing existing table version: 0'
 DEBUG deltalake_aws::logstore  > Writing commit entry for S3DynamoDbLogStore(s3://test-delta-table-1703877212/concurrent_writes_d264d152-fbad-4bea-acf5-976700b8d81a): CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None }
 DEBUG deltalake_aws::logstore  > retry #1 on log entry CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None } failed to move commit: 'Tried committing existing table version: 0'
 DEBUG deltalake_aws::logstore  > Writing commit entry for S3DynamoDbLogStore(s3://test-delta-table-1703877212/concurrent_writes_d264d152-fbad-4bea-acf5-976700b8d81a): CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None }
 DEBUG deltalake_aws::logstore  > retry #2 on log entry CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None } failed to move commit: 'Tried committing existing table version: 0'
 DEBUG deltalake_aws::logstore  > Writing commit entry for S3DynamoDbLogStore(s3://test-delta-table-1703877212/concurrent_writes_d264d152-fbad-4bea-acf5-976700b8d81a): CommitEntry { version: 0, temp_path: Path { raw: "_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp" }, complete: false, expire_time: None }
delete: s3://test-delta-table-1703877212/concurrent_writes_d264d152-fbad-4bea-acf5-976700b8d81a/_delta_log/_commit_4c198b0c-9376-40cb-83b9-b252def3e301.json.tmp
remove_bucket: test-delta-table-1703877212
Error: VersionAlreadyExists(0)
test test_concurrent_writers ... FAILED
  • Loading branch information
rtyler committed Dec 29, 2023
1 parent b7bfaf3 commit 725c3c3
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 10 deletions.
1 change: 1 addition & 0 deletions crates/deltalake-aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ url = { workspace = true }
chrono = { workspace = true }
serial_test = "2"
deltalake-test = { path = "../deltalake-test" }
pretty_env_logger = "*"
rand = "0.8"
serde_json = { workspace = true }

Expand Down
7 changes: 7 additions & 0 deletions crates/deltalake-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl LogStoreFactory for S3LogStoreFactory {
store, location, options,
));
}
println!("RETURNING AN S3 DYNAMODB LOGSTORE {s3_options:?}");

Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new(
location.clone(),
Expand Down Expand Up @@ -109,6 +110,12 @@ pub struct DynamoDbLockClient {
config: DynamoDbConfig,
}

/// Compact `Debug` rendering that prints only the client's `config` field.
impl std::fmt::Debug for DynamoDbLockClient {
    /// Writes `DynamoDbLockClient(config: <Debug of config>)` into `f`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let config = &self.config;
        f.write_fmt(format_args!("DynamoDbLockClient(config: {config:?})"))
    }
}

impl DynamoDbLockClient {
/// Creates a new DynamoDbLockClient from the supplied storage options.
pub fn try_new(
Expand Down
37 changes: 27 additions & 10 deletions crates/deltalake-aws/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{constants, CommitEntry, DynamoDbLockClient, UpdateLogEntryResult};

use bytes::Bytes;
use deltalake_core::{ObjectStoreError, Path};
use log::*;
use url::Url;

use deltalake_core::logstore::*;
Expand All @@ -29,6 +30,12 @@ pub struct S3DynamoDbLogStore {
table_path: String,
}

/// Compact `Debug` rendering that identifies the log store by its table path.
impl std::fmt::Debug for S3DynamoDbLogStore {
    /// Writes `S3DynamoDbLogStore(<table_path>)` into `f`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("S3DynamoDbLogStore(")?;
        f.write_str(&self.table_path)?;
        f.write_str(")")
    }
}

impl S3DynamoDbLogStore {
/// Create log store
pub fn try_new(
Expand All @@ -49,6 +56,7 @@ impl S3DynamoDbLogStore {
source: err.into(),
},
})?;
debug!("S3DynamoDbLogStore configured with lock client: {lock_client:?}");
let table_path = to_uri(&location, &Path::from(""));
Ok(Self {
storage: object_store,
Expand Down Expand Up @@ -83,7 +91,7 @@ impl S3DynamoDbLogStore {
return self.try_complete_entry(entry, false).await;
}
Err(err) if retry == MAX_REPAIR_RETRIES => return Err(err),
Err(err) => log::debug!(
Err(err) => debug!(
"retry #{retry} on log entry {entry:?} failed to move commit: '{err}'"
),
}
Expand Down Expand Up @@ -168,6 +176,7 @@ impl LogStore for S3DynamoDbLogStore {
tmp_commit: &Path,
) -> Result<(), TransactionError> {
let entry = CommitEntry::new(version, tmp_commit.clone());
debug!("Writing commit entry for {self:?}: {entry:?}");
// create log entry in dynamo db: complete = false, no expireTime
self.lock_client
.put_commit_entry(&self.table_path, &entry)
Expand All @@ -177,16 +186,23 @@ impl LogStore for S3DynamoDbLogStore {
TransactionError::VersionAlreadyExists(version)
}
LockClientError::ProvisionedThroughputExceeded => todo!(),
LockClientError::LockTableNotFound => TransactionError::LogStoreError {
msg: format!(
"lock table '{}' not found",
self.lock_client.get_lock_table_name()
),
source: Box::new(err),
LockClientError::LockTableNotFound => {
let table_name = self.lock_client.get_lock_table_name();
error!("Lock table '{table_name}' not found");
TransactionError::LogStoreError {
msg: format!(
"lock table '{}' not found",
self.lock_client.get_lock_table_name()
),
source: Box::new(err),
}
},
err => TransactionError::LogStoreError {
msg: "dynamodb client failed to write log entry".to_owned(),
source: Box::new(err),
err => {
error!("dynamodb client failed to write log entry: {err:?}");
TransactionError::LogStoreError {
msg: "dynamodb client failed to write log entry".to_owned(),
source: Box::new(err),
}
},
})?;
// `repair_entry` performs the exact steps required to finalize the commit, but contains
Expand All @@ -199,6 +215,7 @@ impl LogStore for S3DynamoDbLogStore {
}

async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
debug!("Retrieving latest version of {self:?} at v{current_version}");
let entry = self
.lock_client
.get_latest_entry(&self.table_path)
Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
Some((s3_key, value.clone()))
}),
)?;
println!("STORE: {store:?}, options {options:?}");

let options = S3StorageOptions::from_map(&options.0);
let store = S3StorageBackend::try_new(
Expand Down
7 changes: 7 additions & 0 deletions crates/deltalake-aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,14 @@ const COMMITS: i64 = 5;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_concurrent_writers() -> TestResult<()> {
let _ = pretty_env_logger::try_init();
// Goal: a test with multiple writers, very similar to `integration_concurrent_writes`
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
println!(">>> preparing table");
let table = prepare_table(&context, "concurrent_writes").await?;
println!(">>> table prepared");
let table_uri = table.table_uri();
println!("Starting workers on {table_uri}");

let mut workers = Vec::new();
for w in 0..WORKERS {
Expand Down Expand Up @@ -184,6 +188,7 @@ impl Worker {
.load()
.await
.unwrap();
println!("Loaded table in worker: {table:?}");
Self { table, name }
}

Expand Down Expand Up @@ -268,11 +273,13 @@ async fn prepare_table(context: &IntegrationContext, table_name: &str) -> TestRe
.with_allow_http(true)
.with_storage_options(OPTIONS.clone())
.build()?;
println!("table built: {table:?}");
// create delta table
let table = DeltaOps(table)
.create()
.with_columns(schema.fields().clone())
.await?;
println!("table created: {table:?}");
Ok(table)
}

Expand Down

0 comments on commit 725c3c3

Please sign in to comment.