diff --git a/crates/deltalake-aws/Cargo.toml b/crates/deltalake-aws/Cargo.toml
index 755d6d6797..b0f102ce7c 100644
--- a/crates/deltalake-aws/Cargo.toml
+++ b/crates/deltalake-aws/Cargo.toml
@@ -26,6 +26,7 @@ url = { workspace = true }
 chrono = { workspace = true }
 serial_test = "2"
 deltalake-test = { path = "../deltalake-test" }
+pretty_env_logger = "0.5"
 rand = "0.8"
 serde_json = { workspace = true }
 
diff --git a/crates/deltalake-aws/src/lib.rs b/crates/deltalake-aws/src/lib.rs
index 118e0db171..00fd02b587 100644
--- a/crates/deltalake-aws/src/lib.rs
+++ b/crates/deltalake-aws/src/lib.rs
@@ -50,6 +50,8 @@ impl LogStoreFactory for S3LogStoreFactory {
                 store, location, options,
             ));
         }
+        // NOTE(review): `s3_options` is left out of the message in case it holds credentials.
+        log::debug!("S3LogStoreFactory is returning an S3DynamoDbLogStore");
 
         Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new(
             location.clone(),
@@ -109,6 +111,13 @@ pub struct DynamoDbLockClient {
     config: DynamoDbConfig,
 }
 
+/// Debug output shows only the client's configuration.
+impl std::fmt::Debug for DynamoDbLockClient {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "DynamoDbLockClient(config: {:?})", self.config)
+    }
+}
+
 impl DynamoDbLockClient {
     /// Creates a new DynamoDbLockClient from the supplied storage options.
     pub fn try_new(
diff --git a/crates/deltalake-aws/src/logstore.rs b/crates/deltalake-aws/src/logstore.rs
index f6f5705ed9..08bcaef074 100644
--- a/crates/deltalake-aws/src/logstore.rs
+++ b/crates/deltalake-aws/src/logstore.rs
@@ -9,6 +9,7 @@ use crate::{constants, CommitEntry, DynamoDbLockClient, UpdateLogEntryResult};
 
 use bytes::Bytes;
 use deltalake_core::{ObjectStoreError, Path};
+use log::{debug, error};
 use url::Url;
 
 use deltalake_core::logstore::*;
@@ -29,6 +30,13 @@ pub struct S3DynamoDbLogStore {
     table_path: String,
 }
 
+/// Debug output shows only the table path.
+impl std::fmt::Debug for S3DynamoDbLogStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "S3DynamoDbLogStore({})", self.table_path)
+    }
+}
+
 impl S3DynamoDbLogStore {
     /// Create log store
     pub fn try_new(
@@ -49,6 +57,7 @@ impl S3DynamoDbLogStore {
                 source: err.into(),
             },
         })?;
+        debug!("S3DynamoDbLogStore configured with lock client: {lock_client:?}");
         let table_path = to_uri(&location, &Path::from(""));
         Ok(Self {
             storage: object_store,
@@ -83,7 +92,7 @@ impl S3DynamoDbLogStore {
                     return self.try_complete_entry(entry, false).await;
                 }
                 Err(err) if retry == MAX_REPAIR_RETRIES => return Err(err),
-                Err(err) => log::debug!(
+                Err(err) => debug!(
                     "retry #{retry} on log entry {entry:?} failed to move commit: '{err}'"
                 ),
             }
@@ -168,6 +177,7 @@ impl LogStore for S3DynamoDbLogStore {
         tmp_commit: &Path,
     ) -> Result<(), TransactionError> {
         let entry = CommitEntry::new(version, tmp_commit.clone());
+        debug!("Writing commit entry for {self:?}: {entry:?}");
         // create log entry in dynamo db: complete = false, no expireTime
         self.lock_client
             .put_commit_entry(&self.table_path, &entry)
@@ -177,16 +187,20 @@ impl LogStore for S3DynamoDbLogStore {
                     TransactionError::VersionAlreadyExists(version)
                 }
                 LockClientError::ProvisionedThroughputExceeded => todo!(),
-                LockClientError::LockTableNotFound => TransactionError::LogStoreError {
-                    msg: format!(
-                        "lock table '{}' not found",
-                        self.lock_client.get_lock_table_name()
-                    ),
-                    source: Box::new(err),
+                LockClientError::LockTableNotFound => {
+                    let table_name = self.lock_client.get_lock_table_name();
+                    error!("Lock table '{table_name}' not found");
+                    TransactionError::LogStoreError {
+                        msg: format!("lock table '{table_name}' not found"),
+                        source: Box::new(err),
+                    }
                 },
-                err => TransactionError::LogStoreError {
-                    msg: "dynamodb client failed to write log entry".to_owned(),
-                    source: Box::new(err),
+                err => {
+                    error!("dynamodb client failed to write log entry: {err:?}");
+                    TransactionError::LogStoreError {
+                        msg: "dynamodb client failed to write log entry".to_owned(),
+                        source: Box::new(err),
+                    }
                 },
             })?;
         // `repair_entry` performs the exact steps required to finalize the commit, but contains
@@ -199,6 +213,7 @@ impl LogStore for S3DynamoDbLogStore {
     }
 
     async fn get_latest_version(&self, current_version: i64) -> DeltaResult {
+        debug!("Retrieving latest version of {self:?} at v{current_version}");
         let entry = self
             .lock_client
             .get_latest_entry(&self.table_path)
diff --git a/crates/deltalake-aws/src/storage.rs b/crates/deltalake-aws/src/storage.rs
index 97786e9736..2b32438cd5 100644
--- a/crates/deltalake-aws/src/storage.rs
+++ b/crates/deltalake-aws/src/storage.rs
@@ -55,6 +55,8 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
                 Some((s3_key, value.clone()))
             }),
         )?;
+        // NOTE(review): `options` is left out of the message in case it holds credentials.
+        log::debug!("S3 object store created: {store:?}");
 
         let options = S3StorageOptions::from_map(&options.0);
         let store = S3StorageBackend::try_new(
diff --git a/crates/deltalake-aws/tests/integration_s3_dynamodb.rs b/crates/deltalake-aws/tests/integration_s3_dynamodb.rs
index e3a8d9798d..3d575fb4b2 100644
--- a/crates/deltalake-aws/tests/integration_s3_dynamodb.rs
+++ b/crates/deltalake-aws/tests/integration_s3_dynamodb.rs
@@ -148,10 +148,14 @@ const COMMITS: i64 = 5;
 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
 #[serial]
 async fn test_concurrent_writers() -> TestResult<()> {
+    let _ = pretty_env_logger::try_init();
     // Goal: a test with multiple writers, very similar to `integration_concurrent_writes`
     let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
+    println!(">>> preparing table");
     let table = prepare_table(&context, "concurrent_writes").await?;
+    println!(">>> table prepared");
     let table_uri = table.table_uri();
+    println!("Starting workers on {table_uri}");
 
     let mut workers = Vec::new();
     for w in 0..WORKERS {
@@ -184,6 +188,7 @@ impl Worker {
             .load()
             .await
             .unwrap();
+        println!("Loaded table in worker: {table:?}");
         Self { table, name }
     }
 
@@ -268,10 +273,12 @@ async fn prepare_table(context: &IntegrationContext, table_name: &str) -> TestRe
         .with_allow_http(true)
         .with_storage_options(OPTIONS.clone())
         .build()?;
+    println!("table built: {table:?}");
     // create delta table
     let table = DeltaOps(table)
         .create()
         .with_columns(schema.fields().clone())
         .await?;
+    println!("table created: {table:?}");
     Ok(table)
 }