Skip to content

Commit

Permalink
Merge branch 'main' into chore/bump_kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler authored Sep 21, 2024
2 parents a1a33ce + c123129 commit 68e940b
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 6 deletions.
45 changes: 44 additions & 1 deletion crates/aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use deltalake_aws::logstore::{RepairLogEntryResult, S3DynamoDbLogStore};
use deltalake_aws::storage::S3StorageOptions;
use deltalake_aws::{CommitEntry, DynamoDbConfig, DynamoDbLockClient};
use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType};
use deltalake_core::logstore::{CommitOrBytes, LogStore};
use deltalake_core::logstore::{logstore_for, CommitOrBytes, LogStore};
use deltalake_core::operations::create::CreateBuilder;
use deltalake_core::operations::transaction::CommitBuilder;
use deltalake_core::protocol::{DeltaOperation, SaveMode};
use deltalake_core::storage::commit_uri_from_version;
Expand All @@ -23,6 +24,10 @@ use object_store::path::Path;
use serde_json::Value;
use serial_test::serial;

use maplit::hashmap;
use object_store::{PutOptions, PutPayload};
use url::Url;

mod common;
use common::*;

Expand Down Expand Up @@ -79,6 +84,44 @@ fn client_configs_via_env_variables() -> TestResult<()> {
Ok(())
}

#[tokio::test]
#[serial]
async fn test_create_s3_table() -> TestResult<()> {
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let _client = make_client()?;
let table_name = format!("{}_{}", "create_test", uuid::Uuid::new_v4());
let table_uri = context.uri_for_table(TestTables::Custom(table_name.to_owned()));

let schema = StructType::new(vec![StructField::new(
"id".to_string(),
DataType::Primitive(PrimitiveType::Integer),
true,
)]);
let storage_options: HashMap<String, String> = hashmap! {
"AWS_ALLOW_HTTP".into() => "true".into(),
"AWS_ENDPOINT_URL".into() => "http://localhost:4566".into(),
};
let log_store = logstore_for(Url::parse(&table_uri)?, storage_options, None)?;

let payload = PutPayload::from_static(b"test-drivin");
let _put = log_store
.object_store()
.put_opts(
&Path::from("_delta_log/_commit_failed.tmp"),
payload,
PutOptions::default(),
)
.await?;

let _created = CreateBuilder::new()
.with_log_store(log_store)
.with_partition_columns(vec!["id"])
.with_columns(schema.fields().cloned())
.with_save_mode(SaveMode::Ignore)
.await?;
Ok(())
}

#[tokio::test]
#[serial]
async fn get_missing_item() -> TestResult<()> {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ lazy_static! {
/// specifically, this trait adds the ability to recognize valid log files and
/// parse the version number from a log file path
// TODO handle compaction files
pub(super) trait PathExt {
pub(crate) trait PathExt {
fn child(&self, path: impl AsRef<str>) -> DeltaResult<Path>;
/// Returns the last path segment if not terminated with a "/"
fn filename(&self) -> Option<&str>;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
pub use self::log_data::*;

mod log_data;
mod log_segment;
pub(crate) mod log_segment;
pub(crate) mod parse;
mod replay;
mod serde;
Expand Down
35 changes: 33 additions & 2 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use url::Url;

use crate::kernel::log_segment::PathExt;
use crate::kernel::Action;
use crate::operations::transaction::TransactionError;
use crate::protocol::{get_last_checkpoint, ProtocolError};
Expand Down Expand Up @@ -238,7 +239,7 @@ pub trait LogStore: Sync + Send {
let mut stream = object_store.list(Some(self.log_path()));
if let Some(res) = stream.next().await {
match res {
Ok(_) => Ok(true),
Ok(meta) => Ok(meta.location.is_commit_file()),
Err(ObjectStoreError::NotFound { .. }) => Ok(false),
Err(err) => Err(err)?,
}
Expand Down Expand Up @@ -328,7 +329,7 @@ pub async fn get_actions(
// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders
impl std::fmt::Debug for dyn LogStore + '_ {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "LogStore({})", self.root_uri())
write!(f, "{}({})", self.name(), self.root_uri())
}
}

Expand Down Expand Up @@ -509,6 +510,36 @@ mod tests {
let store = logstore_for(location, HashMap::default(), Some(IORuntime::default()));
assert!(store.is_ok());
}

#[tokio::test]
async fn test_is_location_a_table() {
use object_store::path::Path;
use object_store::{PutOptions, PutPayload};
let location = Url::parse("memory://table").unwrap();
let store =
logstore_for(location, HashMap::default(), None).expect("Failed to get logstore");
assert!(!store
.is_delta_table_location()
.await
.expect("Failed to look at table"));

// Let's put a failed commit into the directory and then see if it's still considered a
// delta table (it shouldn't be).
let payload = PutPayload::from_static(b"test-drivin");
let _put = store
.object_store()
.put_opts(
&Path::from("_delta_log/_commit_failed.tmp"),
payload,
PutOptions::default(),
)
.await
.expect("Failed to put");
assert!(!store
.is_delta_table_location()
.await
.expect("Failed to look at table"));
}
}

#[cfg(feature = "datafusion")]
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use delta_kernel::schema::MetadataValue;
use futures::future::BoxFuture;
use maplit::hashset;
use serde_json::Value;
use tracing::log::*;

use super::transaction::{CommitBuilder, TableReference, PROTOCOL};
use crate::errors::{DeltaResult, DeltaTableError};
Expand Down
2 changes: 1 addition & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-python"
version = "0.19.3"
version = "0.20.0"
authors = ["Qingping Hou <[email protected]>", "Will Jones <[email protected]>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
Expand Down

0 comments on commit 68e940b

Please sign in to comment.