Skip to content

Commit

Permalink
refactor: always use Metadata action
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Jan 7, 2024
1 parent cfa647a commit 5e86161
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 261 deletions.
14 changes: 6 additions & 8 deletions crates/deltalake-core/src/kernel/actions/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,16 @@ pub struct Metadata {
impl Metadata {
/// Create a new metadata action
pub fn new(
id: impl Into<String>,
format: Format,
schema_string: impl Into<String>,
schema: StructType,
partition_columns: impl IntoIterator<Item = impl Into<String>>,
configuration: Option<HashMap<String, Option<String>>>,
configuration: HashMap<String, Option<String>>,
) -> Self {
Self {
id: id.into(),
format,
schema_string: schema_string.into(),
id: uuid::Uuid::new_v4().to_string(),
format: Default::default(),
schema_string: serde_json::to_string(&schema).unwrap(),
partition_columns: partition_columns.into_iter().map(|c| c.into()).collect(),
configuration: configuration.unwrap_or_default(),
configuration,
name: None,
description: None,
created_time: None,
Expand Down
18 changes: 18 additions & 0 deletions crates/deltalake-core/src/kernel/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,15 @@ impl FromIterator<StructField> for StructType {
}
}

impl<'a> FromIterator<&'a StructField> for StructType {
fn from_iter<T: IntoIterator<Item = &'a StructField>>(iter: T) -> Self {
Self {
type_name: "struct".into(),
fields: iter.into_iter().cloned().collect(),
}
}
}

impl<const N: usize> From<[StructField; N]> for StructType {
fn from(value: [StructField; N]) -> Self {
Self {
Expand All @@ -343,6 +352,15 @@ impl<const N: usize> From<[StructField; N]> for StructType {
}
}

impl<'a, const N: usize> From<[&'a StructField; N]> for StructType {
fn from(value: [&'a StructField; N]) -> Self {
Self {
type_name: "struct".into(),
fields: value.into_iter().cloned().collect(),
}
}
}

impl<'a> IntoIterator for &'a StructType {
type Item = &'a StructField;
type IntoIter = std::slice::Iter<'a, StructField>;
Expand Down
10 changes: 5 additions & 5 deletions crates/deltalake-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,9 @@ mod tests {
#[tokio::test]
async fn test_table_history() {
let path = "../deltalake-test/tests/data/simple_table_with_checkpoint";
let mut latest_table = crate::open_table(path).await.unwrap();
let latest_table = crate::open_table(path).await.unwrap();

let mut table = crate::open_table_with_version(path, 1).await.unwrap();
let table = crate::open_table_with_version(path, 1).await.unwrap();

let history1 = table.history(None).await.expect("Cannot get table history");
let history2 = latest_table
Expand Down Expand Up @@ -615,7 +615,7 @@ mod tests {
#[tokio::test]
async fn test_read_vacuumed_log_history() {
let path = "../deltalake-test/tests/data/checkpoints_vacuumed";
let mut table = crate::open_table(path).await.unwrap();
let table = crate::open_table(path).await.unwrap();

// load history for table version with available log file
let history = table
Expand Down Expand Up @@ -674,9 +674,9 @@ mod tests {
#[tokio::test()]
async fn test_version_zero_table_load() {
let path = "../deltalake-test/tests/data/COVID-19_NYT";
let mut latest_table: DeltaTable = crate::open_table(path).await.unwrap();
let latest_table: DeltaTable = crate::open_table(path).await.unwrap();

let mut version_0_table = crate::open_table_with_version(path, 0).await.unwrap();
let version_0_table = crate::open_table_with_version(path, 0).await.unwrap();

let version_0_history = version_0_table
.history(None)
Expand Down
20 changes: 10 additions & 10 deletions crates/deltalake-core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::logstore::{LogStore, LogStoreRef};
use crate::protocol::{DeltaOperation, SaveMode};
use crate::table::builder::ensure_table_uri;
use crate::table::config::DeltaConfigKey;
use crate::table::DeltaTableMetaData;
use crate::{DeltaTable, DeltaTableBuilder};

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -251,14 +250,18 @@ impl CreateBuilder {
reader_features: None,
});

let metadata = DeltaTableMetaData::new(
self.name,
self.comment,
None,
let mut metadata = Metadata::new(
StructType::new(self.columns),
self.partition_columns.unwrap_or_default(),
self.configuration,
);
)
.with_created_time(chrono::Utc::now().timestamp_millis());
if let Some(name) = self.name {
metadata = metadata.with_name(name);
}
if let Some(comment) = self.comment {
metadata = metadata.with_description(comment);
}

let operation = DeltaOperation::Create {
mode: self.mode.clone(),
Expand All @@ -267,10 +270,7 @@ impl CreateBuilder {
protocol: protocol.clone(),
};

let mut actions = vec![
Action::Protocol(protocol),
Action::Metadata(Metadata::try_from(metadata)?),
];
let mut actions = vec![Action::Protocol(protocol), Action::Metadata(metadata)];
actions.extend(
self.actions
.into_iter()
Expand Down
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ mod tests {
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);

let (mut table, metrics) = DeltaOps(table).delete().await.unwrap();
let (table, metrics) = DeltaOps(table).delete().await.unwrap();

assert_eq!(table.version(), 2);
assert_eq!(table.get_file_uris().count(), 0);
Expand Down Expand Up @@ -488,7 +488,7 @@ mod tests {
assert_eq!(table.version(), 2);
assert_eq!(table.get_file_uris().count(), 2);

let (mut table, metrics) = DeltaOps(table)
let (table, metrics) = DeltaOps(table)
.delete()
.with_predicate(col("value").eq(lit(1)))
.await
Expand Down
27 changes: 12 additions & 15 deletions crates/deltalake-core/src/operations/transaction/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::kernel::{
use crate::operations::transaction::PROTOCOL;
use crate::protocol::{DeltaOperation, SaveMode};
use crate::table::state::DeltaTableState;
use crate::table::DeltaTableMetaData;
use crate::{DeltaTable, DeltaTableBuilder};

pub fn create_add_action(
Expand Down Expand Up @@ -80,15 +79,11 @@ pub fn create_metadata_action(
true,
),
]);
let metadata = DeltaTableMetaData::new(
None,
None,
None,
Action::Metadata(Metadata::new(
table_schema,
parttiton_columns.unwrap_or_default(),
configuration.unwrap_or_default(),
);
Action::Metadata(Metadata::try_from(metadata).unwrap())
))
}

pub fn init_table_actions(configuration: Option<HashMap<String, Option<String>>>) -> Vec<Action> {
Expand Down Expand Up @@ -153,14 +148,16 @@ pub async fn create_initialized_table(
writer_features: None,
reader_features: None,
},
metadata: DeltaTableMetaData::new(
None,
None,
None,
table_schema,
partition_cols.to_vec(),
configuration.unwrap_or_default(),
),
metadata: Metadata {
id: uuid::Uuid::new_v4().to_string(),
name: None,
description: None,
format: Default::default(),
schema_string: serde_json::to_string(&table_schema).unwrap(),
partition_columns: partition_cols.to_vec(),
configuration: configuration.unwrap_or_default(),
created_time: Some(chrono::Utc::now().timestamp_millis()),
},
};
let actions = init_table_actions(None);
let prepared_commit = prepare_commit(
Expand Down
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ mod tests {
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);

let (mut table, metrics) = DeltaOps(table)
let (table, metrics) = DeltaOps(table)
.update()
.with_predicate(col("modified").eq(lit("2021-02-03")))
.with_update("modified", lit("2023-05-14"))
Expand Down Expand Up @@ -857,7 +857,7 @@ mod tests {

// Validate order operators do not include nulls
let table = prepare_values_table().await;
let (mut table, metrics) = DeltaOps(table)
let (table, metrics) = DeltaOps(table)
.update()
.with_predicate(col("value").gt(lit(2)).or(col("value").lt(lit(2))))
.with_update("value", lit(10))
Expand Down
22 changes: 1 addition & 21 deletions crates/deltalake-core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove};
use crate::logstore::LogStore;
use crate::table::CheckPoint;
use crate::table::DeltaTableMetaData;

/// Error returned when an invalid Delta log action is encountered.
#[allow(missing_docs)]
Expand Down Expand Up @@ -324,25 +323,6 @@ impl PartialEq for Remove {
}
}

impl TryFrom<DeltaTableMetaData> for Metadata {
type Error = ProtocolError;

fn try_from(metadata: DeltaTableMetaData) -> Result<Self, Self::Error> {
let schema_string = serde_json::to_string(&metadata.schema)
.map_err(|source| ProtocolError::SerializeOperation { source })?;
Ok(Self {
id: metadata.id,
name: metadata.name,
description: metadata.description,
format: metadata.format,
schema_string,
partition_columns: metadata.partition_columns,
created_time: metadata.created_time,
configuration: metadata.configuration,
})
}
}

#[allow(clippy::large_enum_variant)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -372,7 +352,7 @@ pub enum DeltaOperation {
/// The min reader and writer protocol versions of the table
protocol: Protocol,
/// Metadata associated with the new table
metadata: DeltaTableMetaData,
metadata: Metadata,
},

/// Represents a Delta `Write` operation.
Expand Down
7 changes: 3 additions & 4 deletions crates/deltalake-core/src/table/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,13 +510,12 @@ fn parse_int(value: &str) -> Result<i64, DeltaConfigError> {
#[cfg(test)]
mod tests {
use super::*;
use crate::kernel::StructType;
use crate::table::DeltaTableMetaData;
use crate::kernel::{Metadata, StructType};
use std::collections::HashMap;

fn dummy_metadata() -> DeltaTableMetaData {
fn dummy_metadata() -> Metadata {
let schema = StructType::new(Vec::new());
DeltaTableMetaData::new(None, None, None, schema, Vec::new(), HashMap::new())
Metadata::new(schema, Vec::<String>::new(), HashMap::new())
}

#[test]
Expand Down
Loading

0 comments on commit 5e86161

Please sign in to comment.