From 6a99a5e33b20cf64cef50a31e7970bce7b8a461e Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 11 Nov 2023 09:29:55 +0100 Subject: [PATCH] feat: create known arrow schemas form kernel types --- .../src/kernel/actions/arrow.rs | 18 +- .../src/kernel/actions/arrow/checkpoint.rs | 17 +- .../src/kernel/actions/arrow/schemas.rs | 274 +++--------------- .../deltalake-core/src/kernel/actions/mod.rs | 4 + .../src/kernel/actions/schemas.rs | 150 +++++----- .../src/kernel/actions/types.rs | 6 +- .../deltalake-core/src/table/state_arrow.rs | 16 + 7 files changed, 173 insertions(+), 312 deletions(-) diff --git a/crates/deltalake-core/src/kernel/actions/arrow.rs b/crates/deltalake-core/src/kernel/actions/arrow.rs index e88d64751e..f566bfc699 100644 --- a/crates/deltalake-core/src/kernel/actions/arrow.rs +++ b/crates/deltalake-core/src/kernel/actions/arrow.rs @@ -12,6 +12,9 @@ pub mod checkpoint; pub mod schemas; pub mod snapshot; +const MAP_KEYS_NAME: &str = "keys"; +const MAP_VALUES_NAME: &str = "values"; + impl TryFrom<&StructType> for ArrowSchema { type Error = ArrowError; @@ -68,9 +71,9 @@ impl TryFrom<&MapType> for ArrowField { "entries", ArrowDataType::Struct( vec![ - ArrowField::new("key", ArrowDataType::try_from(a.key_type())?, false), + ArrowField::new(MAP_KEYS_NAME, ArrowDataType::try_from(a.key_type())?, false), ArrowField::new( - "value", + MAP_VALUES_NAME, ArrowDataType::try_from(a.value_type())?, a.value_contains_null(), ), @@ -127,7 +130,10 @@ impl TryFrom<&DataType> for ArrowDataType { } PrimitiveType::Timestamp => { // Issue: https://github.com/delta-io/delta/issues/643 - Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)) + Ok(ArrowDataType::Timestamp( + TimeUnit::Microsecond, + Some("UTC".into()), + )) } } } @@ -147,12 +153,12 @@ impl TryFrom<&DataType> for ArrowDataType { ArrowDataType::Struct( vec![ ArrowField::new( - "keys", + MAP_KEYS_NAME, >::try_from(m.key_type())?, false, ), ArrowField::new( - "values", + MAP_VALUES_NAME, >::try_from(m.value_type())?, m.value_contains_null(), ), @@ -784,7 +790,7 @@ mod tests { let timestamp_field = DataType::Primitive(PrimitiveType::Timestamp); assert_eq!( >::try_from(×tamp_field).unwrap(), - ArrowDataType::Timestamp(TimeUnit::Microsecond, None) + ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())) ); } diff --git a/crates/deltalake-core/src/kernel/actions/arrow/checkpoint.rs b/crates/deltalake-core/src/kernel/actions/arrow/checkpoint.rs index 1f57a480ed..ec7eeabe91 100644 --- a/crates/deltalake-core/src/kernel/actions/arrow/checkpoint.rs +++ b/crates/deltalake-core/src/kernel/actions/arrow/checkpoint.rs @@ -6,7 +6,7 @@ use std::str::FromStr; use arrow_array::{ BooleanArray, Int32Array, Int64Array, ListArray, MapArray, RecordBatch, StringArray, - StructArray, + StructArray, TimestampMicrosecondArray, }; use arrow_json::ReaderBuilder; use arrow_schema::SchemaRef as ArrowSchemaRef; @@ -247,7 +247,8 @@ fn parse_action_protocol(arr: &StructArray) -> DeltaResult DeltaResult + '_>> { let paths = cast_struct_column::(arr, "path")?; let sizes = cast_struct_column::(arr, "size")?; - let modification_times = cast_struct_column::(arr, "modificationTime")?; + let modification_times = + cast_struct_column::(arr, "modificationTime")?; let data_changes = cast_struct_column::(arr, "dataChange")?; let partition_values = cast_struct_column::(arr, "partitionValues")? .iter() @@ -348,7 +349,7 @@ fn parse_actions_remove(arr: &StructArray) -> DeltaResult(arr, "dataChange")?; let deletion_timestamps = - if let Ok(ts) = cast_struct_column::(arr, "deletionTimestamp") { + if let Ok(ts) = cast_struct_column::(arr, "deletionTimestamp") { Either::Left(ts.into_iter()) } else { Either::Right(std::iter::repeat(None).take(data_changes.len())) @@ -515,8 +516,13 @@ fn cast_struct_column(arr: &StructArray, name: impl AsRef) -> D } fn struct_array_to_map(arr: &StructArray) -> DeltaResult>> { - let keys = cast_struct_column::(arr, "key")?; - let values = cast_struct_column::(arr, "value")?; + if arr.fields().len() != 2 { + return Err(Error::UnexpectedColumnType( + "Error parsing map: expected struct array with 2 fields".into(), + )); + } + let keys = cast_struct_column::(arr, arr.fields()[0].name())?; + let values = cast_struct_column::(arr, arr.fields()[1].name())?; Ok(keys .into_iter() .zip(values) @@ -558,6 +564,7 @@ mod tests { }); assert_eq!(action[0], expected) } + #[test] fn test_parse_metadata() { let batch = action_batch(); diff --git a/crates/deltalake-core/src/kernel/actions/arrow/schemas.rs b/crates/deltalake-core/src/kernel/actions/arrow/schemas.rs index ba8cedf522..567beb3ea2 100644 --- a/crates/deltalake-core/src/kernel/actions/arrow/schemas.rs +++ b/crates/deltalake-core/src/kernel/actions/arrow/schemas.rs @@ -1,249 +1,61 @@ -use std::sync::Arc; - -use arrow_schema::{DataType, Field, Fields, Schema}; +use arrow_schema::{Field, Fields, Schema}; +use lazy_static::lazy_static; use super::super::ActionType; +lazy_static! { + static ref ARROW_METADATA_FIELD: Field = + ActionType::Metadata.schema_field().try_into().unwrap(); + static ref ARROW_PROTOCOL_FIELD: Field = + ActionType::Protocol.schema_field().try_into().unwrap(); + static ref ARROW_COMMIT_INFO_FIELD: Field = + ActionType::CommitInfo.schema_field().try_into().unwrap(); + static ref ARROW_ADD_FIELD: Field = ActionType::Add.schema_field().try_into().unwrap(); + static ref ARROW_REMOVE_FIELD: Field = ActionType::Remove.schema_field().try_into().unwrap(); + static ref ARROW_CDC_FIELD: Field = ActionType::Cdc.schema_field().try_into().unwrap(); + static ref ARROW_TXN_FIELD: Field = ActionType::Txn.schema_field().try_into().unwrap(); + static ref ARROW_DOMAIN_METADATA_FIELD: Field = ActionType::DomainMetadata + .schema_field() + .try_into() + .unwrap(); + static ref ARROW_CHECKPOINT_METADATA_FIELD: Field = ActionType::CheckpointMetadata + .schema_field() + .try_into() + .unwrap(); + static ref ARROW_SIDECAR_FIELD: Field = ActionType::Sidecar.schema_field().try_into().unwrap(); +} + impl ActionType { /// Returns the root field for the action type - pub fn arrow_field(&self) -> Field { - match self { - Self::Add => get_root("add", self.arrow_fields()), - Self::Cdc => get_root("cdc", self.arrow_fields()), - Self::CommitInfo => get_root("commitInfo", self.arrow_fields()), - Self::DomainMetadata => get_root("domainMetadata", self.arrow_fields()), - Self::Metadata => get_root("metaData", self.arrow_fields()), - Self::Protocol => get_root("protocol", self.arrow_fields()), - Self::Remove => get_root("remove", self.arrow_fields()), - Self::Txn => get_root("txn", self.arrow_fields()), - } - } - - /// Returns the child fields for the action type - pub fn arrow_fields(&self) -> Vec { + pub fn arrow_field(&self) -> &Field { match self { - Self::Add => add_fields(), - Self::Cdc => cdc_fields(), - Self::CommitInfo => commit_info_fields(), - Self::DomainMetadata => domain_metadata_fields(), - Self::Metadata => metadata_fields(), - Self::Protocol => protocol_fields(), - Self::Remove => remove_fields(), - Self::Txn => txn_fields(), + Self::Metadata => &ARROW_METADATA_FIELD, + Self::Protocol => &ARROW_PROTOCOL_FIELD, + Self::CommitInfo => &ARROW_COMMIT_INFO_FIELD, + Self::Add => &ARROW_ADD_FIELD, + Self::Remove => &ARROW_REMOVE_FIELD, + Self::Cdc => &ARROW_CDC_FIELD, + Self::Txn => &ARROW_TXN_FIELD, + Self::DomainMetadata => &ARROW_DOMAIN_METADATA_FIELD, + Self::CheckpointMetadata => &ARROW_CHECKPOINT_METADATA_FIELD, + Self::Sidecar => &ARROW_SIDECAR_FIELD, } } } /// Returns the schema for the delta log -#[allow(dead_code)] pub fn get_log_schema() -> Schema { Schema { fields: Fields::from_iter([ - ActionType::Add.arrow_field(), - ActionType::Cdc.arrow_field(), - ActionType::CommitInfo.arrow_field(), - ActionType::DomainMetadata.arrow_field(), - ActionType::Metadata.arrow_field(), - ActionType::Protocol.arrow_field(), - ActionType::Remove.arrow_field(), - ActionType::Txn.arrow_field(), + ActionType::Add.arrow_field().clone(), + ActionType::Cdc.arrow_field().clone(), + ActionType::CommitInfo.arrow_field().clone(), + ActionType::DomainMetadata.arrow_field().clone(), + ActionType::Metadata.arrow_field().clone(), + ActionType::Protocol.arrow_field().clone(), + ActionType::Remove.arrow_field().clone(), + ActionType::Txn.arrow_field().clone(), ]), metadata: Default::default(), } } - -fn get_root(name: &str, fields: Vec) -> Field { - Field::new(name, DataType::Struct(Fields::from_iter(fields)), true) -} - -fn add_fields() -> Vec { - Vec::from_iter([ - Field::new("path", DataType::Utf8, false), - Field::new("size", DataType::Int64, false), - Field::new("modificationTime", DataType::Int64, false), - Field::new("dataChange", DataType::Boolean, false), - Field::new("stats", DataType::Utf8, true), - Field::new( - "partitionValues", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - Field::new( - "tags", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - Field::new( - "deletionVector", - DataType::Struct(Fields::from(vec![ - Field::new("storageType", DataType::Utf8, false), - Field::new("pathOrInlineDv", DataType::Utf8, false), - Field::new("offset", DataType::Int32, true), - Field::new("sizeInBytes", DataType::Int32, false), - Field::new("cardinality", DataType::Int64, false), - ])), - true, - ), - Field::new("baseRowId", DataType::Int64, true), - Field::new("defaultRowCommitVersion", DataType::Int64, true), - ]) -} - -fn cdc_fields() -> Vec { - Vec::from_iter([ - Field::new("path", DataType::Utf8, true), - Field::new( - "partitionValues", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - Field::new("size", DataType::Int64, true), - Field::new("dataChange", DataType::Boolean, true), - Field::new( - "tags", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - ]) -} - -fn remove_fields() -> Vec { - Vec::from_iter([ - Field::new("path", DataType::Utf8, true), - Field::new("deletionTimestamp", DataType::Int64, true), - Field::new("dataChange", DataType::Boolean, true), - Field::new("extendedFileMetadata", DataType::Boolean, true), - Field::new("size", DataType::Int64, true), - Field::new( - "partitionValues", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - Field::new( - "tags", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - ]) -} - -fn metadata_fields() -> Vec { - Vec::from_iter([ - Field::new("id", DataType::Utf8, false), - Field::new("name", DataType::Utf8, true), - Field::new("description", DataType::Utf8, true), - Field::new( - "format", - DataType::Struct(Fields::from_iter([ - Field::new("provider", DataType::Utf8, true), - Field::new( - "options", - DataType::Map( - Arc::new(Field::new( - "key_value", - DataType::Struct(Fields::from_iter([ - Field::new("key", DataType::Utf8, false), - Field::new("value", DataType::Utf8, true), - ])), - false, - )), - false, - ), - false, - ), - ])), - false, - ), - Field::new("schemaString", DataType::Utf8, false), - Field::new("createdTime", DataType::Int64, true), - Field::new( - "partitionColumns", - DataType::List(Arc::new(Field::new("element", DataType::Utf8, false))), - false, - ), - Field::new( - "configuration", - DataType::Map( - Arc::new(Field::new( - "key_value", - DataType::Struct(Fields::from_iter([ - Field::new("key", DataType::Utf8, false), - Field::new("value", DataType::Utf8, true), - ])), - false, - )), - false, - ), - true, - ), - ]) -} - -fn protocol_fields() -> Vec { - Vec::from_iter([ - Field::new("minReaderVersion", DataType::Int32, false), - Field::new("minWriterVersion", DataType::Int32, false), - Field::new( - "readerFeatures", - DataType::List(Arc::new(Field::new("element", DataType::Utf8, false))), - true, - ), - Field::new( - "writerFeatures", - DataType::List(Arc::new(Field::new("element", DataType::Utf8, false))), - true, - ), - ]) -} - -fn txn_fields() -> Vec { - Vec::from_iter([ - Field::new("appId", DataType::Utf8, true), - Field::new("version", DataType::Int64, true), - Field::new("lastUpdated", DataType::Int64, true), - ]) -} - -fn commit_info_fields() -> Vec { - Vec::from_iter([ - Field::new("timestamp", DataType::Int64, true), - Field::new("operation", DataType::Utf8, true), - Field::new("isolationLevel", DataType::Utf8, true), - Field::new("isBlindAppend", DataType::Boolean, true), - Field::new("txnId", DataType::Utf8, true), - Field::new("readVersion", DataType::Int32, true), - Field::new( - "operationParameters", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - Field::new( - "operationMetrics", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - ]) -} - -fn domain_metadata_fields() -> Vec { - Vec::from_iter([ - Field::new("domain", DataType::Utf8, true), - Field::new( - "configuration", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - Field::new("removed", DataType::Boolean, true), - ]) -} - -fn get_map_field() -> Field { - Field::new( - "key_value", - DataType::Struct(Fields::from_iter([ - Field::new("key", DataType::Utf8, false), - Field::new("value", DataType::Utf8, true), - ])), - false, - ) -} diff --git a/crates/deltalake-core/src/kernel/actions/mod.rs b/crates/deltalake-core/src/kernel/actions/mod.rs index b03eb26381..ee01792a2f 100644 --- a/crates/deltalake-core/src/kernel/actions/mod.rs +++ b/crates/deltalake-core/src/kernel/actions/mod.rs @@ -34,6 +34,10 @@ pub enum ActionType { Remove, /// Transactional information Txn, + /// Checkpoint metadata + CheckpointMetadata, + /// Sidecar + Sidecar, } #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] diff --git a/crates/deltalake-core/src/kernel/actions/schemas.rs b/crates/deltalake-core/src/kernel/actions/schemas.rs index b1ebca3912..ad3e3ccbad 100644 --- a/crates/deltalake-core/src/kernel/actions/schemas.rs +++ b/crates/deltalake-core/src/kernel/actions/schemas.rs @@ -24,7 +24,7 @@ lazy_static! { DataType::string(), true, ))), - false, + true, ), ]))), false, @@ -71,7 +71,7 @@ lazy_static! { static ref COMMIT_INFO_FIELD: StructField = StructField::new( "commitInfo", DataType::Struct(Box::new(StructType::new(vec![ - StructField::new("timestamp", DataType::long(), false), + StructField::new("timestamp", DataType::timestamp(), false), StructField::new("operation", DataType::string(), false), StructField::new("isolationLevel", DataType::string(), true), StructField::new("isBlindAppend", DataType::boolean(), true), @@ -103,39 +103,13 @@ lazy_static! { "add", DataType::Struct(Box::new(StructType::new(vec![ StructField::new("path", DataType::string(), false), - StructField::new( - "partitionValues", - DataType::Map(Box::new(MapType::new( - DataType::string(), - DataType::string(), - true, - ))), - false, - ), + partition_values_field(), StructField::new("size", DataType::long(), false), StructField::new("modificationTime", DataType::timestamp(), false), StructField::new("dataChange", DataType::boolean(), false), StructField::new("stats", DataType::string(), true), - StructField::new( - "tags", - DataType::Map(Box::new(MapType::new( - DataType::string(), - DataType::string(), - true, - ))), - true, - ), - StructField::new( - "deletionVector", - DataType::Struct(Box::new(StructType::new(vec![ - StructField::new("storageType", DataType::string(), false), - StructField::new("pathOrInlineDv", DataType::string(), false), - StructField::new("offset", DataType::integer(), true), - StructField::new("sizeInBytes", DataType::integer(), false), - StructField::new("cardinality", DataType::long(), false), - ]))), - true, - ), + tags_field(), + deletion_vector_field(), StructField::new("baseRowId", DataType::long(), true), StructField::new("defaultRowCommitVersion", DataType::long(), true), ]))), @@ -149,25 +123,22 @@ lazy_static! { StructField::new("deletionTimestamp", DataType::timestamp(), true), StructField::new("dataChange", DataType::boolean(), false), StructField::new("extendedFileMetadata", DataType::boolean(), true), + partition_values_field(), StructField::new("size", DataType::long(), true), - StructField::new( - "partitionValues", - DataType::Map(Box::new(MapType::new( - DataType::string(), - DataType::string(), - true, - ))), - true, - ), - StructField::new( - "tags", - DataType::Map(Box::new(MapType::new( - DataType::string(), - DataType::string(), - true, - ))), - true, - ), + StructField::new("stats", DataType::string(), true), + tags_field(), + deletion_vector_field(), + StructField::new("baseRowId", DataType::long(), true), + StructField::new("defaultRowCommitVersion", DataType::long(), true), + ]))), + true, + ); + static ref REMOVE_FIELD_CHECKPOINT: StructField = StructField::new( + "remove", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("path", DataType::string(), false), + StructField::new("deletionTimestamp", DataType::timestamp(), true), + StructField::new("dataChange", DataType::boolean(), false), ]))), true, ); @@ -176,26 +147,10 @@ lazy_static! { "cdc", DataType::Struct(Box::new(StructType::new(vec![ StructField::new("path", DataType::string(), false), - StructField::new( - "partitionValues", - DataType::Map(Box::new(MapType::new( - DataType::string(), - DataType::string(), - true, - ))), - false, - ), + partition_values_field(), StructField::new("size", DataType::long(), false), StructField::new("dataChange", DataType::boolean(), false), - StructField::new( - "tags", - DataType::Map(Box::new(MapType::new( - DataType::string(), - DataType::string(), - true, - ))), - true, - ), + tags_field(), ]))), true, ); @@ -227,6 +182,65 @@ lazy_static! { ]))), true, ); + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoint-metadata + static ref CHECKPOINT_METADATA_FIELD: StructField = StructField::new( + "checkpointMetadata", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("flavor", DataType::string(), false), + tags_field(), + ]))), + true, + ); + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information + static ref SIDECAR_FIELD: StructField = StructField::new( + "sidecar", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("path", DataType::string(), false), + StructField::new("sizeInBytes", DataType::long(), false), + StructField::new("modificationTime", DataType::timestamp(), false), + StructField::new("type", DataType::string(), false), + tags_field(), + ]))), + true, + ); +} + +fn tags_field() -> StructField { + StructField::new( + "tags", + DataType::Map(Box::new(MapType::new( + DataType::string(), + DataType::string(), + true, + ))), + true, + ) +} + +fn partition_values_field() -> StructField { + StructField::new( + "partitionValues", + DataType::Map(Box::new(MapType::new( + DataType::string(), + DataType::string(), + true, + ))), + false, + ) +} + +fn deletion_vector_field() -> StructField { + StructField::new( + "deletionVector", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("storageType", DataType::string(), false), + StructField::new("pathOrInlineDv", DataType::string(), false), + StructField::new("offset", DataType::integer(), true), + StructField::new("sizeInBytes", DataType::integer(), false), + StructField::new("cardinality", DataType::long(), false), + ]))), + true, + ) } impl ActionType { @@ -241,6 +255,8 @@ impl ActionType { Self::Cdc => &CDC_FIELD, Self::Txn => &TXN_FIELD, Self::DomainMetadata => &DOMAIN_METADATA_FIELD, + Self::CheckpointMetadata => &CHECKPOINT_METADATA_FIELD, + Self::Sidecar => &SIDECAR_FIELD, } } } diff --git a/crates/deltalake-core/src/kernel/actions/types.rs b/crates/deltalake-core/src/kernel/actions/types.rs index 94a0caf4fe..aa60823e4a 100644 --- a/crates/deltalake-core/src/kernel/actions/types.rs +++ b/crates/deltalake-core/src/kernel/actions/types.rs @@ -787,11 +787,11 @@ pub struct DomainMetadata { /// This action is only allowed in checkpoints following V2 spec. It describes the details about the checkpoint. pub struct CheckpointMetadata { /// The flavor of the V2 checkpoint. Allowed values: "flat". - flavor: String, + pub flavor: String, /// Map containing any additional metadata about the v2 spec checkpoint. #[serde(skip_serializing_if = "Option::is_none")] - tags: Option>>, + pub tags: Option>>, } /// The sidecar action references a sidecar file which provides some of the checkpoint's file actions. @@ -816,7 +816,7 @@ pub struct Sidecar { /// Map containing any additional metadata about the checkpoint sidecar file. #[serde(skip_serializing_if = "Option::is_none")] - tags: Option>>, + pub tags: Option>>, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] diff --git a/crates/deltalake-core/src/table/state_arrow.rs b/crates/deltalake-core/src/table/state_arrow.rs index 9d82c87326..8b20fecd5d 100644 --- a/crates/deltalake-core/src/table/state_arrow.rs +++ b/crates/deltalake-core/src/table/state_arrow.rs @@ -708,6 +708,7 @@ fn json_value_to_array_general<'a>( .map(|value| value.and_then(|value| value.as_str().map(|value| value.as_bytes()))) .collect_vec(), ))), + // TODO should this actually be the type for timestamp without timezone? DataType::Timestamp(TimeUnit::Microsecond, None) => { Ok(Arc::new(TimestampMicrosecondArray::from( values @@ -719,6 +720,21 @@ fn json_value_to_array_general<'a>( .collect_vec(), ))) } + DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) + if tz.eq_ignore_ascii_case("utc") + || tz.eq_ignore_ascii_case("+00:00") + || tz.eq_ignore_ascii_case("-00:00") => + { + Ok(Arc::new(TimestampMicrosecondArray::from( + values + .map(|value| { + value.and_then(|value| { + value.as_str().and_then(TimestampMicrosecondType::parse) + }) + }) + .collect_vec(), + ))) + } DataType::Date32 => Ok(Arc::new(Date32Array::from( values .map(|value| value.and_then(|value| value.as_str().and_then(Date32Type::parse)))