From cce5fdd405fe6fed783f19c627cf318ffb51cdc0 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Fri, 29 Dec 2023 22:22:01 +0000 Subject: [PATCH] Fixing integration_concurrent_writes --- crates/deltalake-core/src/table/builder.rs | 50 ++++++----- .../tests/integration_concurrent_writes.rs | 37 ++------- .../deltalake-core/tests/integration_read.rs | 1 - crates/deltalake-test/src/utils.rs | 83 ------------------- 4 files changed, 38 insertions(+), 133 deletions(-) diff --git a/crates/deltalake-core/src/table/builder.rs b/crates/deltalake-core/src/table/builder.rs index 60b485e1f2..c52bdec40a 100644 --- a/crates/deltalake-core/src/table/builder.rs +++ b/crates/deltalake-core/src/table/builder.rs @@ -323,6 +323,19 @@ impl DeltaTableBuilder { } } +fn create_filetree_from_path(path: &PathBuf) -> DeltaResult<()> { + if !path.exists() { + std::fs::create_dir_all(path).map_err(|err| { + let msg = format!( + "Could not create local directory: {:?}\nError: {:?}", + path, err + ); + DeltaTableError::InvalidTableLocation(msg) + })?; + } + Ok(()) +} + /// Attempt to create a Url from given table location. /// /// The location could be: @@ -337,33 +350,30 @@ impl DeltaTableBuilder { pub fn ensure_table_uri(table_uri: impl AsRef) -> DeltaResult { let table_uri = table_uri.as_ref(); + debug!("ensure_table_uri {table_uri}"); let mut url = match Url::parse(table_uri) { - Ok(url) => Ok(url), + Ok(url) => { + if url.scheme() == "file" { + create_filetree_from_path( + &url.to_file_path() + .expect("Failed to convert a file:// URL to a file path"), + )?; + } + Ok(url) + } Err(_) => { - debug!("Creating a local table_uri: {}", table_uri); let path = PathBuf::from(table_uri); - if !path.exists() { - std::fs::create_dir_all(&path).map_err(|err| { - let msg = format!( - "Could not create local directory: {}\nError: {:?}", - table_uri, err - ); - DeltaTableError::InvalidTableLocation(msg) - })?; - } - - let path = std::fs::canonicalize(path).map_err(|err| { - let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err); + let _ = create_filetree_from_path(&path)?; + let path = std::fs::canonicalize(path.clone()).map_err(|err| { + let msg = format!("Invalid table location: {:?}\nError: {:?}", path, err); DeltaTableError::InvalidTableLocation(msg) })?; - debug!("going to create a directory URL: {:?}", path); - - Url::from_directory_path(path).map_err(|_| { + Url::from_directory_path(path.clone()).map_err(|_| { let msg = format!( - "Could not construct a URL from canonicalized path: {}.\n\ + "Could not construct a URL from canonicalized path: {:?}.\n\ Something must be very wrong with the table path.", - table_uri + path, ); DeltaTableError::InvalidTableLocation(msg) }) @@ -401,6 +411,8 @@ mod tests { assert!(uri.is_ok()); let _uri = ensure_table_uri("./nonexistent"); assert!(uri.is_ok()); + let _uri = ensure_table_uri("file:///tmp/nonexistent/some/path"); + assert!(uri.is_ok()); let uri = ensure_table_uri("s3://container/path"); assert!(uri.is_ok()); diff --git a/crates/deltalake-core/tests/integration_concurrent_writes.rs b/crates/deltalake-core/tests/integration_concurrent_writes.rs index f57167f2c1..4e66a9f93f 100644 --- a/crates/deltalake-core/tests/integration_concurrent_writes.rs +++ b/crates/deltalake-core/tests/integration_concurrent_writes.rs @@ -1,11 +1,13 @@ #![cfg(feature = "integration_test")] +use log::*; + use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType}; use deltalake_core::operations::transaction::commit; use deltalake_core::operations::DeltaOps; use deltalake_core::protocol::{DeltaOperation, SaveMode}; -use deltalake_core::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables}; use deltalake_core::{DeltaTable, DeltaTableBuilder}; +use deltalake_test::utils::*; use serial_test::serial; use std::collections::HashMap; use std::future::Future; @@ -15,38 +17,13 @@ use std::time::Duration; #[tokio::test] #[serial] async fn test_concurrent_writes_local() -> TestResult { - test_concurrent_writes(StorageIntegration::Local).await?; - Ok(()) -} - -#[cfg(feature = "s3")] -#[tokio::test] -#[serial] -async fn concurrent_writes_s3() -> TestResult { - test_concurrent_writes(StorageIntegration::Amazon).await?; - Ok(()) -} - -#[cfg(feature = "azure")] -#[tokio::test] -#[serial] -async fn test_concurrent_writes_azure() -> TestResult { - test_concurrent_writes(StorageIntegration::Microsoft).await?; - Ok(()) -} - -// tracked via https://github.com/datafusion-contrib/datafusion-objectstore-hdfs/issues/13 -#[ignore] -#[cfg(feature = "hdfs")] -#[tokio::test] -#[serial] -async fn test_concurrent_writes_hdfs() -> TestResult { - test_concurrent_writes(StorageIntegration::Hdfs).await?; + let storage = Box::new(LocalStorageIntegration::default()); + let context = IntegrationContext::new(storage)?; + test_concurrent_writes(&context).await?; Ok(()) } -async fn test_concurrent_writes(integration: StorageIntegration) -> TestResult { - let context = IntegrationContext::new(integration)?; +async fn test_concurrent_writes(context: &IntegrationContext) -> TestResult { let (_table, table_uri) = prepare_table(&context).await?; run_test(|name| Worker::new(&table_uri, name)).await; Ok(()) diff --git a/crates/deltalake-core/tests/integration_read.rs b/crates/deltalake-core/tests/integration_read.rs index aa4e146afc..d20b7f4e97 100644 --- a/crates/deltalake-core/tests/integration_read.rs +++ b/crates/deltalake-core/tests/integration_read.rs @@ -2,7 +2,6 @@ use deltalake_core::{DeltaTableBuilder, ObjectStore}; use deltalake_test::utils::*; -use log::*; use object_store::path::Path; use serial_test::serial; diff --git a/crates/deltalake-test/src/utils.rs b/crates/deltalake-test/src/utils.rs index e213a66882..e739d1e159 100644 --- a/crates/deltalake-test/src/utils.rs +++ b/crates/deltalake-test/src/utils.rs @@ -146,89 +146,6 @@ impl IntegrationContext { } } } -/* -impl Drop for IntegrationContext { - fn drop(&mut self) { - match self.integration { - StorageIntegration::Amazon => { - //s3_cli::delete_bucket(self.root_uri()).unwrap(); - //s3_cli::delete_lock_table().unwrap(); - } - StorageIntegration::Microsoft => { - az_cli::delete_container(&self.bucket).unwrap(); - } - StorageIntegration::Google => { - gs_cli::delete_bucket(&self.bucket).unwrap(); - } - StorageIntegration::Onelake => (), - StorageIntegration::OnelakeAbfs => (), - StorageIntegration::Local => (), - StorageIntegration::Hdfs => { - hdfs_cli::delete_dir(&self.bucket).unwrap(); - } - }; - self.restore_env(); - } -} - -/// Kinds of storage integration -pub enum StorageIntegration { - Amazon, - Microsoft, - Onelake, - Google, - Local, - Hdfs, - OnelakeAbfs, -} - -impl StorageIntegration { - fn prepare_env(&self) { - match self { - Self::Microsoft => az_cli::prepare_env(), - Self::Onelake => onelake_cli::prepare_env(), - Self::Amazon => (), //s3_cli::prepare_env(), - Self::Google => gs_cli::prepare_env(), - Self::OnelakeAbfs => onelake_cli::prepare_env(), - Self::Local => (), - Self::Hdfs => (), - } - } - - fn create_bucket(&self, name: impl AsRef) -> std::io::Result<()> { - match self { - Self::Microsoft => { - az_cli::create_container(name)?; - Ok(()) - } - Self::Onelake => Ok(()), - Self::OnelakeAbfs => Ok(()), - Self::Amazon => { - std::env::set_var( - "DELTA_DYNAMO_TABLE_NAME", - format!("delta_log_it_{}", rand::thread_rng().gen::()), - ); - //s3_cli::create_bucket(format!("s3://{}", name.as_ref()))?; - set_env_if_not_set( - "DYNAMO_LOCK_PARTITION_KEY_VALUE", - format!("s3://{}", name.as_ref()), - ); - //s3_cli::create_lock_table()?; - Ok(()) - } - Self::Google => { - gs_cli::create_bucket(name)?; - Ok(()) - } - Self::Local => Ok(()), - Self::Hdfs => { - hdfs_cli::create_dir(name)?; - Ok(()) - } - } - } -} -*/ /// Reference tables from the test data folder pub enum TestTables {