From 6935c14b65e876342dbeffc1268d43898a59f375 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Fri, 29 Dec 2023 22:22:01 +0000
Subject: [PATCH] Fixing integration_concurrent_writes

---
 crates/deltalake-aws/src/logstore.rs           |  10 +-
 crates/deltalake-core/src/table/builder.rs     |  50 +++++---
 .../tests/integration_concurrent_writes.rs     |  37 +-----
 .../deltalake-core/tests/integration_read.rs   |   1 -
 crates/deltalake-test/src/utils.rs             | 120 +++---------
 5 files changed, 60 insertions(+), 158 deletions(-)

diff --git a/crates/deltalake-aws/src/logstore.rs b/crates/deltalake-aws/src/logstore.rs
index 08bcaef074..293ed0895e 100644
--- a/crates/deltalake-aws/src/logstore.rs
+++ b/crates/deltalake-aws/src/logstore.rs
@@ -91,9 +91,9 @@ impl S3DynamoDbLogStore {
                     return self.try_complete_entry(entry, false).await;
                 }
                 Err(err) if retry == MAX_REPAIR_RETRIES => return Err(err),
-                Err(err) => debug!(
-                    "retry #{retry} on log entry {entry:?} failed to move commit: '{err}'"
-                ),
+                Err(err) => {
+                    debug!("retry #{retry} on log entry {entry:?} failed to move commit: '{err}'")
+                }
             }
         }
         unreachable!("for loop yields Ok or Err in body when retry = MAX_REPAIR_RETRIES")
@@ -196,14 +196,14 @@ impl LogStore for S3DynamoDbLogStore {
                     ),
                     source: Box::new(err),
                 }
-            },
+            }
             err => {
                 error!("dynamodb client failed to write log entry: {err:?}");
                 TransactionError::LogStoreError {
                     msg: "dynamodb client failed to write log entry".to_owned(),
                     source: Box::new(err),
                 }
-            },
+            }
         })?;
         // `repair_entry` performs the exact steps required to finalize the commit, but contains
         // retry logic and more robust error handling under the assumption that any other client
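
The two logstore.rs hunks are rustfmt cleanups only: the multi-line `debug!` arm
collapses into a block-bodied match arm, and the now-redundant trailing commas
after block-bodied arms are dropped; behavior is unchanged. For readers outside
this file, the construct being reformatted is a bounded retry loop. A minimal,
self-contained sketch of that shape follows; `try_move_commit` and `RepairError`
are hypothetical stand-ins, not delta-rs types:

```rust
// Bounded retry in the shape of S3DynamoDbLogStore::repair_entry.
const MAX_REPAIR_RETRIES: u32 = 3;

#[derive(Debug)]
struct RepairError;

fn try_move_commit() -> Result<(), RepairError> {
    // Stand-in for the S3 copy + DynamoDB update the real code performs.
    Err(RepairError)
}

fn repair_entry() -> Result<(), RepairError> {
    for retry in 0..=MAX_REPAIR_RETRIES {
        match try_move_commit() {
            Ok(()) => return Ok(()),
            // Retry budget exhausted: surface the error to the caller.
            Err(err) if retry == MAX_REPAIR_RETRIES => return Err(err),
            // Otherwise log the failure and go around again.
            Err(err) => eprintln!("retry #{retry} failed to move commit: {err:?}"),
        }
    }
    unreachable!("the final iteration always returns Ok or Err")
}

fn main() {
    assert!(repair_entry().is_err());
}
```

Returning early on the last permitted retry is what keeps the `unreachable!`
after the loop honest: every path out of the final iteration yields an explicit
`Ok` or `Err`.
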
diff --git a/crates/deltalake-core/src/table/builder.rs b/crates/deltalake-core/src/table/builder.rs
index 60b485e1f2..c52bdec40a 100644
--- a/crates/deltalake-core/src/table/builder.rs
+++ b/crates/deltalake-core/src/table/builder.rs
@@ -323,6 +323,19 @@ impl DeltaTableBuilder {
     }
 }
 
+fn create_filetree_from_path(path: &PathBuf) -> DeltaResult<()> {
+    if !path.exists() {
+        std::fs::create_dir_all(path).map_err(|err| {
+            let msg = format!(
+                "Could not create local directory: {:?}\nError: {:?}",
+                path, err
+            );
+            DeltaTableError::InvalidTableLocation(msg)
+        })?;
+    }
+    Ok(())
+}
+
 /// Attempt to create a Url from given table location.
 ///
 /// The location could be:
@@ -337,33 +350,30 @@ impl DeltaTableBuilder {
 pub fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
     let table_uri = table_uri.as_ref();
+    debug!("ensure_table_uri {table_uri}");
 
     let mut url = match Url::parse(table_uri) {
-        Ok(url) => Ok(url),
+        Ok(url) => {
+            if url.scheme() == "file" {
+                create_filetree_from_path(
+                    &url.to_file_path()
+                        .expect("Failed to convert a file:// URL to a file path"),
+                )?;
+            }
+            Ok(url)
+        }
         Err(_) => {
-            debug!("Creating a local table_uri: {}", table_uri);
             let path = PathBuf::from(table_uri);
-            if !path.exists() {
-                std::fs::create_dir_all(&path).map_err(|err| {
-                    let msg = format!(
-                        "Could not create local directory: {}\nError: {:?}",
-                        table_uri, err
-                    );
-                    DeltaTableError::InvalidTableLocation(msg)
-                })?;
-            }
-
-            let path = std::fs::canonicalize(path).map_err(|err| {
-                let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err);
+            let _ = create_filetree_from_path(&path)?;
+            let path = std::fs::canonicalize(path.clone()).map_err(|err| {
+                let msg = format!("Invalid table location: {:?}\nError: {:?}", path, err);
                 DeltaTableError::InvalidTableLocation(msg)
             })?;
-            debug!("going to create a directory URL: {:?}", path);
-
-            Url::from_directory_path(path).map_err(|_| {
+            Url::from_directory_path(path.clone()).map_err(|_| {
                 let msg = format!(
-                    "Could not construct a URL from canonicalized path: {}.\n\
+                    "Could not construct a URL from canonicalized path: {:?}.\n\
                     Something must be very wrong with the table path.",
-                    table_uri
+                    path,
                 );
                 DeltaTableError::InvalidTableLocation(msg)
             })
@@ -401,6 +411,8 @@ mod tests {
         assert!(uri.is_ok());
         let _uri = ensure_table_uri("./nonexistent");
         assert!(uri.is_ok());
+        let _uri = ensure_table_uri("file:///tmp/nonexistent/some/path");
+        assert!(uri.is_ok());
 
         let uri = ensure_table_uri("s3://container/path");
         assert!(uri.is_ok());
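
Behaviorally, the builder.rs change makes `ensure_table_uri` treat `file://`
URLs the same way it already treated bare filesystem paths: any missing
directories are created up front via the new shared `create_filetree_from_path`
helper, which is exactly what the added `file:///tmp/nonexistent/some/path`
test exercises. A usage sketch, assuming the function is reachable at its usual
`deltalake_core::table::builder` path (the concrete directories are illustrative):

```rust
use deltalake_core::table::builder::ensure_table_uri;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Bare paths were already handled: the directory is created, canonicalized,
    // and turned into a file:// URL.
    let url = ensure_table_uri("./a/nonexistent/dir")?;
    assert_eq!(url.scheme(), "file");

    // New in this patch: file:// URLs no longer skip directory creation by way
    // of the Url::parse fast path; the tree exists before the URL is returned.
    let url = ensure_table_uri("file:///tmp/nonexistent/some/path")?;
    assert!(url.to_file_path().is_ok());
    Ok(())
}
```
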
diff --git a/crates/deltalake-core/tests/integration_concurrent_writes.rs b/crates/deltalake-core/tests/integration_concurrent_writes.rs
index f57167f2c1..4e66a9f93f 100644
--- a/crates/deltalake-core/tests/integration_concurrent_writes.rs
+++ b/crates/deltalake-core/tests/integration_concurrent_writes.rs
@@ -1,11 +1,13 @@
 #![cfg(feature = "integration_test")]
 
+use log::*;
+
 use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType};
 use deltalake_core::operations::transaction::commit;
 use deltalake_core::operations::DeltaOps;
 use deltalake_core::protocol::{DeltaOperation, SaveMode};
-use deltalake_core::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables};
 use deltalake_core::{DeltaTable, DeltaTableBuilder};
+use deltalake_test::utils::*;
 use serial_test::serial;
 use std::collections::HashMap;
 use std::future::Future;
@@ -15,38 +17,13 @@ use std::time::Duration;
 #[tokio::test]
 #[serial]
 async fn test_concurrent_writes_local() -> TestResult {
-    test_concurrent_writes(StorageIntegration::Local).await?;
-    Ok(())
-}
-
-#[cfg(feature = "s3")]
-#[tokio::test]
-#[serial]
-async fn concurrent_writes_s3() -> TestResult {
-    test_concurrent_writes(StorageIntegration::Amazon).await?;
-    Ok(())
-}
-
-#[cfg(feature = "azure")]
-#[tokio::test]
-#[serial]
-async fn test_concurrent_writes_azure() -> TestResult {
-    test_concurrent_writes(StorageIntegration::Microsoft).await?;
-    Ok(())
-}
-
-// tracked via https://github.com/datafusion-contrib/datafusion-objectstore-hdfs/issues/13
-#[ignore]
-#[cfg(feature = "hdfs")]
-#[tokio::test]
-#[serial]
-async fn test_concurrent_writes_hdfs() -> TestResult {
-    test_concurrent_writes(StorageIntegration::Hdfs).await?;
+    let storage = Box::new(LocalStorageIntegration::default());
+    let context = IntegrationContext::new(storage)?;
+    test_concurrent_writes(&context).await?;
     Ok(())
 }
 
-async fn test_concurrent_writes(integration: StorageIntegration) -> TestResult {
-    let context = IntegrationContext::new(integration)?;
+async fn test_concurrent_writes(context: &IntegrationContext) -> TestResult {
     let (_table, table_uri) = prepare_table(&context).await?;
     run_test(|name| Worker::new(&table_uri, name)).await;
     Ok(())
diff --git a/crates/deltalake-core/tests/integration_read.rs b/crates/deltalake-core/tests/integration_read.rs
index aa4e146afc..d20b7f4e97 100644
--- a/crates/deltalake-core/tests/integration_read.rs
+++ b/crates/deltalake-core/tests/integration_read.rs
@@ -2,7 +2,6 @@
 
 use deltalake_core::{DeltaTableBuilder, ObjectStore};
 use deltalake_test::utils::*;
-use log::*;
 use object_store::path::Path;
 use serial_test::serial;
 
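
The test-harness change is the heart of this patch: `StorageIntegration` stops
being an enum of known clouds and becomes a trait (defined in deltalake-test
below), so each backend crate can ship its own implementation and the
S3/Azure/HDFS variants of this test can move out of deltalake-core. A rough
sketch of what a downstream implementation might look like, assuming
`IntegrationContext::new` accepts a `Box<dyn StorageIntegration>` as the local
test above suggests; `ScratchIntegration` and its scratch path are hypothetical:

```rust
use std::process::ExitStatus;

use deltalake_test::utils::{IntegrationContext, StorageIntegration, TestResult};

/// Hypothetical backend that keeps test data under a fixed scratch directory.
struct ScratchIntegration;

impl StorageIntegration for ScratchIntegration {
    fn create_bucket(&self) -> std::io::Result<ExitStatus> {
        std::fs::create_dir_all(self.bucket_name())?;
        Ok(ExitStatus::default())
    }
    fn prepare_env(&self) {
        // Nothing to export for a local scratch backend.
    }
    fn bucket_name(&self) -> String {
        "/tmp/delta-scratch".to_owned()
    }
    fn root_uri(&self) -> String {
        format!("file://{}", self.bucket_name())
    }
    fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result<ExitStatus> {
        // Sketch only: a real implementation copies recursively, as the
        // fs_extra::dir::copy call in LocalStorageIntegration does.
        let dest = std::path::Path::new(&self.bucket_name()).join(destination);
        std::fs::create_dir_all(&dest)?;
        println!("would copy {source} -> {}", dest.display());
        Ok(ExitStatus::default())
    }
}

fn main() -> TestResult {
    // Any implementation can drive the shared harness; no enum variant needed.
    let _context = IntegrationContext::new(Box::new(ScratchIntegration))?;
    Ok(())
}
```
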
diff --git a/crates/deltalake-test/src/utils.rs b/crates/deltalake-test/src/utils.rs
index e213a66882..8468bc5728 100644
--- a/crates/deltalake-test/src/utils.rs
+++ b/crates/deltalake-test/src/utils.rs
@@ -4,26 +4,23 @@
 use deltalake_core::{DeltaResult, DeltaTableBuilder};
 use fs_extra::dir::{copy, CopyOptions};
 use std::collections::HashMap;
 use std::env;
-use tempdir::TempDir;
 use std::process::ExitStatus;
+use tempdir::TempDir;
 
 pub type TestResult = Result<(), Box<dyn std::error::Error + 'static>>;
 
 pub trait StorageIntegration {
     fn create_bucket(&self) -> std::io::Result<ExitStatus>;
-    fn prepare_env(&self,);
+    fn prepare_env(&self);
     fn bucket_name(&self) -> String;
     fn root_uri(&self) -> String;
-    fn copy_directory(&self,
-        source: &str,
-        destination: &str,
-    ) -> std::io::Result<ExitStatus>;
+    fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result<ExitStatus>;
 
     fn object_store(&self) -> DeltaResult<ObjectStoreRef> {
         Ok(DeltaTableBuilder::from_uri(self.root_uri())
-           .with_allow_http(true)
-           .build_storage()?
-           .object_store())
+            .with_allow_http(true)
+            .build_storage()?
+            .object_store())
     }
 }
 
@@ -43,20 +40,19 @@ impl StorageIntegration for LocalStorageIntegration {
         Ok(ExitStatus::default())
     }
 
-    fn prepare_env(&self,) { }
+    fn prepare_env(&self) {}
     fn bucket_name(&self) -> String {
         self.tmp_dir.as_ref().to_str().unwrap().to_owned()
     }
-    fn root_uri(&self) -> String { format!("file://{}", self.bucket_name()) }
-    fn copy_directory(&self,
-        source: &str,
-        destination: &str,
-    ) -> std::io::Result<ExitStatus> {
+    fn root_uri(&self) -> String {
+        format!("file://{}", self.bucket_name())
+    }
+    fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result<ExitStatus> {
         let mut options = CopyOptions::new();
-            options.content_only = true;
-            let dest_path = self.tmp_dir.path().join(destination);
-            std::fs::create_dir_all(&dest_path)?;
-            copy(source, &dest_path, &options).expect("Failed to copy");
+        options.content_only = true;
+        let dest_path = self.tmp_dir.path().join(destination);
+        std::fs::create_dir_all(&dest_path)?;
+        copy(source, &dest_path, &options).expect("Failed to copy");
         Ok(ExitStatus::default())
     }
 }
 
@@ -130,7 +126,8 @@ impl IntegrationContext {
         table: TestTables,
         name: impl AsRef<str>,
     ) -> TestResult {
-        self.integration.copy_directory(&table.as_path(), name.as_ref())?;
+        self.integration
+            .copy_directory(&table.as_path(), name.as_ref())?;
         Ok(())
     }
 
@@ -146,89 +143,6 @@ impl IntegrationContext {
         }
     }
 }
-/*
-impl Drop for IntegrationContext {
-    fn drop(&mut self) {
-        match self.integration {
-            StorageIntegration::Amazon => {
-                //s3_cli::delete_bucket(self.root_uri()).unwrap();
-                //s3_cli::delete_lock_table().unwrap();
-            }
-            StorageIntegration::Microsoft => {
-                az_cli::delete_container(&self.bucket).unwrap();
-            }
-            StorageIntegration::Google => {
-                gs_cli::delete_bucket(&self.bucket).unwrap();
-            }
-            StorageIntegration::Onelake => (),
-            StorageIntegration::OnelakeAbfs => (),
-            StorageIntegration::Local => (),
-            StorageIntegration::Hdfs => {
-                hdfs_cli::delete_dir(&self.bucket).unwrap();
-            }
-        };
-        self.restore_env();
-    }
-}
-
-/// Kinds of storage integration
-pub enum StorageIntegration {
-    Amazon,
-    Microsoft,
-    Onelake,
-    Google,
-    Local,
-    Hdfs,
-    OnelakeAbfs,
-}
-
-impl StorageIntegration {
-    fn prepare_env(&self) {
-        match self {
-            Self::Microsoft => az_cli::prepare_env(),
-            Self::Onelake => onelake_cli::prepare_env(),
-            Self::Amazon => (), //s3_cli::prepare_env(),
-            Self::Google => gs_cli::prepare_env(),
-            Self::OnelakeAbfs => onelake_cli::prepare_env(),
-            Self::Local => (),
-            Self::Hdfs => (),
-        }
-    }
-
-    fn create_bucket(&self, name: impl AsRef<str>) -> std::io::Result<()> {
-        match self {
-            Self::Microsoft => {
-                az_cli::create_container(name)?;
-                Ok(())
-            }
-            Self::Onelake => Ok(()),
-            Self::OnelakeAbfs => Ok(()),
-            Self::Amazon => {
-                std::env::set_var(
-                    "DELTA_DYNAMO_TABLE_NAME",
-                    format!("delta_log_it_{}", rand::thread_rng().gen::<u16>()),
-                );
-                //s3_cli::create_bucket(format!("s3://{}", name.as_ref()))?;
-                set_env_if_not_set(
-                    "DYNAMO_LOCK_PARTITION_KEY_VALUE",
-                    format!("s3://{}", name.as_ref()),
-                );
-                //s3_cli::create_lock_table()?;
-                Ok(())
-            }
-            Self::Google => {
-                gs_cli::create_bucket(name)?;
-                Ok(())
-            }
-            Self::Local => Ok(()),
-            Self::Hdfs => {
-                hdfs_cli::create_dir(name)?;
-                Ok(())
-            }
-        }
-    }
-}
-*/
 
 /// Reference tables from the test data folder
 pub enum TestTables {