diff --git a/crates/aws/tests/integration_s3_dynamodb.rs b/crates/aws/tests/integration_s3_dynamodb.rs
index b933735b4f..28c99664f4 100644
--- a/crates/aws/tests/integration_s3_dynamodb.rs
+++ b/crates/aws/tests/integration_s3_dynamodb.rs
@@ -10,7 +10,8 @@ use deltalake_aws::logstore::{RepairLogEntryResult, S3DynamoDbLogStore};
 use deltalake_aws::storage::S3StorageOptions;
 use deltalake_aws::{CommitEntry, DynamoDbConfig, DynamoDbLockClient};
 use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType};
-use deltalake_core::logstore::{CommitOrBytes, LogStore};
+use deltalake_core::logstore::{logstore_for, CommitOrBytes, LogStore};
+use deltalake_core::operations::create::CreateBuilder;
 use deltalake_core::operations::transaction::CommitBuilder;
 use deltalake_core::protocol::{DeltaOperation, SaveMode};
 use deltalake_core::storage::commit_uri_from_version;
@@ -23,6 +24,10 @@ use object_store::path::Path;
 use serde_json::Value;
 use serial_test::serial;
 
+use maplit::hashmap;
+use object_store::{PutOptions, PutPayload};
+use url::Url;
+
 mod common;
 use common::*;
 
@@ -79,6 +84,48 @@ fn client_configs_via_env_variables() -> TestResult<()> {
     Ok(())
 }
 
+/// Creates a table through `CreateBuilder` at a location whose `_delta_log/` already holds a
+/// stray `_commit_failed.tmp` object and expects the creation to succeed.
+#[tokio::test]
+#[serial]
+async fn test_create_s3_table() -> TestResult<()> {
+    let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
+    let _client = make_client()?;
+    let table_name = format!("create_test_{}", uuid::Uuid::new_v4());
+    let table_uri = context.uri_for_table(TestTables::Custom(table_name));
+
+    let schema = StructType::new(vec![StructField::new(
+        "id".to_string(),
+        DataType::Primitive(PrimitiveType::Integer),
+        true,
+    )]);
+    let storage_options: HashMap<String, String> = hashmap! {
+        "AWS_ALLOW_HTTP".into() => "true".into(),
+        "AWS_ENDPOINT_URL".into() => "http://localhost:4566".into(),
+    };
+    let log_store = logstore_for(Url::parse(&table_uri)?, storage_options, None)?;
+
+    // Leave a non-commit object in the log directory before the table exists.
+    let payload = PutPayload::from_static(b"test-drivin");
+    let _put = log_store
+        .object_store()
+        .put_opts(
+            &Path::from("_delta_log/_commit_failed.tmp"),
+            payload,
+            PutOptions::default(),
+        )
+        .await?;
+
+    // Any error from the create operation fails the test through `?`.
+    let _created = CreateBuilder::new()
+        .with_log_store(log_store)
+        .with_partition_columns(vec!["id"])
+        .with_columns(schema.fields().cloned())
+        .with_save_mode(SaveMode::Ignore)
+        .await?;
+    Ok(())
+}
+
 #[tokio::test]
 #[serial]
 async fn get_missing_item() -> TestResult<()> {
diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs
index 17d29a7694..596304e003 100644
--- a/crates/core/src/kernel/snapshot/log_segment.rs
+++ b/crates/core/src/kernel/snapshot/log_segment.rs
@@ -37,7 +37,7 @@ lazy_static! {
 /// specifically, this trait adds the ability to recognize valid log files and
 /// parse the version number from a log file path
 // TODO handle compaction files
-pub(super) trait PathExt {
+pub(crate) trait PathExt {
     fn child(&self, path: impl AsRef) -> DeltaResult;
     /// Returns the last path segment if not terminated with a "/"
     fn filename(&self) -> Option<&str>;
diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs
index 99a62500d8..0df62c867b 100644
--- a/crates/core/src/kernel/snapshot/mod.rs
+++ b/crates/core/src/kernel/snapshot/mod.rs
@@ -44,7 +44,7 @@ use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
 pub use self::log_data::*;
 
 mod log_data;
-mod log_segment;
+pub(crate) mod log_segment;
 pub(crate) mod parse;
 mod replay;
 mod serde;
diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs
index 51191d77d0..dd82274157 100644
--- a/crates/core/src/logstore/mod.rs
+++ b/crates/core/src/logstore/mod.rs
@@ -15,6 +15,7 @@ use serde::{Deserialize, Serialize};
 use tracing::{debug, warn};
 use url::Url;
 
+use crate::kernel::log_segment::PathExt;
 use crate::kernel::Action;
 use crate::operations::transaction::TransactionError;
 use crate::protocol::{get_last_checkpoint, ProtocolError};
@@ -238,7 +239,7 @@ pub trait LogStore: Sync + Send {
         let mut stream = object_store.list(Some(self.log_path()));
         if let Some(res) = stream.next().await {
             match res {
-                Ok(_) => Ok(true),
+                Ok(meta) => Ok(meta.location.is_commit_file()),
                 Err(ObjectStoreError::NotFound { .. }) => Ok(false),
                 Err(err) => Err(err)?,
             }
@@ -328,7 +329,7 @@ pub async fn get_actions(
 // TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders
 impl std::fmt::Debug for dyn LogStore + '_ {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "LogStore({})", self.root_uri())
+        write!(f, "{}({})", self.name(), self.root_uri())
     }
 }
 
@@ -509,6 +510,55 @@ mod tests {
         let store = logstore_for(location, HashMap::default(), Some(IORuntime::default()));
         assert!(store.is_ok());
     }
+
+    /// Checks `is_delta_table_location` against an empty location, a location holding only
+    /// a stray non-commit object, and a location that also holds a commit file.
+    #[tokio::test]
+    async fn test_is_location_a_table() {
+        use object_store::path::Path;
+        use object_store::{PutOptions, PutPayload};
+        let location = Url::parse("memory://table").unwrap();
+        let store =
+            logstore_for(location, HashMap::default(), None).expect("Failed to get logstore");
+        assert!(!store
+            .is_delta_table_location()
+            .await
+            .expect("Failed to look at table"));
+
+        // Let's put a failed commit into the directory and then see if it's still considered a
+        // delta table (it shouldn't be).
+        let payload = PutPayload::from_static(b"test-drivin");
+        let _put = store
+            .object_store()
+            .put_opts(
+                &Path::from("_delta_log/_commit_failed.tmp"),
+                payload,
+                PutOptions::default(),
+            )
+            .await
+            .expect("Failed to put");
+        assert!(!store
+            .is_delta_table_location()
+            .await
+            .expect("Failed to look at table"));
+
+        // Positive case: without it this test would still pass if the check always returned
+        // `false`. Only the object's location is inspected, so the payload is a placeholder.
+        let payload = PutPayload::from_static(b"{}");
+        let _put = store
+            .object_store()
+            .put_opts(
+                &Path::from("_delta_log/00000000000000000000.json"),
+                payload,
+                PutOptions::default(),
+            )
+            .await
+            .expect("Failed to put");
+        assert!(store
+            .is_delta_table_location()
+            .await
+            .expect("Failed to look at table"));
+    }
 }
 
 #[cfg(feature = "datafusion")]
diff --git a/crates/core/src/operations/create.rs b/crates/core/src/operations/create.rs
index 5e26d7f1fb..ad0413722e 100644
--- a/crates/core/src/operations/create.rs
+++ b/crates/core/src/operations/create.rs
@@ -8,6 +8,7 @@ use delta_kernel::schema::MetadataValue;
 use futures::future::BoxFuture;
 use maplit::hashset;
 use serde_json::Value;
+use tracing::log::*;
 
 use super::transaction::{CommitBuilder, TableReference, PROTOCOL};
 use crate::errors::{DeltaResult, DeltaTableError};
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index f712db435d..0ad1435d1c 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -49,7 +49,7 @@ fn io_rt(config: Option<&RuntimeConfig>) -> &Runtime {
                 } else {
                     RuntimeBuilder::new_current_thread()
                 };
-                let mut builder = builder.worker_threads(config.worker_threads);
+                let builder = builder.worker_threads(config.worker_threads);
                 let mut builder = if config.enable_io && config.enable_time {
                     builder.enable_all()
                 } else if !config.enable_io && config.enable_time {
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 8ce542fd2e..0c3c62232a 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "deltalake-python"
-version = "0.19.3"
+version = "0.20.0"
 authors = ["Qingping Hou ", "Will Jones "]
 homepage = "https://github.com/delta-io/delta-rs"
 license = "Apache-2.0"