Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure log store correctly identifies existing delta tables #2890

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion crates/aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use deltalake_aws::logstore::{RepairLogEntryResult, S3DynamoDbLogStore};
use deltalake_aws::storage::S3StorageOptions;
use deltalake_aws::{CommitEntry, DynamoDbConfig, DynamoDbLockClient};
use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType};
use deltalake_core::logstore::{CommitOrBytes, LogStore};
use deltalake_core::logstore::{logstore_for, CommitOrBytes, LogStore};
use deltalake_core::operations::create::CreateBuilder;
use deltalake_core::operations::transaction::CommitBuilder;
use deltalake_core::protocol::{DeltaOperation, SaveMode};
use deltalake_core::storage::commit_uri_from_version;
Expand All @@ -23,6 +24,10 @@ use object_store::path::Path;
use serde_json::Value;
use serial_test::serial;

use maplit::hashmap;
use object_store::{PutOptions, PutPayload};
use url::Url;

mod common;
use common::*;

Expand Down Expand Up @@ -79,6 +84,44 @@ fn client_configs_via_env_variables() -> TestResult<()> {
Ok(())
}

/// Regression check for table creation on S3 when `_delta_log/` already holds a stray,
/// non-commit object.
///
/// The test writes `_delta_log/_commit_failed.tmp` through the log store's object store and
/// then runs `CreateBuilder` against that same log store. It passes when neither step
/// returns an error.
#[tokio::test]
#[serial]
async fn test_create_s3_table() -> TestResult<()> {
    let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
    // Bound to a named variable (not `_`) so the value is kept until the test returns.
    let _client = make_client()?;

    // A fresh UUID suffix gives every run its own table location.
    let unique_name = format!("create_test_{}", uuid::Uuid::new_v4());
    let table_uri = context.uri_for_table(TestTables::Custom(unique_name.clone()));

    // Single nullable integer column named `id`; it is also used as the partition column below.
    let id_field = StructField::new(
        "id".to_string(),
        DataType::Primitive(PrimitiveType::Integer),
        true,
    );
    let schema = StructType::new(vec![id_field]);

    // Plain-HTTP endpoint on localhost — presumably a local S3 emulator; confirm against the
    // integration test setup.
    let storage_options: HashMap<String, String> = hashmap! {
        "AWS_ALLOW_HTTP".to_string() => "true".to_string(),
        "AWS_ENDPOINT_URL".to_string() => "http://localhost:4566".to_string(),
    };
    let table_url = Url::parse(&table_uri)?;
    let log_store = logstore_for(table_url, storage_options, None)?;

    // Drop a file that is not a commit into the log directory before creating the table.
    let stray_path = Path::from("_delta_log/_commit_failed.tmp");
    let stray_bytes = PutPayload::from_static(b"test-drivin");
    let object_store = log_store.object_store();
    let _put = object_store
        .put_opts(&stray_path, stray_bytes, PutOptions::default())
        .await?;

    let builder = CreateBuilder::new()
        .with_log_store(log_store)
        .with_partition_columns(vec!["id"])
        .with_columns(schema.fields().cloned())
        .with_save_mode(SaveMode::Ignore);
    let _created = builder.await?;

    Ok(())
}

#[tokio::test]
#[serial]
async fn get_missing_item() -> TestResult<()> {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ lazy_static! {
/// specifically, this trait adds the ability to recognize valid log files and
/// parse the version number from a log file path
// TODO handle compaction files
pub(super) trait PathExt {
pub(crate) trait PathExt {
fn child(&self, path: impl AsRef<str>) -> DeltaResult<Path>;
/// Returns the last path segment if not terminated with a "/"
fn filename(&self) -> Option<&str>;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
pub use self::log_data::*;

mod log_data;
mod log_segment;
pub(crate) mod log_segment;
pub(crate) mod parse;
mod replay;
mod serde;
Expand Down
35 changes: 33 additions & 2 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use url::Url;

use crate::kernel::log_segment::PathExt;
use crate::kernel::Action;
use crate::operations::transaction::TransactionError;
use crate::protocol::{get_last_checkpoint, ProtocolError};
Expand Down Expand Up @@ -238,7 +239,7 @@ pub trait LogStore: Sync + Send {
let mut stream = object_store.list(Some(self.log_path()));
if let Some(res) = stream.next().await {
match res {
Ok(_) => Ok(true),
Ok(meta) => Ok(meta.location.is_commit_file()),
Err(ObjectStoreError::NotFound { .. }) => Ok(false),
Err(err) => Err(err)?,
}
Expand Down Expand Up @@ -328,7 +329,7 @@ pub async fn get_actions(
// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders
impl std::fmt::Debug for dyn LogStore + '_ {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "LogStore({})", self.root_uri())
write!(f, "{}({})", self.name(), self.root_uri())
}
}

Expand Down Expand Up @@ -509,6 +510,36 @@ mod tests {
let store = logstore_for(location, HashMap::default(), Some(IORuntime::default()));
assert!(store.is_ok());
}

/// Checks `is_delta_table_location` against an in-memory store in two states:
/// first with nothing written, then with only `_delta_log/_commit_failed.tmp` present.
/// Both states must report that the location is not a delta table.
#[tokio::test]
async fn test_is_location_a_table() {
    use object_store::path::Path;
    use object_store::{PutOptions, PutPayload};

    let table_url = Url::parse("memory://table").unwrap();
    let log_store =
        logstore_for(table_url, HashMap::default(), None).expect("Failed to get logstore");

    // Nothing has been written yet, so this must not be reported as a table.
    let is_table_when_empty = log_store
        .is_delta_table_location()
        .await
        .expect("Failed to look at table");
    assert!(!is_table_when_empty);

    // Simulate a failed commit: a leftover temp file in the log directory is not a commit,
    // so the location still must not be considered a delta table.
    let leftover_path = Path::from("_delta_log/_commit_failed.tmp");
    let leftover_bytes = PutPayload::from_static(b"test-drivin");
    let _put = log_store
        .object_store()
        .put_opts(&leftover_path, leftover_bytes, PutOptions::default())
        .await
        .expect("Failed to put");

    let is_table_with_leftover = log_store
        .is_delta_table_location()
        .await
        .expect("Failed to look at table");
    assert!(!is_table_with_leftover);
}
}

#[cfg(feature = "datafusion")]
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use delta_kernel::schema::MetadataValue;
use futures::future::BoxFuture;
use maplit::hashset;
use serde_json::Value;
use tracing::log::*;

use super::transaction::{CommitBuilder, TableReference, PROTOCOL};
use crate::errors::{DeltaResult, DeltaTableError};
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn io_rt(config: Option<&RuntimeConfig>) -> &Runtime {
} else {
RuntimeBuilder::new_current_thread()
};
let mut builder = builder.worker_threads(config.worker_threads);
let builder = builder.worker_threads(config.worker_threads);
let mut builder = if config.enable_io && config.enable_time {
builder.enable_all()
} else if !config.enable_io && config.enable_time {
Expand Down
2 changes: 1 addition & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-python"
version = "0.19.3"
version = "0.20.0"
authors = ["Qingping Hou <[email protected]>", "Will Jones <[email protected]>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
Expand Down
Loading