diff --git a/crates/deltalake-core/src/kernel/actions/mod.rs b/crates/deltalake-core/src/kernel/actions/mod.rs index 865c9d3cd9..637d520c41 100644 --- a/crates/deltalake-core/src/kernel/actions/mod.rs +++ b/crates/deltalake-core/src/kernel/actions/mod.rs @@ -7,9 +7,7 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; -#[cfg(all(feature = "arrow", feature = "parquet"))] -pub(crate) mod arrow; -// pub(crate) mod schemas; +pub(crate) mod schemas; mod serde_path; pub(crate) mod types; @@ -32,10 +30,12 @@ pub enum ActionType { Protocol, /// modify the data in a table by removing individual logical files Remove, - /// The Row ID high-water mark tracks the largest ID that has been assigned to a row in the table. - RowIdHighWaterMark, /// Transactional information Txn, + /// Checkpoint metadata + CheckpointMetadata, + /// Sidecar + Sidecar, } #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] diff --git a/crates/deltalake-core/src/kernel/actions/schemas.rs b/crates/deltalake-core/src/kernel/actions/schemas.rs index 0cc870318f..ad3e3ccbad 100644 --- a/crates/deltalake-core/src/kernel/actions/schemas.rs +++ b/crates/deltalake-core/src/kernel/actions/schemas.rs @@ -1,255 +1,262 @@ -use std::sync::Arc; +//! Schema definitions for action types -use arrow_schema::{DataType, Field, Fields, Schema}; +use lazy_static::lazy_static; use super::ActionType; +use crate::kernel::schema::{ArrayType, DataType, MapType, StructField, StructType}; -impl ActionType { - /// Returns the root field for the action type - pub fn field(&self) -> Field { - match self { - Self::Add => get_root("add", self.fields()), - Self::Cdc => get_root("cdc", self.fields()), - Self::CommitInfo => get_root("commitInfo", self.fields()), - Self::DomainMetadata => get_root("domainMetadata", self.fields()), - Self::Metadata => get_root("metaData", self.fields()), - Self::Protocol => get_root("protocol", self.fields()), - Self::Remove => get_root("remove", self.fields()), - Self::RowIdHighWaterMark => get_root("rowIdHighWaterMark", self.fields()), - Self::Txn => get_root("txn", self.fields()), - } - } - - /// Returns the child fields for the action type - pub fn fields(&self) -> Vec { - match self { - Self::Add => add_fields(), - Self::Cdc => cdc_fields(), - Self::CommitInfo => commit_info_fields(), - Self::DomainMetadata => domain_metadata_fields(), - Self::Metadata => metadata_fields(), - Self::Protocol => protocol_fields(), - Self::Remove => remove_fields(), - Self::RowIdHighWaterMark => watermark_fields(), - Self::Txn => txn_fields(), - } - } -} - -/// Returns the schema for the delta log -pub fn get_log_schema() -> Schema { - Schema { - fields: Fields::from_iter([ - ActionType::Add.field(), - ActionType::Cdc.field(), - ActionType::CommitInfo.field(), - ActionType::DomainMetadata.field(), - ActionType::Metadata.field(), - ActionType::Protocol.field(), - ActionType::Remove.field(), - ActionType::RowIdHighWaterMark.field(), - ActionType::Txn.field(), - ]), - metadata: Default::default(), - } -} - -fn get_root(name: &str, fields: Vec) -> Field { - Field::new(name, DataType::Struct(Fields::from_iter(fields)), true) -} - -fn add_fields() -> Vec { - Vec::from_iter([ - Field::new("path", DataType::Utf8, false), - Field::new("size", DataType::Int64, false), - Field::new("modificationTime", DataType::Int64, false), - Field::new("dataChange", DataType::Boolean, false), - Field::new("stats", DataType::Utf8, true), - Field::new( - "partitionValues", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - 
Field::new( - "tags", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - Field::new( - "deletionVector", - DataType::Struct(Fields::from(vec![ - Field::new("storageType", DataType::Utf8, false), - Field::new("pathOrInlineDv", DataType::Utf8, false), - Field::new("offset", DataType::Int32, true), - Field::new("sizeInBytes", DataType::Int32, false), - Field::new("cardinality", DataType::Int64, false), - ])), - true, - ), - Field::new("baseRowId", DataType::Int64, true), - Field::new("defaultRowCommitVersion", DataType::Int64, true), - ]) -} - -fn cdc_fields() -> Vec { - Vec::from_iter([ - Field::new("path", DataType::Utf8, true), - Field::new( - "partitionValues", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - Field::new("size", DataType::Int64, true), - Field::new("dataChange", DataType::Boolean, true), - Field::new( - "tags", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - ]) -} - -fn remove_fields() -> Vec { - Vec::from_iter([ - Field::new("path", DataType::Utf8, true), - Field::new("deletionTimestamp", DataType::Int64, true), - Field::new("dataChange", DataType::Boolean, true), - Field::new("extendedFileMetadata", DataType::Boolean, true), - Field::new("size", DataType::Int64, true), - Field::new( - "partitionValues", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - Field::new( - "tags", - DataType::Map(Arc::new(get_map_field()), false), - true, - ), - ]) -} - -fn metadata_fields() -> Vec { - Vec::from_iter([ - Field::new("id", DataType::Utf8, false), - Field::new("name", DataType::Utf8, true), - Field::new("description", DataType::Utf8, true), - Field::new( - "format", - DataType::Struct(Fields::from_iter([ - Field::new("provider", DataType::Utf8, true), - Field::new( - "options", - DataType::Map( - Arc::new(Field::new( - "key_value", - DataType::Struct(Fields::from_iter([ - Field::new("key", DataType::Utf8, false), - Field::new("value", DataType::Utf8, true), - ])), - false, - )), - false, +lazy_static! 
{ + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata + static ref METADATA_FIELD: StructField = StructField::new( + "metaData", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("id", DataType::string(), false), + StructField::new("name", DataType::string(), true), + StructField::new("description", DataType::string(), true), + StructField::new( + "format", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("provider", DataType::string(), false), + StructField::new( + "configuration", + DataType::Map(Box::new(MapType::new( + DataType::string(), + DataType::string(), + true, + ))), + true, ), - false, - ), - ])), - false, - ), - Field::new("schemaString", DataType::Utf8, false), - Field::new("createdTime", DataType::Int64, true), - Field::new( - "partitionColumns", - DataType::List(Arc::new(Field::new("element", DataType::Utf8, false))), - false, - ), - Field::new( - "configuration", - DataType::Map( - Arc::new(Field::new( - "key_value", - DataType::Struct(Fields::from_iter([ - Field::new("key", DataType::Utf8, false), - Field::new("value", DataType::Utf8, true), - ])), - false, - )), + ]))), false, ), - true, - ), - ]) + StructField::new("schemaString", DataType::string(), false), + StructField::new( + "partitionColumns", + DataType::Array(Box::new(ArrayType::new(DataType::string(), false))), + false, + ), + StructField::new("createdTime", DataType::long(), true), + StructField::new( + "configuration", + DataType::Map(Box::new(MapType::new( + DataType::string(), + DataType::string(), + true, + ))), + false, + ), + ]))), + true, + ); + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#protocol-evolution + static ref PROTOCOL_FIELD: StructField = StructField::new( + "protocol", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("minReaderVersion", DataType::integer(), false), + StructField::new("minWriterVersion", DataType::integer(), false), + StructField::new( + "readerFeatures", + DataType::Array(Box::new(ArrayType::new(DataType::string(), false))), + true, + ), + StructField::new( + "writerFeatures", + DataType::Array(Box::new(ArrayType::new(DataType::string(), false))), + true, + ), + ]))), + true, + ); + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#commit-provenance-information + static ref COMMIT_INFO_FIELD: StructField = StructField::new( + "commitInfo", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("timestamp", DataType::timestamp(), false), + StructField::new("operation", DataType::string(), false), + StructField::new("isolationLevel", DataType::string(), true), + StructField::new("isBlindAppend", DataType::boolean(), true), + StructField::new("txnId", DataType::string(), true), + StructField::new("readVersion", DataType::long(), true), + StructField::new( + "operationParameters", + DataType::Map(Box::new(MapType::new( + DataType::string(), + DataType::string(), + true, + ))), + true, + ), + StructField::new( + "operationMetrics", + DataType::Map(Box::new(MapType::new( + DataType::string(), + DataType::string(), + true, + ))), + true, + ), + ]))), + true, + ); + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file + static ref ADD_FIELD: StructField = StructField::new( + "add", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("path", DataType::string(), false), + partition_values_field(), + StructField::new("size", DataType::long(), false), + StructField::new("modificationTime", 
DataType::timestamp(), false), + StructField::new("dataChange", DataType::boolean(), false), + StructField::new("stats", DataType::string(), true), + tags_field(), + deletion_vector_field(), + StructField::new("baseRowId", DataType::long(), true), + StructField::new("defaultRowCommitVersion", DataType::long(), true), + ]))), + true, + ); + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file + static ref REMOVE_FIELD: StructField = StructField::new( + "remove", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("path", DataType::string(), false), + StructField::new("deletionTimestamp", DataType::timestamp(), true), + StructField::new("dataChange", DataType::boolean(), false), + StructField::new("extendedFileMetadata", DataType::boolean(), true), + partition_values_field(), + StructField::new("size", DataType::long(), true), + StructField::new("stats", DataType::string(), true), + tags_field(), + deletion_vector_field(), + StructField::new("baseRowId", DataType::long(), true), + StructField::new("defaultRowCommitVersion", DataType::long(), true), + ]))), + true, + ); + static ref REMOVE_FIELD_CHECKPOINT: StructField = StructField::new( + "remove", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("path", DataType::string(), false), + StructField::new("deletionTimestamp", DataType::timestamp(), true), + StructField::new("dataChange", DataType::boolean(), false), + ]))), + true, + ); + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file + static ref CDC_FIELD: StructField = StructField::new( + "cdc", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("path", DataType::string(), false), + partition_values_field(), + StructField::new("size", DataType::long(), false), + StructField::new("dataChange", DataType::boolean(), false), + tags_field(), + ]))), + true, + ); + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#transaction-identifiers + static ref TXN_FIELD: StructField = StructField::new( + "txn", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("appId", DataType::string(), false), + StructField::new("version", DataType::long(), false), + StructField::new("lastUpdated", DataType::timestamp(), true), + ]))), + true, + ); + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#domain-metadata + static ref DOMAIN_METADATA_FIELD: StructField = StructField::new( + "domainMetadata", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("domain", DataType::string(), false), + StructField::new( + "configuration", + DataType::Map(Box::new(MapType::new( + DataType::string(), + DataType::string(), + true, + ))), + false, + ), + StructField::new("removed", DataType::boolean(), false), + ]))), + true, + ); + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoint-metadata + static ref CHECKPOINT_METADATA_FIELD: StructField = StructField::new( + "checkpointMetadata", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("flavor", DataType::string(), false), + tags_field(), + ]))), + true, + ); + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information + static ref SIDECAR_FIELD: StructField = StructField::new( + "sidecar", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("path", DataType::string(), false), + StructField::new("sizeInBytes", DataType::long(), false), + StructField::new("modificationTime", DataType::timestamp(), false), + StructField::new("type", 
DataType::string(), false),
+            tags_field(),
+        ]))),
+        true,
+    );
 }
 
-fn protocol_fields() -> Vec<Field> {
-    Vec::from_iter([
-        Field::new("minReaderVersion", DataType::Int32, false),
-        Field::new("minWriterVersion", DataType::Int32, false),
-        Field::new(
-            "readerFeatures",
-            DataType::List(Arc::new(Field::new("element", DataType::Utf8, false))),
+fn tags_field() -> StructField {
+    StructField::new(
+        "tags",
+        DataType::Map(Box::new(MapType::new(
+            DataType::string(),
+            DataType::string(),
             true,
-        ),
-        Field::new(
-            "writerFeatures",
-            DataType::List(Arc::new(Field::new("element", DataType::Utf8, false))),
-            true,
-        ),
-    ])
-}
-
-fn txn_fields() -> Vec<Field> {
-    Vec::from_iter([
-        Field::new("appId", DataType::Utf8, true),
-        Field::new("version", DataType::Int64, true),
-        Field::new("lastUpdated", DataType::Int64, true),
-    ])
-}
-
-fn watermark_fields() -> Vec<Field> {
-    Vec::from_iter([Field::new("highWaterMark", DataType::Int64, true)])
+        ))),
+        true,
+    )
 }
 
-fn commit_info_fields() -> Vec<Field> {
-    Vec::from_iter([
-        Field::new("timestamp", DataType::Int64, true),
-        Field::new("operation", DataType::Utf8, true),
-        Field::new("isolationLevel", DataType::Utf8, true),
-        Field::new("isBlindAppend", DataType::Boolean, true),
-        Field::new("txnId", DataType::Utf8, true),
-        Field::new("readVersion", DataType::Int32, true),
-        Field::new(
-            "operationParameters",
-            DataType::Map(Arc::new(get_map_field()), false),
-            true,
-        ),
-        Field::new(
-            "operationMetrics",
-            DataType::Map(Arc::new(get_map_field()), false),
+fn partition_values_field() -> StructField {
+    StructField::new(
+        "partitionValues",
+        DataType::Map(Box::new(MapType::new(
+            DataType::string(),
+            DataType::string(),
             true,
-        ),
-    ])
+        ))),
+        false,
+    )
 }
 
-fn domain_metadata_fields() -> Vec<Field> {
-    Vec::from_iter([
-        Field::new("domain", DataType::Utf8, true),
-        Field::new(
-            "configuration",
-            DataType::Map(Arc::new(get_map_field()), false),
-            true,
-        ),
-        Field::new("removed", DataType::Boolean, true),
-    ])
+fn deletion_vector_field() -> StructField {
+    StructField::new(
+        "deletionVector",
+        DataType::Struct(Box::new(StructType::new(vec![
+            StructField::new("storageType", DataType::string(), false),
+            StructField::new("pathOrInlineDv", DataType::string(), false),
+            StructField::new("offset", DataType::integer(), true),
+            StructField::new("sizeInBytes", DataType::integer(), false),
+            StructField::new("cardinality", DataType::long(), false),
+        ]))),
+        true,
+    )
 }
 
-fn get_map_field() -> Field {
-    Field::new(
-        "key_value",
-        DataType::Struct(Fields::from_iter([
-            Field::new("key", DataType::Utf8, false),
-            Field::new("value", DataType::Utf8, true),
-        ])),
-        false,
-    )
+impl ActionType {
+    /// Returns the type of the corresponding field in the delta log schema
+    pub fn schema_field(&self) -> &StructField {
+        match self {
+            Self::Metadata => &METADATA_FIELD,
+            Self::Protocol => &PROTOCOL_FIELD,
+            Self::CommitInfo => &COMMIT_INFO_FIELD,
+            Self::Add => &ADD_FIELD,
+            Self::Remove => &REMOVE_FIELD,
+            Self::Cdc => &CDC_FIELD,
+            Self::Txn => &TXN_FIELD,
+            Self::DomainMetadata => &DOMAIN_METADATA_FIELD,
+            Self::CheckpointMetadata => &CHECKPOINT_METADATA_FIELD,
+            Self::Sidecar => &SIDECAR_FIELD,
+        }
+    }
 }
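A minimal sketch of how the new `schema_field` accessor can be combined with the kernel schema types to assemble a log schema for a chosen set of actions. This snippet is not part of the diff; it assumes the module paths introduced above and that `StructField` implements `Clone`, and the helper name is hypothetical.

```rust
use deltalake_core::kernel::actions::ActionType;
use deltalake_core::kernel::schema::StructType;

/// Build a kernel schema covering only the actions that appear in commit files.
/// Hypothetical helper for illustration; not defined in this PR.
fn commit_file_schema() -> StructType {
    let actions = [
        ActionType::Add,
        ActionType::Remove,
        ActionType::Metadata,
        ActionType::Protocol,
        ActionType::Txn,
        ActionType::CommitInfo,
    ];
    // `schema_field` returns a reference into the lazily initialized
    // definitions above, so building a schema only requires cloning them.
    StructType::new(actions.iter().map(|a| a.schema_field().clone()).collect())
}
```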
diff --git a/crates/deltalake-core/src/kernel/actions/types.rs b/crates/deltalake-core/src/kernel/actions/types.rs
index a788315b82..aa60823e4a 100644
--- a/crates/deltalake-core/src/kernel/actions/types.rs
+++ b/crates/deltalake-core/src/kernel/actions/types.rs
@@ -174,7 +174,7 @@ pub enum ReaderFeatures {
     /// Mapping of one column to another
     ColumnMapping,
     /// Deletion vectors for merge, update, delete
-    DeleteionVecotrs,
+    DeletionVectors,
     /// timestamps without timezone support
     #[serde(alias = "timestampNtz")]
     TimestampWithoutTimezone,
@@ -185,26 +185,13 @@ pub enum ReaderFeatures {
     Other(String),
 }
 
-#[allow(clippy::from_over_into)]
-impl Into<usize> for ReaderFeatures {
-    fn into(self) -> usize {
-        match self {
-            ReaderFeatures::Other(_) => 0,
-            ReaderFeatures::ColumnMapping => 2,
-            ReaderFeatures::DeleteionVecotrs
-            | ReaderFeatures::TimestampWithoutTimezone
-            | ReaderFeatures::V2Checkpoint => 3,
-        }
-    }
-}
-
 #[cfg(all(not(feature = "parquet2"), feature = "parquet"))]
 impl From<&parquet::record::Field> for ReaderFeatures {
     fn from(value: &parquet::record::Field) -> Self {
         match value {
             parquet::record::Field::Str(feature) => match feature.as_str() {
                 "columnMapping" => ReaderFeatures::ColumnMapping,
-                "deletionVectors" => ReaderFeatures::DeleteionVecotrs,
+                "deletionVectors" => ReaderFeatures::DeletionVectors,
                 "timestampNtz" => ReaderFeatures::TimestampWithoutTimezone,
                 "v2Checkpoint" => ReaderFeatures::V2Checkpoint,
                 f => ReaderFeatures::Other(f.to_string()),
@@ -216,9 +203,15 @@ impl From<String> for ReaderFeatures {
 
 impl From<String> for ReaderFeatures {
     fn from(value: String) -> Self {
-        match value.as_str() {
+        value.as_str().into()
+    }
+}
+
+impl From<&str> for ReaderFeatures {
+    fn from(value: &str) -> Self {
+        match value {
             "columnMapping" => ReaderFeatures::ColumnMapping,
-            "deletionVectors" => ReaderFeatures::DeleteionVecotrs,
+            "deletionVectors" => ReaderFeatures::DeletionVectors,
             "timestampNtz" => ReaderFeatures::TimestampWithoutTimezone,
             "v2Checkpoint" => ReaderFeatures::V2Checkpoint,
             f => ReaderFeatures::Other(f.to_string()),
@@ -230,7 +223,7 @@ impl AsRef<str> for ReaderFeatures {
     fn as_ref(&self) -> &str {
         match self {
             ReaderFeatures::ColumnMapping => "columnMapping",
-            ReaderFeatures::DeleteionVecotrs => "deletionVectors",
+            ReaderFeatures::DeletionVectors => "deletionVectors",
             ReaderFeatures::TimestampWithoutTimezone => "timestampNtz",
             ReaderFeatures::V2Checkpoint => "v2Checkpoint",
             ReaderFeatures::Other(f) => f,
@@ -264,7 +257,7 @@ pub enum WriterFeatures {
     /// ID Columns
     IdentityColumns,
     /// Deletion vectors for merge, update, delete
-    DeleteionVecotrs,
+    DeletionVectors,
     /// Row tracking on tables
     RowTracking,
     /// timestamps without timezone support
@@ -281,29 +274,15 @@ pub enum WriterFeatures {
     Other(String),
 }
 
-#[allow(clippy::from_over_into)]
-impl Into<usize> for WriterFeatures {
-    fn into(self) -> usize {
-        match self {
-            WriterFeatures::Other(_) => 0,
-            WriterFeatures::AppendOnly | WriterFeatures::Invariants => 2,
-            WriterFeatures::CheckConstraints => 3,
-            WriterFeatures::ChangeDataFeed | WriterFeatures::GeneratedColumns => 4,
-            WriterFeatures::ColumnMapping => 5,
-            WriterFeatures::IdentityColumns
-                | WriterFeatures::DeleteionVecotrs
-                | WriterFeatures::RowTracking
-                | WriterFeatures::TimestampWithoutTimezone
-                | WriterFeatures::DomainMetadata
-                | WriterFeatures::V2Checkpoint
-                | WriterFeatures::IcebergCompatV1 => 7,
-        }
+impl From<String> for WriterFeatures {
+    fn from(value: String) -> Self {
+        value.as_str().into()
     }
 }
 
-impl From<String> for WriterFeatures {
-    fn from(value: String) -> Self {
-        match value.as_str() {
+impl From<&str> for WriterFeatures {
+    fn from(value: &str) -> Self {
+        match value {
             "appendOnly" => WriterFeatures::AppendOnly,
             "invariants" => WriterFeatures::Invariants,
             "checkConstraints" => WriterFeatures::CheckConstraints,
@@ -311,7 +290,7 @@ impl From<String> for WriterFeatures {
             "generatedColumns" => WriterFeatures::GeneratedColumns,
"columnMapping" => WriterFeatures::ColumnMapping, "identityColumns" => WriterFeatures::IdentityColumns, - "deletionVectors" => WriterFeatures::DeleteionVecotrs, + "deletionVectors" => WriterFeatures::DeletionVectors, "rowTracking" => WriterFeatures::RowTracking, "timestampNtz" => WriterFeatures::TimestampWithoutTimezone, "domainMetadata" => WriterFeatures::DomainMetadata, @@ -332,7 +311,7 @@ impl AsRef for WriterFeatures { WriterFeatures::GeneratedColumns => "generatedColumns", WriterFeatures::ColumnMapping => "columnMapping", WriterFeatures::IdentityColumns => "identityColumns", - WriterFeatures::DeleteionVecotrs => "deletionVectors", + WriterFeatures::DeletionVectors => "deletionVectors", WriterFeatures::RowTracking => "rowTracking", WriterFeatures::TimestampWithoutTimezone => "timestampNtz", WriterFeatures::DomainMetadata => "domainMetadata", @@ -361,7 +340,7 @@ impl From<&parquet::record::Field> for WriterFeatures { "generatedColumns" => WriterFeatures::GeneratedColumns, "columnMapping" => WriterFeatures::ColumnMapping, "identityColumns" => WriterFeatures::IdentityColumns, - "deletionVectors" => WriterFeatures::DeleteionVecotrs, + "deletionVectors" => WriterFeatures::DeletionVectors, "rowTracking" => WriterFeatures::RowTracking, "timestampNtz" => WriterFeatures::TimestampWithoutTimezone, "domainMetadata" => WriterFeatures::DomainMetadata, @@ -421,7 +400,7 @@ impl AsRef for StorageType { impl ToString for StorageType { fn to_string(&self) -> String { - self.as_ref().to_string() + self.as_ref().into() } } @@ -450,6 +429,7 @@ pub struct DeletionVectorDescriptor { /// Start of the data for this DV in number of bytes from the beginning of the file it is stored in. /// Always None (absent in JSON) when `storageType = 'i'`. + #[serde(skip_serializing_if = "Option::is_none")] pub offset: Option, /// Size of the serialized DV in bytes (raw data size, i.e. before base85 encoding, if inline). @@ -662,9 +642,11 @@ pub struct Remove { pub data_change: bool, /// The time this logical file was created, as milliseconds since the epoch. + #[serde(skip_serializing_if = "Option::is_none")] pub deletion_timestamp: Option, /// When true the fields `partition_values`, `size`, and `tags` are present + #[serde(skip_serializing_if = "Option::is_none")] pub extended_file_metadata: Option, /// A map from partition column to value for this logical file. @@ -686,9 +668,11 @@ pub struct Remove { /// Default generated Row ID of the first row in the file. The default generated Row IDs /// of the other rows in the file can be reconstructed by adding the physical index of the /// row within the file to the base Row ID + #[serde(skip_serializing_if = "Option::is_none")] pub base_row_id: Option, /// First commit version in which an add action with the same path was committed to the table. + #[serde(skip_serializing_if = "Option::is_none")] pub default_row_commit_version: Option, } @@ -707,13 +691,18 @@ pub struct AddCDCFile { /// absolute path to a CDC file #[serde(with = "serde_path")] pub path: String, + /// The size of this file in bytes pub size: i64, + /// A map from partition column to value for this file pub partition_values: HashMap>, + /// Should always be set to false because they do not change the underlying data of the table pub data_change: bool, + /// Map containing metadata about this file + #[serde(skip_serializing_if = "Option::is_none")] pub tags: Option>>, } @@ -724,9 +713,12 @@ pub struct AddCDCFile { pub struct Txn { /// A unique identifier for the application performing the transaction. 
     pub app_id: String,
 
+    /// An application-specific numeric identifier for this transaction.
     pub version: i64,
 
+    /// The time when this transaction action was created in milliseconds since the Unix epoch.
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub last_updated: Option<i64>,
 }
 
@@ -739,30 +731,39 @@ pub struct CommitInfo {
     /// Timestamp in millis when the commit was created
     #[serde(skip_serializing_if = "Option::is_none")]
     pub timestamp: Option<i64>,
 
+    /// Id of the user invoking the commit
     #[serde(skip_serializing_if = "Option::is_none")]
     pub user_id: Option<String>,
 
+    /// Name of the user invoking the commit
     #[serde(skip_serializing_if = "Option::is_none")]
     pub user_name: Option<String>,
 
+    /// The operation performed during the commit
     #[serde(skip_serializing_if = "Option::is_none")]
     pub operation: Option<String>,
 
+    /// Parameters used for table operation
    #[serde(skip_serializing_if = "Option::is_none")]
     pub operation_parameters: Option<HashMap<String, serde_json::Value>>,
 
+    /// Version of the table when the operation was started
     #[serde(skip_serializing_if = "Option::is_none")]
     pub read_version: Option<i64>,
 
+    /// The isolation level of the commit
     #[serde(skip_serializing_if = "Option::is_none")]
     pub isolation_level: Option<IsolationLevel>,
 
+    /// Whether the commit was a blind append
     #[serde(skip_serializing_if = "Option::is_none")]
     pub is_blind_append: Option<bool>,
 
+    /// Delta engine which created the commit.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub engine_info: Option<String>,
 
+    /// Additional provenance information for the commit
     #[serde(flatten, default)]
     pub info: HashMap<String, serde_json::Value>,
@@ -774,12 +775,50 @@ pub struct DomainMetadata {
     /// Identifier for this domain (system or user-provided)
     pub domain: String,
 
+    /// String containing configuration for the metadata domain
     pub configuration: String,
 
+    /// When `true` the action serves as a tombstone
     pub removed: bool,
 }
 
+#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
+/// This action is only allowed in checkpoints following V2 spec. It describes the details about the checkpoint.
+pub struct CheckpointMetadata {
+    /// The flavor of the V2 checkpoint. Allowed values: "flat".
+    pub flavor: String,
+
+    /// Map containing any additional metadata about the v2 spec checkpoint.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub tags: Option<HashMap<String, Option<String>>>,
+}
+
+/// The sidecar action references a sidecar file which provides some of the checkpoint's file actions.
+/// This action is only allowed in checkpoints following V2 spec.
+#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
+#[serde(rename_all = "camelCase")]
+pub struct Sidecar {
+    /// The name of the sidecar file (not a path).
+    /// The file must reside in the _delta_log/_sidecars directory.
+    pub file_name: String,
+
+    /// The size of the sidecar file in bytes
+    pub size_in_bytes: i64,
+
+    /// The time this sidecar file was created, as milliseconds since the epoch.
+    pub modification_time: i64,
+
+    /// Type of sidecar. Valid values are: "fileaction".
+    /// This could be extended in future to allow different kinds of sidecars.
+    #[serde(rename = "type")]
+    pub sidecar_type: String,
+
+    /// Map containing any additional metadata about the checkpoint sidecar file.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub tags: Option<HashMap<String, Option<String>>>,
+}
+
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
 /// The isolation level applied during transaction
 pub enum IsolationLevel {
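The string conversions above are now the single source of truth for protocol feature names: `From<&str>` does the matching and `From<String>` simply delegates, while `AsRef<str>` maps back to the canonical name. An illustrative round-trip, not part of the diff; it assumes the feature enums are re-exported from `kernel` the way other kernel types are in this PR:

```rust
use deltalake_core::kernel::{ReaderFeatures, WriterFeatures};

fn feature_roundtrip() {
    // Known names map onto concrete variants...
    let reader: ReaderFeatures = "deletionVectors".into();
    assert!(matches!(reader, ReaderFeatures::DeletionVectors));
    // ...and `AsRef<str>` maps back to the canonical protocol string.
    assert_eq!(reader.as_ref(), "deletionVectors");

    // Unknown feature names are preserved for forward compatibility
    // instead of failing the conversion.
    let writer: WriterFeatures = "someFutureFeature".to_string().into();
    assert!(matches!(writer, WriterFeatures::Other(ref f) if f.as_str() == "someFutureFeature"));
}
```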
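Because the newly annotated `Option` fields use `skip_serializing_if = "Option::is_none"`, absent values now disappear from the serialized action instead of being written as explicit `null`s, matching the protocol's "absent in JSON" wording for fields like `deletionVector.offset`. A minimal sketch of the effect, assuming `Txn` keeps the camelCase serde renaming used for the other actions in this file:

```rust
use deltalake_core::kernel::Txn;

fn txn_serialization() -> Result<(), serde_json::Error> {
    let txn = Txn {
        app_id: "my-app".to_string(),
        version: 42,
        last_updated: None,
    };
    // `lastUpdated` is omitted entirely rather than serialized as `null`.
    assert_eq!(
        serde_json::to_string(&txn)?,
        r#"{"appId":"my-app","version":42}"#
    );
    Ok(())
}
```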
diff --git a/crates/deltalake-core/src/kernel/actions/arrow.rs b/crates/deltalake-core/src/kernel/arrow/mod.rs
similarity index 98%
rename from crates/deltalake-core/src/kernel/actions/arrow.rs
rename to crates/deltalake-core/src/kernel/arrow/mod.rs
index d292362604..0c89f6ab48 100644
--- a/crates/deltalake-core/src/kernel/actions/arrow.rs
+++ b/crates/deltalake-core/src/kernel/arrow/mod.rs
@@ -1,3 +1,5 @@
+//! Conversions between Delta and Arrow data types
+
 use std::sync::Arc;
 
 use arrow_schema::{
@@ -6,7 +8,12 @@ use arrow_schema::{
 };
 use lazy_static::lazy_static;
 
-use super::super::schema::{ArrayType, DataType, MapType, PrimitiveType, StructField, StructType};
+use super::schema::{ArrayType, DataType, MapType, PrimitiveType, StructField, StructType};
+
+pub mod schemas;
+
+const MAP_KEYS_NAME: &str = "keys";
+const MAP_VALUES_NAME: &str = "values";
 
 impl TryFrom<&StructType> for ArrowSchema {
     type Error = ArrowError;
@@ -64,9 +71,9 @@ impl TryFrom<&MapType> for ArrowField {
             "entries",
             ArrowDataType::Struct(
                 vec![
-                    ArrowField::new("key", ArrowDataType::try_from(a.key_type())?, false),
+                    ArrowField::new(MAP_KEYS_NAME, ArrowDataType::try_from(a.key_type())?, false),
                     ArrowField::new(
-                        "value",
+                        MAP_VALUES_NAME,
                         ArrowDataType::try_from(a.value_type())?,
                         a.value_contains_null(),
                     ),
@@ -143,12 +150,12 @@ impl TryFrom<&DataType> for ArrowDataType {
                 ArrowDataType::Struct(
                     vec![
                         ArrowField::new(
-                            "keys",
+                            MAP_KEYS_NAME,
                             <ArrowDataType as TryFrom<&DataType>>::try_from(m.key_type())?,
                             false,
                         ),
                         ArrowField::new(
-                            "values",
+                            MAP_VALUES_NAME,
                             <ArrowDataType as TryFrom<&DataType>>::try_from(m.value_type())?,
                             m.value_contains_null(),
                         ),
diff --git a/crates/deltalake-core/src/kernel/arrow/schemas.rs b/crates/deltalake-core/src/kernel/arrow/schemas.rs
new file mode 100644
index 0000000000..80a29e065e
--- /dev/null
+++ b/crates/deltalake-core/src/kernel/arrow/schemas.rs
@@ -0,0 +1,63 @@
+//! Arrow schemas for the delta log
+
+use arrow_schema::{Field, Fields, Schema};
+use lazy_static::lazy_static;
+
+use super::super::ActionType;
+
+lazy_static!
{ + static ref ARROW_METADATA_FIELD: Field = + ActionType::Metadata.schema_field().try_into().unwrap(); + static ref ARROW_PROTOCOL_FIELD: Field = + ActionType::Protocol.schema_field().try_into().unwrap(); + static ref ARROW_COMMIT_INFO_FIELD: Field = + ActionType::CommitInfo.schema_field().try_into().unwrap(); + static ref ARROW_ADD_FIELD: Field = ActionType::Add.schema_field().try_into().unwrap(); + static ref ARROW_REMOVE_FIELD: Field = ActionType::Remove.schema_field().try_into().unwrap(); + static ref ARROW_CDC_FIELD: Field = ActionType::Cdc.schema_field().try_into().unwrap(); + static ref ARROW_TXN_FIELD: Field = ActionType::Txn.schema_field().try_into().unwrap(); + static ref ARROW_DOMAIN_METADATA_FIELD: Field = ActionType::DomainMetadata + .schema_field() + .try_into() + .unwrap(); + static ref ARROW_CHECKPOINT_METADATA_FIELD: Field = ActionType::CheckpointMetadata + .schema_field() + .try_into() + .unwrap(); + static ref ARROW_SIDECAR_FIELD: Field = ActionType::Sidecar.schema_field().try_into().unwrap(); +} + +impl ActionType { + /// Returns the root field for the action type + pub fn arrow_field(&self) -> &Field { + match self { + Self::Metadata => &ARROW_METADATA_FIELD, + Self::Protocol => &ARROW_PROTOCOL_FIELD, + Self::CommitInfo => &ARROW_COMMIT_INFO_FIELD, + Self::Add => &ARROW_ADD_FIELD, + Self::Remove => &ARROW_REMOVE_FIELD, + Self::Cdc => &ARROW_CDC_FIELD, + Self::Txn => &ARROW_TXN_FIELD, + Self::DomainMetadata => &ARROW_DOMAIN_METADATA_FIELD, + Self::CheckpointMetadata => &ARROW_CHECKPOINT_METADATA_FIELD, + Self::Sidecar => &ARROW_SIDECAR_FIELD, + } + } +} + +/// Returns the schema for the delta log +pub fn get_log_schema() -> Schema { + Schema { + fields: Fields::from_iter([ + ActionType::Add.arrow_field().clone(), + ActionType::Cdc.arrow_field().clone(), + ActionType::CommitInfo.arrow_field().clone(), + ActionType::DomainMetadata.arrow_field().clone(), + ActionType::Metadata.arrow_field().clone(), + ActionType::Protocol.arrow_field().clone(), + ActionType::Remove.arrow_field().clone(), + ActionType::Txn.arrow_field().clone(), + ]), + metadata: Default::default(), + } +} diff --git a/crates/deltalake-core/src/kernel/error.rs b/crates/deltalake-core/src/kernel/error.rs index 8ec799ca96..a37dbdae67 100644 --- a/crates/deltalake-core/src/kernel/error.rs +++ b/crates/deltalake-core/src/kernel/error.rs @@ -23,9 +23,8 @@ pub enum Error { #[error("Arrow error: {0}")] Parquet(#[from] parquet::errors::ParquetError), - #[cfg(feature = "object_store")] #[error("Error interacting with object store: {0}")] - ObjectStore(object_store::Error), + ObjectStore(#[from] object_store::Error), #[error("File not found: {0}")] FileNotFound(String), diff --git a/crates/deltalake-core/src/kernel/mod.rs b/crates/deltalake-core/src/kernel/mod.rs index 7785c273f9..54f742c3fb 100644 --- a/crates/deltalake-core/src/kernel/mod.rs +++ b/crates/deltalake-core/src/kernel/mod.rs @@ -1,6 +1,8 @@ //! 
Kernel module pub mod actions; +#[cfg(all(feature = "arrow", feature = "parquet"))] +pub mod arrow; pub mod error; pub mod schema; diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs index a4cc1b66c7..837483c35c 100644 --- a/crates/deltalake-core/src/protocol/checkpoints.rs +++ b/crates/deltalake-core/src/protocol/checkpoints.rs @@ -18,7 +18,7 @@ use regex::Regex; use serde_json::Value; use super::{time_utils, ProtocolError}; -use crate::kernel::actions::arrow::delta_log_schema_for_table; +use crate::kernel::arrow::delta_log_schema_for_table; use crate::kernel::{ Action, Add as AddAction, DataType, Metadata, PrimitiveType, Protocol, StructField, StructType, Txn, diff --git a/crates/deltalake-core/src/schema/arrow_convert.rs b/crates/deltalake-core/src/schema/arrow_convert.rs deleted file mode 100644 index d292362604..0000000000 --- a/crates/deltalake-core/src/schema/arrow_convert.rs +++ /dev/null @@ -1,1049 +0,0 @@ -use std::sync::Arc; - -use arrow_schema::{ - ArrowError, DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, - Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, -}; -use lazy_static::lazy_static; - -use super::super::schema::{ArrayType, DataType, MapType, PrimitiveType, StructField, StructType}; - -impl TryFrom<&StructType> for ArrowSchema { - type Error = ArrowError; - - fn try_from(s: &StructType) -> Result { - let fields = s - .fields() - .iter() - .map(>::try_from) - .collect::, ArrowError>>()?; - - Ok(ArrowSchema::new(fields)) - } -} - -impl TryFrom<&StructField> for ArrowField { - type Error = ArrowError; - - fn try_from(f: &StructField) -> Result { - let metadata = f - .metadata() - .iter() - .map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?))) - .collect::>() - .map_err(|err| ArrowError::JsonError(err.to_string()))?; - - let field = ArrowField::new( - f.name(), - ArrowDataType::try_from(f.data_type())?, - f.is_nullable(), - ) - .with_metadata(metadata); - - Ok(field) - } -} - -impl TryFrom<&ArrayType> for ArrowField { - type Error = ArrowError; - - fn try_from(a: &ArrayType) -> Result { - Ok(ArrowField::new( - "item", - ArrowDataType::try_from(a.element_type())?, - a.contains_null(), - )) - } -} - -impl TryFrom<&MapType> for ArrowField { - type Error = ArrowError; - - fn try_from(a: &MapType) -> Result { - Ok(ArrowField::new( - "entries", - ArrowDataType::Struct( - vec![ - ArrowField::new("key", ArrowDataType::try_from(a.key_type())?, false), - ArrowField::new( - "value", - ArrowDataType::try_from(a.value_type())?, - a.value_contains_null(), - ), - ] - .into(), - ), - false, // always non-null - )) - } -} - -impl TryFrom<&DataType> for ArrowDataType { - type Error = ArrowError; - - fn try_from(t: &DataType) -> Result { - match t { - DataType::Primitive(p) => { - match p { - PrimitiveType::String => Ok(ArrowDataType::Utf8), - PrimitiveType::Long => Ok(ArrowDataType::Int64), // undocumented type - PrimitiveType::Integer => Ok(ArrowDataType::Int32), - PrimitiveType::Short => Ok(ArrowDataType::Int16), - PrimitiveType::Byte => Ok(ArrowDataType::Int8), - PrimitiveType::Float => Ok(ArrowDataType::Float32), - PrimitiveType::Double => Ok(ArrowDataType::Float64), - PrimitiveType::Boolean => Ok(ArrowDataType::Boolean), - PrimitiveType::Binary => Ok(ArrowDataType::Binary), - PrimitiveType::Decimal(precision, scale) => { - let precision = u8::try_from(*precision).map_err(|_| { - ArrowError::SchemaError(format!( - "Invalid precision for decimal: {}", - precision - )) - })?; 
- let scale = i8::try_from(*scale).map_err(|_| { - ArrowError::SchemaError(format!("Invalid scale for decimal: {}", scale)) - })?; - - if precision <= 38 { - Ok(ArrowDataType::Decimal128(precision, scale)) - } else if precision <= 76 { - Ok(ArrowDataType::Decimal256(precision, scale)) - } else { - Err(ArrowError::SchemaError(format!( - "Precision too large to be represented in Arrow: {}", - precision - ))) - } - } - PrimitiveType::Date => { - // A calendar date, represented as a year-month-day triple without a - // timezone. Stored as 4 bytes integer representing days since 1970-01-01 - Ok(ArrowDataType::Date32) - } - PrimitiveType::Timestamp => { - // Issue: https://github.com/delta-io/delta/issues/643 - Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)) - } - } - } - DataType::Struct(s) => Ok(ArrowDataType::Struct( - s.fields() - .iter() - .map(>::try_from) - .collect::, ArrowError>>()? - .into(), - )), - DataType::Array(a) => Ok(ArrowDataType::List(Arc::new(>::try_from(a)?))), - DataType::Map(m) => Ok(ArrowDataType::Map( - Arc::new(ArrowField::new( - "entries", - ArrowDataType::Struct( - vec![ - ArrowField::new( - "keys", - >::try_from(m.key_type())?, - false, - ), - ArrowField::new( - "values", - >::try_from(m.value_type())?, - m.value_contains_null(), - ), - ] - .into(), - ), - false, - )), - false, - )), - } - } -} - -impl TryFrom<&ArrowSchema> for StructType { - type Error = ArrowError; - - fn try_from(arrow_schema: &ArrowSchema) -> Result { - let new_fields: Result, _> = arrow_schema - .fields() - .iter() - .map(|field| field.as_ref().try_into()) - .collect(); - Ok(StructType::new(new_fields?)) - } -} - -impl TryFrom for StructType { - type Error = ArrowError; - - fn try_from(arrow_schema: ArrowSchemaRef) -> Result { - arrow_schema.as_ref().try_into() - } -} - -impl TryFrom<&ArrowField> for StructField { - type Error = ArrowError; - - fn try_from(arrow_field: &ArrowField) -> Result { - Ok(StructField::new( - arrow_field.name().clone(), - arrow_field.data_type().try_into()?, - arrow_field.is_nullable(), - ) - .with_metadata(arrow_field.metadata().iter().map(|(k, v)| (k.clone(), v)))) - } -} - -impl TryFrom<&ArrowDataType> for DataType { - type Error = ArrowError; - - fn try_from(arrow_datatype: &ArrowDataType) -> Result { - match arrow_datatype { - ArrowDataType::Utf8 => Ok(DataType::Primitive(PrimitiveType::String)), - ArrowDataType::LargeUtf8 => Ok(DataType::Primitive(PrimitiveType::String)), - ArrowDataType::Int64 => Ok(DataType::Primitive(PrimitiveType::Long)), // undocumented type - ArrowDataType::Int32 => Ok(DataType::Primitive(PrimitiveType::Integer)), - ArrowDataType::Int16 => Ok(DataType::Primitive(PrimitiveType::Short)), - ArrowDataType::Int8 => Ok(DataType::Primitive(PrimitiveType::Byte)), - ArrowDataType::UInt64 => Ok(DataType::Primitive(PrimitiveType::Long)), // undocumented type - ArrowDataType::UInt32 => Ok(DataType::Primitive(PrimitiveType::Integer)), - ArrowDataType::UInt16 => Ok(DataType::Primitive(PrimitiveType::Short)), - ArrowDataType::UInt8 => Ok(DataType::Primitive(PrimitiveType::Boolean)), - ArrowDataType::Float32 => Ok(DataType::Primitive(PrimitiveType::Float)), - ArrowDataType::Float64 => Ok(DataType::Primitive(PrimitiveType::Double)), - ArrowDataType::Boolean => Ok(DataType::Primitive(PrimitiveType::Boolean)), - ArrowDataType::Binary => Ok(DataType::Primitive(PrimitiveType::Binary)), - ArrowDataType::FixedSizeBinary(_) => Ok(DataType::Primitive(PrimitiveType::Binary)), - ArrowDataType::LargeBinary => Ok(DataType::Primitive(PrimitiveType::Binary)), - 
ArrowDataType::Decimal128(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal( - *p as i32, *s as i32, - ))), - ArrowDataType::Decimal256(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal( - *p as i32, *s as i32, - ))), - ArrowDataType::Date32 => Ok(DataType::Primitive(PrimitiveType::Date)), - ArrowDataType::Date64 => Ok(DataType::Primitive(PrimitiveType::Date)), - ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => { - Ok(DataType::Primitive(PrimitiveType::Timestamp)) - } - ArrowDataType::Timestamp(TimeUnit::Microsecond, Some(tz)) - if tz.eq_ignore_ascii_case("utc") => - { - Ok(DataType::Primitive(PrimitiveType::Timestamp)) - } - ArrowDataType::Struct(fields) => { - let converted_fields: Result, _> = fields - .iter() - .map(|field| field.as_ref().try_into()) - .collect(); - Ok(DataType::Struct(Box::new(StructType::new( - converted_fields?, - )))) - } - ArrowDataType::List(field) => Ok(DataType::Array(Box::new(ArrayType::new( - (*field).data_type().try_into()?, - (*field).is_nullable(), - )))), - ArrowDataType::LargeList(field) => Ok(DataType::Array(Box::new(ArrayType::new( - (*field).data_type().try_into()?, - (*field).is_nullable(), - )))), - ArrowDataType::FixedSizeList(field, _) => Ok(DataType::Array(Box::new( - ArrayType::new((*field).data_type().try_into()?, (*field).is_nullable()), - ))), - ArrowDataType::Map(field, _) => { - if let ArrowDataType::Struct(struct_fields) = field.data_type() { - let key_type = struct_fields[0].data_type().try_into()?; - let value_type = struct_fields[1].data_type().try_into()?; - let value_type_nullable = struct_fields[1].is_nullable(); - Ok(DataType::Map(Box::new(MapType::new( - key_type, - value_type, - value_type_nullable, - )))) - } else { - panic!("DataType::Map should contain a struct field child"); - } - } - s => Err(ArrowError::SchemaError(format!( - "Invalid data type for Delta Lake: {s}" - ))), - } - } -} - -macro_rules! arrow_map { - ($fieldname: ident, null) => { - ArrowField::new( - stringify!($fieldname), - ArrowDataType::Map( - Arc::new(ArrowField::new( - "entries", - ArrowDataType::Struct( - vec![ - ArrowField::new("key", ArrowDataType::Utf8, false), - ArrowField::new("value", ArrowDataType::Utf8, true), - ] - .into(), - ), - false, - )), - false, - ), - true, - ) - }; - ($fieldname: ident, not_null) => { - ArrowField::new( - stringify!($fieldname), - ArrowDataType::Map( - Arc::new(ArrowField::new( - "entries", - ArrowDataType::Struct( - vec![ - ArrowField::new("key", ArrowDataType::Utf8, false), - ArrowField::new("value", ArrowDataType::Utf8, false), - ] - .into(), - ), - false, - )), - false, - ), - false, - ) - }; -} - -macro_rules! arrow_field { - ($fieldname:ident, $type_qual:ident, null) => { - ArrowField::new(stringify!($fieldname), ArrowDataType::$type_qual, true) - }; - ($fieldname:ident, $type_qual:ident, not_null) => { - ArrowField::new(stringify!($fieldname), ArrowDataType::$type_qual, false) - }; -} - -macro_rules! arrow_list { - ($fieldname:ident, $element_name:ident, $type_qual:ident, null) => { - ArrowField::new( - stringify!($fieldname), - ArrowDataType::List(Arc::new(ArrowField::new( - stringify!($element_name), - ArrowDataType::$type_qual, - true, - ))), - true, - ) - }; - ($fieldname:ident, $element_name:ident, $type_qual:ident, not_null) => { - ArrowField::new( - stringify!($fieldname), - ArrowDataType::List(Arc::new(ArrowField::new( - stringify!($element_name), - ArrowDataType::$type_qual, - true, - ))), - false, - ) - }; -} - -macro_rules! 
arrow_struct { - ($fieldname:ident, [$($inner:tt)+], null) => { - ArrowField::new( - stringify!($fieldname), - ArrowDataType::Struct( - arrow_defs! [$($inner)+].into() - ), - true - ) - }; - ($fieldname:ident, [$($inner:tt)+], not_null) => { - ArrowField::new( - stringify!($fieldname), - ArrowDataType::Struct( - arrow_defs! [$($inner)+].into() - ), - false - ) - } -} - -macro_rules! arrow_def { - ($fieldname:ident $(null)?) => { - arrow_map!($fieldname, null) - }; - ($fieldname:ident not_null) => { - arrow_map!($fieldname, not_null) - }; - ($fieldname:ident[$inner_name:ident]{$type_qual:ident} $(null)?) => { - arrow_list!($fieldname, $inner_name, $type_qual, null) - }; - ($fieldname:ident[$inner_name:ident]{$type_qual:ident} not_null) => { - arrow_list!($fieldname, $inner_name, $type_qual, not_null) - }; - ($fieldname:ident:$type_qual:ident $(null)?) => { - arrow_field!($fieldname, $type_qual, null) - }; - ($fieldname:ident:$type_qual:ident not_null) => { - arrow_field!($fieldname, $type_qual, not_null) - }; - ($fieldname:ident[$($inner:tt)+] $(null)?) => { - arrow_struct!($fieldname, [$($inner)+], null) - }; - ($fieldname:ident[$($inner:tt)+] not_null) => { - arrow_struct!($fieldname, [$($inner)+], not_null) - } -} - -/// A helper macro to create more readable Arrow field definitions, delimited by commas -/// -/// The argument patterns are as follows: -/// -/// fieldname (null|not_null)? -- An arrow field of type map with name "fieldname" consisting of Utf8 key-value pairs, and an -/// optional nullability qualifier (null if not specified). -/// -/// fieldname:type (null|not_null)? -- An Arrow field consisting of an atomic type. For example, -/// id:Utf8 gets mapped to ArrowField::new("id", ArrowDataType::Utf8, true). -/// where customerCount:Int64 not_null gets mapped to gets mapped to -/// ArrowField::new("customerCount", ArrowDataType::Utf8, true) -/// -/// fieldname[list_element]{list_element_type} (null|not_null)? -- An Arrow list, with the name of the elements wrapped in square brackets -/// and the type of the list elements wrapped in curly brackets. For example, -/// customers[name]{Utf8} is an nullable arrow field of type arrow list consisting -/// of elements called "name" with type Utf8. -/// -/// fieldname[element1, element2, element3, ....] (null|not_null)? -- An arrow struct with name "fieldname" consisting of elements adhering to any of the patterns -/// documented, including additional structs arbitrarily nested up to the recursion -/// limit for Rust macros. -macro_rules! arrow_defs { - () => { - vec![] as Vec - }; - ($($fieldname:ident$(:$type_qual:ident)?$([$($inner:tt)+])?$({$list_type_qual:ident})? $($nullable:ident)?),+) => { - vec![ - $(arrow_def!($fieldname$(:$type_qual)?$([$($inner)+])?$({$list_type_qual})? $($nullable)?)),+ - ] - } -} - -/// Returns an arrow schema representing the delta log for use in checkpoints -/// -/// # Arguments -/// -/// * `table_schema` - The arrow schema representing the table backed by the delta log -/// * `partition_columns` - The list of partition columns of the table. -/// * `use_extended_remove_schema` - Whether to include extended file metadata in remove action schema. -/// Required for compatibility with different versions of Databricks runtime. -pub(crate) fn delta_log_schema_for_table( - table_schema: ArrowSchema, - partition_columns: &[String], - use_extended_remove_schema: bool, -) -> ArrowSchemaRef { - lazy_static! 
{ - static ref SCHEMA_FIELDS: Vec = arrow_defs![ - metaData[ - id:Utf8, - name:Utf8, - description:Utf8, - schemaString:Utf8, - createdTime:Int64, - partitionColumns[element]{Utf8}, - configuration, - format[provider:Utf8, options] - ], - protocol[ - minReaderVersion:Int32, - minWriterVersion:Int32 - ], - txn[ - appId:Utf8, - version:Int64 - ] - ]; - static ref ADD_FIELDS: Vec = arrow_defs![ - path:Utf8, - size:Int64, - modificationTime:Int64, - dataChange:Boolean, - stats:Utf8, - partitionValues, - tags, - deletionVector[ - storageType:Utf8 not_null, - pathOrInlineDv:Utf8 not_null, - offset:Int32 null, - sizeInBytes:Int32 not_null, - cardinality:Int64 not_null - ] - ]; - static ref REMOVE_FIELDS: Vec = arrow_defs![ - path: Utf8, - deletionTimestamp: Int64, - dataChange: Boolean, - extendedFileMetadata: Boolean - ]; - static ref REMOVE_EXTENDED_FILE_METADATA_FIELDS: Vec = - arrow_defs![size: Int64, partitionValues, tags]; - }; - - // create add fields according to the specific data table schema - let (partition_fields, non_partition_fields): (Vec, Vec) = - table_schema - .fields() - .iter() - .map(|field| field.to_owned()) - .partition(|field| partition_columns.contains(field.name())); - - let mut stats_parsed_fields: Vec = - vec![ArrowField::new("numRecords", ArrowDataType::Int64, true)]; - if !non_partition_fields.is_empty() { - let mut max_min_vec = Vec::new(); - non_partition_fields - .iter() - .for_each(|f| max_min_schema_for_fields(&mut max_min_vec, f)); - - stats_parsed_fields.extend(["minValues", "maxValues"].into_iter().map(|name| { - ArrowField::new( - name, - ArrowDataType::Struct(max_min_vec.clone().into()), - true, - ) - })); - - let mut null_count_vec = Vec::new(); - non_partition_fields - .iter() - .for_each(|f| null_count_schema_for_fields(&mut null_count_vec, f)); - let null_count_struct = ArrowField::new( - "nullCount", - ArrowDataType::Struct(null_count_vec.into()), - true, - ); - - stats_parsed_fields.push(null_count_struct); - } - let mut add_fields = ADD_FIELDS.clone(); - add_fields.push(ArrowField::new( - "stats_parsed", - ArrowDataType::Struct(stats_parsed_fields.into()), - true, - )); - if !partition_fields.is_empty() { - add_fields.push(ArrowField::new( - "partitionValues_parsed", - ArrowDataType::Struct(partition_fields.into()), - true, - )); - } - - // create remove fields with or without extendedFileMetadata - let mut remove_fields = REMOVE_FIELDS.clone(); - if use_extended_remove_schema { - remove_fields.extend(REMOVE_EXTENDED_FILE_METADATA_FIELDS.clone()); - } - - // include add and remove fields in checkpoint schema - let mut schema_fields = SCHEMA_FIELDS.clone(); - schema_fields.push(ArrowField::new( - "add", - ArrowDataType::Struct(add_fields.into()), - true, - )); - schema_fields.push(ArrowField::new( - "remove", - ArrowDataType::Struct(remove_fields.into()), - true, - )); - - let arrow_schema = ArrowSchema::new(schema_fields); - - std::sync::Arc::new(arrow_schema) -} - -fn max_min_schema_for_fields(dest: &mut Vec, f: &ArrowField) { - match f.data_type() { - ArrowDataType::Struct(struct_fields) => { - let mut child_dest = Vec::new(); - - for f in struct_fields { - max_min_schema_for_fields(&mut child_dest, f); - } - - dest.push(ArrowField::new( - f.name(), - ArrowDataType::Struct(child_dest.into()), - true, - )); - } - // don't compute min or max for list, map or binary types - ArrowDataType::List(_) | ArrowDataType::Map(_, _) | ArrowDataType::Binary => { /* noop */ } - _ => { - let f = f.clone(); - dest.push(f); - } - } -} - -fn 
null_count_schema_for_fields(dest: &mut Vec, f: &ArrowField) { - match f.data_type() { - ArrowDataType::Struct(struct_fields) => { - let mut child_dest = Vec::new(); - - for f in struct_fields { - null_count_schema_for_fields(&mut child_dest, f); - } - - dest.push(ArrowField::new( - f.name(), - ArrowDataType::Struct(child_dest.into()), - true, - )); - } - _ => { - let f = ArrowField::new(f.name(), ArrowDataType::Int64, true); - dest.push(f); - } - } -} - -#[cfg(test)] -mod tests { - use arrow::array::ArrayData; - use arrow_array::Array; - use arrow_array::{make_array, ArrayRef, MapArray, StringArray, StructArray}; - use arrow_buffer::{Buffer, ToByteSlice}; - use arrow_schema::Field; - - use super::*; - use std::collections::HashMap; - use std::sync::Arc; - - #[test] - fn delta_log_schema_for_table_test() { - // NOTE: We should future proof the checkpoint schema in case action schema changes. - // See https://github.com/delta-io/delta-rs/issues/287 - - let table_schema = ArrowSchema::new(vec![ - ArrowField::new("pcol", ArrowDataType::Int32, true), - ArrowField::new("col1", ArrowDataType::Int32, true), - ]); - let partition_columns = vec!["pcol".to_string()]; - let log_schema = - delta_log_schema_for_table(table_schema.clone(), partition_columns.as_slice(), false); - - // verify top-level schema contains all expected fields and they are named correctly. - let expected_fields = ["metaData", "protocol", "txn", "remove", "add"]; - for f in log_schema.fields().iter() { - assert!(expected_fields.contains(&f.name().as_str())); - } - assert_eq!(5, log_schema.fields().len()); - - // verify add fields match as expected. a lot of transformation goes into these. - let add_fields: Vec<_> = log_schema - .fields() - .iter() - .filter(|f| f.name() == "add") - .flat_map(|f| { - if let ArrowDataType::Struct(fields) = f.data_type() { - fields.iter().cloned() - } else { - unreachable!(); - } - }) - .collect(); - let field_names: Vec<&String> = add_fields.iter().map(|v| v.name()).collect(); - assert_eq!( - vec![ - "path", - "size", - "modificationTime", - "dataChange", - "stats", - "partitionValues", - "tags", - "deletionVector", - "stats_parsed", - "partitionValues_parsed" - ], - field_names - ); - let add_field_map: HashMap<_, _> = add_fields - .iter() - .map(|f| (f.name().to_owned(), f.clone())) - .collect(); - let partition_values_parsed = add_field_map.get("partitionValues_parsed").unwrap(); - if let ArrowDataType::Struct(fields) = partition_values_parsed.data_type() { - assert_eq!(1, fields.len()); - let field = fields.get(0).unwrap().to_owned(); - assert_eq!( - Arc::new(ArrowField::new("pcol", ArrowDataType::Int32, true)), - field - ); - } else { - unreachable!(); - } - let stats_parsed = add_field_map.get("stats_parsed").unwrap(); - if let ArrowDataType::Struct(fields) = stats_parsed.data_type() { - assert_eq!(4, fields.len()); - - let field_map: HashMap<_, _> = fields - .iter() - .map(|f| (f.name().to_owned(), f.clone())) - .collect(); - - for (k, v) in field_map.iter() { - match k.as_ref() { - "minValues" | "maxValues" | "nullCount" => match v.data_type() { - ArrowDataType::Struct(fields) => { - assert_eq!(1, fields.len()); - let field = fields.get(0).unwrap().to_owned(); - let data_type = if k == "nullCount" { - ArrowDataType::Int64 - } else { - ArrowDataType::Int32 - }; - assert_eq!(Arc::new(ArrowField::new("col1", data_type, true)), field); - } - _ => unreachable!(), - }, - "numRecords" => {} - _ => panic!(), - } - } - } else { - unreachable!(); - } - - // verify extended remove schema fields **ARE 
NOT** included when `use_extended_remove_schema` is false. - let num_remove_fields = log_schema - .fields() - .iter() - .filter(|f| f.name() == "remove") - .flat_map(|f| { - if let ArrowDataType::Struct(fields) = f.data_type() { - fields.iter().cloned() - } else { - unreachable!(); - } - }) - .count(); - assert_eq!(4, num_remove_fields); - - // verify extended remove schema fields **ARE** included when `use_extended_remove_schema` is true. - let log_schema = - delta_log_schema_for_table(table_schema, partition_columns.as_slice(), true); - let remove_fields: Vec<_> = log_schema - .fields() - .iter() - .filter(|f| f.name() == "remove") - .flat_map(|f| { - if let ArrowDataType::Struct(fields) = f.data_type() { - fields.iter().cloned() - } else { - unreachable!(); - } - }) - .collect(); - assert_eq!(7, remove_fields.len()); - let expected_fields = [ - "path", - "deletionTimestamp", - "dataChange", - "extendedFileMetadata", - "partitionValues", - "size", - "tags", - ]; - for f in remove_fields.iter() { - assert!(expected_fields.contains(&f.name().as_str())); - } - } - - #[test] - fn test_arrow_from_delta_decimal_type() { - let precision = 20; - let scale = 2; - let decimal_field = DataType::Primitive(PrimitiveType::Decimal(precision, scale)); - assert_eq!( - >::try_from(&decimal_field).unwrap(), - ArrowDataType::Decimal128(precision as u8, scale as i8) - ); - } - - #[test] - fn test_arrow_from_delta_timestamp_type() { - let timestamp_field = DataType::Primitive(PrimitiveType::Timestamp); - assert_eq!( - >::try_from(×tamp_field).unwrap(), - ArrowDataType::Timestamp(TimeUnit::Microsecond, None) - ); - } - - #[test] - fn test_delta_from_arrow_timestamp_type() { - let timestamp_field = ArrowDataType::Timestamp(TimeUnit::Microsecond, None); - assert_eq!( - >::try_from(×tamp_field).unwrap(), - DataType::Primitive(PrimitiveType::Timestamp) - ); - } - - #[test] - fn test_delta_from_arrow_timestamp_type_with_tz() { - let timestamp_field = - ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string().into())); - assert_eq!( - >::try_from(×tamp_field).unwrap(), - DataType::Primitive(PrimitiveType::Timestamp) - ); - } - - #[test] - fn test_delta_from_arrow_map_type() { - let arrow_map = ArrowDataType::Map( - Arc::new(ArrowField::new( - "entries", - ArrowDataType::Struct( - vec![ - ArrowField::new("key", ArrowDataType::Int8, false), - ArrowField::new("value", ArrowDataType::Binary, true), - ] - .into(), - ), - false, - )), - false, - ); - let converted_map: DataType = (&arrow_map).try_into().unwrap(); - - assert_eq!( - converted_map, - DataType::Map(Box::new(MapType::new( - DataType::Primitive(PrimitiveType::Byte), - DataType::Primitive(PrimitiveType::Binary), - true, - ))) - ); - } - - #[test] - fn test_record_batch_from_map_type() { - let keys = vec!["0", "1", "5", "6", "7"]; - let values: Vec<&[u8]> = vec![ - b"test_val_1", - b"test_val_2", - b"long_test_val_3", - b"4", - b"test_val_5", - ]; - let entry_offsets = vec![0u32, 1, 1, 4, 5, 5]; - let num_rows = keys.len(); - - // Copied the function `new_from_string` with the patched code from https://github.com/apache/arrow-rs/pull/4808 - // This should be reverted back [`MapArray::new_from_strings`] once arrow is upgraded in this project. 
- fn new_from_strings<'a>( - keys: impl Iterator, - values: &dyn Array, - entry_offsets: &[u32], - ) -> Result { - let entry_offsets_buffer = Buffer::from(entry_offsets.to_byte_slice()); - let keys_data = StringArray::from_iter_values(keys); - - let keys_field = Arc::new(Field::new("keys", ArrowDataType::Utf8, false)); - let values_field = Arc::new(Field::new( - "values", - values.data_type().clone(), - values.null_count() > 0, - )); - - let entry_struct = StructArray::from(vec![ - (keys_field, Arc::new(keys_data) as ArrayRef), - (values_field, make_array(values.to_data())), - ]); - - let map_data_type = ArrowDataType::Map( - Arc::new(Field::new( - "entries", - entry_struct.data_type().clone(), - false, - )), - false, - ); - - let map_data = ArrayData::builder(map_data_type) - .len(entry_offsets.len() - 1) - .add_buffer(entry_offsets_buffer) - .add_child_data(entry_struct.into_data()) - .build()?; - - Ok(MapArray::from(map_data)) - } - - let map_array = new_from_strings( - keys.into_iter(), - &arrow::array::BinaryArray::from(values), - entry_offsets.as_slice(), - ) - .expect("Could not create a map array"); - - let schema = - >::try_from(&StructType::new(vec![ - StructField::new( - "example".to_string(), - DataType::Map(Box::new(MapType::new( - DataType::Primitive(PrimitiveType::String), - DataType::Primitive(PrimitiveType::Binary), - false, - ))), - false, - ), - ])) - .expect("Could not get schema"); - - let record_batch = - arrow::record_batch::RecordBatch::try_new(Arc::new(schema), vec![Arc::new(map_array)]) - .expect("Failed to create RecordBatch"); - - assert_eq!(record_batch.num_columns(), 1); - assert_eq!(record_batch.num_rows(), num_rows); - } - - #[test] - fn test_max_min_schema_for_fields() { - let mut max_min_vec: Vec = Vec::new(); - let fields = [ - ArrowField::new("simple", ArrowDataType::Int32, true), - ArrowField::new( - "struct", - ArrowDataType::Struct( - vec![ArrowField::new("simple", ArrowDataType::Int32, true)].into(), - ), - true, - ), - ArrowField::new( - "list", - ArrowDataType::List(Arc::new(ArrowField::new( - "simple", - ArrowDataType::Int32, - true, - ))), - true, - ), - ArrowField::new( - "map", - ArrowDataType::Map( - Arc::new(ArrowField::new( - "struct", - ArrowDataType::Struct( - vec![ - ArrowField::new("key", ArrowDataType::Int32, true), - ArrowField::new("value", ArrowDataType::Int32, true), - ] - .into(), - ), - true, - )), - true, - ), - true, - ), - ArrowField::new("binary", ArrowDataType::Binary, true), - ]; - - let expected = vec![fields[0].clone(), fields[1].clone()]; - - fields - .iter() - .for_each(|f| max_min_schema_for_fields(&mut max_min_vec, f)); - - assert_eq!(max_min_vec, expected); - } - - #[test] - fn test_null_count_schema_for_fields() { - let mut null_count_vec: Vec = Vec::new(); - let fields = [ - ArrowField::new("int32", ArrowDataType::Int32, true), - ArrowField::new("int64", ArrowDataType::Int64, true), - ArrowField::new("Utf8", ArrowDataType::Utf8, true), - ArrowField::new( - "list", - ArrowDataType::List(Arc::new(ArrowField::new( - "simple", - ArrowDataType::Int32, - true, - ))), - true, - ), - ArrowField::new( - "map", - ArrowDataType::Map( - Arc::new(ArrowField::new( - "struct", - ArrowDataType::Struct( - vec![ - ArrowField::new("key", ArrowDataType::Int32, true), - ArrowField::new("value", ArrowDataType::Int32, true), - ] - .into(), - ), - true, - )), - true, - ), - true, - ), - ArrowField::new( - "struct", - ArrowDataType::Struct( - vec![ArrowField::new("int32", ArrowDataType::Int32, true)].into(), - ), - true, - ), - ]; - let 
-        let expected = vec![
-            ArrowField::new(fields[0].name(), ArrowDataType::Int64, true),
-            ArrowField::new(fields[1].name(), ArrowDataType::Int64, true),
-            ArrowField::new(fields[2].name(), ArrowDataType::Int64, true),
-            ArrowField::new(fields[3].name(), ArrowDataType::Int64, true),
-            ArrowField::new(fields[4].name(), ArrowDataType::Int64, true),
-            ArrowField::new(
-                fields[5].name(),
-                ArrowDataType::Struct(
-                    vec![ArrowField::new("int32", ArrowDataType::Int64, true)].into(),
-                ),
-                true,
-            ),
-        ];
-        fields
-            .iter()
-            .for_each(|f| null_count_schema_for_fields(&mut null_count_vec, f));
-        assert_eq!(null_count_vec, expected);
-    }
-
-    /*
-     * This test validates the trait implementation of
-     * TryFrom<&Arc<ArrowField>> for schema::SchemaField, which is required with Arrow 37 since
-     * iterators on Fields will give an &Arc<ArrowField>
-     */
-    #[test]
-    fn tryfrom_arrowfieldref_with_structs() {
-        let field = Arc::new(ArrowField::new(
-            "test_struct",
-            ArrowDataType::Struct(
-                vec![
-                    ArrowField::new("key", ArrowDataType::Int32, true),
-                    ArrowField::new("value", ArrowDataType::Int32, true),
-                ]
-                .into(),
-            ),
-            true,
-        ));
-        let _converted: StructField = field.as_ref().try_into().unwrap();
-    }
-}
diff --git a/crates/deltalake-core/src/table/config.rs b/crates/deltalake-core/src/table/config.rs
index 3fa021ce6e..5b82b401b6 100644
--- a/crates/deltalake-core/src/table/config.rs
+++ b/crates/deltalake-core/src/table/config.rs
@@ -11,6 +11,7 @@ use crate::errors::DeltaTableError;
 ///
 ///
 #[derive(PartialEq, Eq, Hash)]
+#[non_exhaustive]
 pub enum DeltaConfigKey {
     /// true for this Delta table to be append-only. If append-only,
     /// existing records cannot be deleted, and existing values cannot be updated.
@@ -100,6 +101,9 @@ pub enum DeltaConfigKey {
     /// The target file size in bytes or higher units for file tuning. For example, 104857600 (bytes) or 100mb.
     TuneFileSizesForRewrites,
+
+    /// 'classic' for classic Delta Lake checkpoints. 'v2' for v2 checkpoints.
+    CheckpointPolicy,
 }
 
 impl AsRef<str> for DeltaConfigKey {
@@ -111,6 +115,7 @@ impl AsRef<str> for DeltaConfigKey {
     fn as_ref(&self) -> &str {
         match self {
             Self::AutoOptimizeOptimizeWrite => "delta.autoOptimize.optimizeWrite",
             Self::CheckpointWriteStatsAsJson => "delta.checkpoint.writeStatsAsJson",
             Self::CheckpointWriteStatsAsStruct => "delta.checkpoint.writeStatsAsStruct",
+            Self::CheckpointPolicy => "delta.checkpointPolicy",
             Self::ColumnMappingMode => "delta.columnMapping.mode",
             Self::DataSkippingNumIndexedCols => "delta.dataSkippingNumIndexedCols",
             Self::DeletedFileRetentionDuration => "delta.deletedFileRetentionDuration",
@@ -140,6 +145,7 @@ impl FromStr for DeltaConfigKey {
             "delta.autoOptimize.optimizeWrite" => Ok(Self::AutoOptimizeOptimizeWrite),
             "delta.checkpoint.writeStatsAsJson" => Ok(Self::CheckpointWriteStatsAsJson),
             "delta.checkpoint.writeStatsAsStruct" => Ok(Self::CheckpointWriteStatsAsStruct),
+            "delta.checkpointPolicy" => Ok(Self::CheckpointPolicy),
             "delta.columnMapping.mode" => Ok(Self::ColumnMappingMode),
             "delta.dataSkippingNumIndexedCols" => Ok(Self::DataSkippingNumIndexedCols),
             "delta.deletedFileRetentionDuration" | "deletedFileRetentionDuration" => {
@@ -280,6 +286,14 @@ impl<'a> TableConfig<'a> {
             .and_then(|o| o.as_ref().and_then(|v| v.parse().ok()))
             .unwrap_or_default()
     }
+
+    /// Policy applied during checkpoint creation
+    pub fn checkpoint_policy(&self) -> CheckpointPolicy {
+        self.0
+            .get(DeltaConfigKey::CheckpointPolicy.as_ref())
+            .and_then(|o| o.as_ref().and_then(|v| v.parse().ok()))
+            .unwrap_or_default()
+    }
 }
 
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
@@ -338,6 +352,48 @@ impl FromStr for IsolationLevel {
     }
 }
 
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
+/// The checkpoint policy applied when writing checkpoints
+#[serde(rename_all = "camelCase")]
+pub enum CheckpointPolicy {
+    /// classic Delta Lake checkpoints
+    Classic,
+    /// v2 checkpoints
+    V2,
+    /// unknown checkpoint policy
+    Other(String),
+}
+
+impl Default for CheckpointPolicy {
+    fn default() -> Self {
+        Self::Classic
+    }
+}
+
+impl AsRef<str> for CheckpointPolicy {
+    fn as_ref(&self) -> &str {
+        match self {
+            Self::Classic => "classic",
+            Self::V2 => "v2",
+            Self::Other(s) => s,
+        }
+    }
+}
+
+impl FromStr for CheckpointPolicy {
+    type Err = DeltaTableError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_ascii_lowercase().as_str() {
+            "classic" => Ok(Self::Classic),
+            "v2" => Ok(Self::V2),
+            _ => Err(DeltaTableError::Generic(
+                "Invalid string for CheckpointPolicy".into(),
+            )),
+        }
+    }
+}
+
 const SECONDS_PER_MINUTE: u64 = 60;
 const SECONDS_PER_HOUR: u64 = 60 * SECONDS_PER_MINUTE;
 const SECONDS_PER_DAY: u64 = 24 * SECONDS_PER_HOUR;
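
For reviewers, a minimal usage sketch of the `CheckpointPolicy` round-trip introduced in this patch. It assumes the enum is reachable at `deltalake_core::table::config::CheckpointPolicy`; the `main` function and the literal inputs are illustrative only, not part of the diff:

use std::str::FromStr;

// Assumed import path; adjust to wherever the crate exports CheckpointPolicy.
use deltalake_core::table::config::CheckpointPolicy;

fn main() {
    // Parsing is case-insensitive, mirroring the `to_ascii_lowercase` in `from_str`.
    let policy = CheckpointPolicy::from_str("V2").expect("'v2' is a known policy");
    let name: &str = policy.as_ref();
    assert_eq!(name, "v2");

    // An unset `delta.checkpointPolicy` key falls back to `Classic`, which is what
    // `TableConfig::checkpoint_policy` relies on via `unwrap_or_default()`.
    let fallback: &str = CheckpointPolicy::default().as_ref();
    assert_eq!(fallback, "classic");

    // Unknown strings are rejected by `from_str`; note that `from_str` never
    // produces the `Other` variant.
    assert!(CheckpointPolicy::from_str("v3").is_err());
}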