Skip to content

Commit

Permalink
Fixing integration_concurrent_writes
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler committed Dec 29, 2023
1 parent 0dba143 commit cce5fdd
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 133 deletions.
50 changes: 31 additions & 19 deletions crates/deltalake-core/src/table/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,19 @@ impl DeltaTableBuilder {
}
}

fn create_filetree_from_path(path: &PathBuf) -> DeltaResult<()> {
if !path.exists() {
std::fs::create_dir_all(path).map_err(|err| {
let msg = format!(
"Could not create local directory: {:?}\nError: {:?}",
path, err
);
DeltaTableError::InvalidTableLocation(msg)
})?;
}
Ok(())
}

/// Attempt to create a Url from given table location.
///
/// The location could be:
Expand All @@ -337,33 +350,30 @@ impl DeltaTableBuilder {
pub fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
let table_uri = table_uri.as_ref();

debug!("ensure_table_uri {table_uri}");
let mut url = match Url::parse(table_uri) {
Ok(url) => Ok(url),
Ok(url) => {
if url.scheme() == "file" {
create_filetree_from_path(
&url.to_file_path()
.expect("Failed to convert a file:// URL to a file path"),
)?;
}
Ok(url)
}
Err(_) => {
debug!("Creating a local table_uri: {}", table_uri);
let path = PathBuf::from(table_uri);
if !path.exists() {
std::fs::create_dir_all(&path).map_err(|err| {
let msg = format!(
"Could not create local directory: {}\nError: {:?}",
table_uri, err
);
DeltaTableError::InvalidTableLocation(msg)
})?;
}

let path = std::fs::canonicalize(path).map_err(|err| {
let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err);
let _ = create_filetree_from_path(&path)?;
let path = std::fs::canonicalize(path.clone()).map_err(|err| {
let msg = format!("Invalid table location: {:?}\nError: {:?}", path, err);
DeltaTableError::InvalidTableLocation(msg)
})?;

debug!("going to create a directory URL: {:?}", path);

Url::from_directory_path(path).map_err(|_| {
Url::from_directory_path(path.clone()).map_err(|_| {
let msg = format!(
"Could not construct a URL from canonicalized path: {}.\n\
"Could not construct a URL from canonicalized path: {:?}.\n\
Something must be very wrong with the table path.",
table_uri
path,
);
DeltaTableError::InvalidTableLocation(msg)
})
Expand Down Expand Up @@ -401,6 +411,8 @@ mod tests {
assert!(uri.is_ok());
let _uri = ensure_table_uri("./nonexistent");
assert!(uri.is_ok());
let _uri = ensure_table_uri("file:///tmp/nonexistent/some/path");
assert!(uri.is_ok());
let uri = ensure_table_uri("s3://container/path");
assert!(uri.is_ok());

Expand Down
37 changes: 7 additions & 30 deletions crates/deltalake-core/tests/integration_concurrent_writes.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#![cfg(feature = "integration_test")]

use log::*;

use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType};
use deltalake_core::operations::transaction::commit;
use deltalake_core::operations::DeltaOps;
use deltalake_core::protocol::{DeltaOperation, SaveMode};
use deltalake_core::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables};
use deltalake_core::{DeltaTable, DeltaTableBuilder};
use deltalake_test::utils::*;
use serial_test::serial;
use std::collections::HashMap;
use std::future::Future;
Expand All @@ -15,38 +17,13 @@ use std::time::Duration;
#[tokio::test]
#[serial]
async fn test_concurrent_writes_local() -> TestResult {
test_concurrent_writes(StorageIntegration::Local).await?;
Ok(())
}

#[cfg(feature = "s3")]
#[tokio::test]
#[serial]
async fn concurrent_writes_s3() -> TestResult {
test_concurrent_writes(StorageIntegration::Amazon).await?;
Ok(())
}

#[cfg(feature = "azure")]
#[tokio::test]
#[serial]
async fn test_concurrent_writes_azure() -> TestResult {
test_concurrent_writes(StorageIntegration::Microsoft).await?;
Ok(())
}

// tracked via https://github.com/datafusion-contrib/datafusion-objectstore-hdfs/issues/13
#[ignore]
#[cfg(feature = "hdfs")]
#[tokio::test]
#[serial]
async fn test_concurrent_writes_hdfs() -> TestResult {
test_concurrent_writes(StorageIntegration::Hdfs).await?;
let storage = Box::new(LocalStorageIntegration::default());
let context = IntegrationContext::new(storage)?;
test_concurrent_writes(&context).await?;
Ok(())
}

async fn test_concurrent_writes(integration: StorageIntegration) -> TestResult {
let context = IntegrationContext::new(integration)?;
async fn test_concurrent_writes(context: &IntegrationContext) -> TestResult {
let (_table, table_uri) = prepare_table(&context).await?;
run_test(|name| Worker::new(&table_uri, name)).await;
Ok(())
Expand Down
1 change: 0 additions & 1 deletion crates/deltalake-core/tests/integration_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use deltalake_core::{DeltaTableBuilder, ObjectStore};
use deltalake_test::utils::*;
use log::*;
use object_store::path::Path;
use serial_test::serial;

Expand Down
83 changes: 0 additions & 83 deletions crates/deltalake-test/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,89 +146,6 @@ impl IntegrationContext {
}
}
}
/*
impl Drop for IntegrationContext {
fn drop(&mut self) {
match self.integration {
StorageIntegration::Amazon => {
//s3_cli::delete_bucket(self.root_uri()).unwrap();
//s3_cli::delete_lock_table().unwrap();
}
StorageIntegration::Microsoft => {
az_cli::delete_container(&self.bucket).unwrap();
}
StorageIntegration::Google => {
gs_cli::delete_bucket(&self.bucket).unwrap();
}
StorageIntegration::Onelake => (),
StorageIntegration::OnelakeAbfs => (),
StorageIntegration::Local => (),
StorageIntegration::Hdfs => {
hdfs_cli::delete_dir(&self.bucket).unwrap();
}
};
self.restore_env();
}
}
/// Kinds of storage integration
pub enum StorageIntegration {
Amazon,
Microsoft,
Onelake,
Google,
Local,
Hdfs,
OnelakeAbfs,
}
impl StorageIntegration {
fn prepare_env(&self) {
match self {
Self::Microsoft => az_cli::prepare_env(),
Self::Onelake => onelake_cli::prepare_env(),
Self::Amazon => (), //s3_cli::prepare_env(),
Self::Google => gs_cli::prepare_env(),
Self::OnelakeAbfs => onelake_cli::prepare_env(),
Self::Local => (),
Self::Hdfs => (),
}
}
fn create_bucket(&self, name: impl AsRef<str>) -> std::io::Result<()> {
match self {
Self::Microsoft => {
az_cli::create_container(name)?;
Ok(())
}
Self::Onelake => Ok(()),
Self::OnelakeAbfs => Ok(()),
Self::Amazon => {
std::env::set_var(
"DELTA_DYNAMO_TABLE_NAME",
format!("delta_log_it_{}", rand::thread_rng().gen::<u16>()),
);
//s3_cli::create_bucket(format!("s3://{}", name.as_ref()))?;
set_env_if_not_set(
"DYNAMO_LOCK_PARTITION_KEY_VALUE",
format!("s3://{}", name.as_ref()),
);
//s3_cli::create_lock_table()?;
Ok(())
}
Self::Google => {
gs_cli::create_bucket(name)?;
Ok(())
}
Self::Local => Ok(()),
Self::Hdfs => {
hdfs_cli::create_dir(name)?;
Ok(())
}
}
}
}
*/

/// Reference tables from the test data folder
pub enum TestTables {
Expand Down

0 comments on commit cce5fdd

Please sign in to comment.