diff --git a/crates/deltalake-core/src/errors.rs b/crates/deltalake-core/src/errors.rs index bd088e9a4f..aaa21a4801 100644 --- a/crates/deltalake-core/src/errors.rs +++ b/crates/deltalake-core/src/errors.rs @@ -211,6 +211,9 @@ pub enum DeltaTableError { #[from] source: crate::kernel::Error, }, + + #[error("Table metadata is invalid: {0}")] + MetadataError(String), } impl From for DeltaTableError { diff --git a/crates/deltalake-core/src/kernel/error.rs b/crates/deltalake-core/src/kernel/error.rs index a37dbdae67..d4110f8f53 100644 --- a/crates/deltalake-core/src/kernel/error.rs +++ b/crates/deltalake-core/src/kernel/error.rs @@ -64,6 +64,9 @@ pub enum Error { /// Invariant expression. line: String, }, + + #[error("Table metadata is invalid: {0}")] + MetadataError(String), } #[cfg(feature = "object_store")] diff --git a/crates/deltalake-core/src/kernel/schema.rs b/crates/deltalake-core/src/kernel/schema.rs index 7694501dca..bc83c05070 100644 --- a/crates/deltalake-core/src/kernel/schema.rs +++ b/crates/deltalake-core/src/kernel/schema.rs @@ -166,6 +166,20 @@ impl StructField { self.nullable } + /// Returns the physical name of the column + /// Equals the name if column mapping is not enabled on table + pub fn physical_name(&self) -> Result<&str, Error> { + // Even on mapping type id the physical name should be there for partitions + let phys_name = self.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName); + match phys_name { + None => Ok(&self.name), + Some(MetadataValue::String(s)) => Ok(s), + Some(MetadataValue::Number(_)) => Err(Error::MetadataError( + "Unexpected type for physical name".to_string(), + )), + } + } + #[inline] /// Returns the data type of the column pub const fn data_type(&self) -> &DataType { diff --git a/crates/deltalake-core/src/protocol/mod.rs b/crates/deltalake-core/src/protocol/mod.rs index e2add9b529..264a45eb8b 100644 --- a/crates/deltalake-core/src/protocol/mod.rs +++ b/crates/deltalake-core/src/protocol/mod.rs @@ -1100,6 +1100,80 @@ mod tests { assert_eq!(expected, actions); } + #[tokio::test] + async fn test_with_column_mapping() { + // test table with column mapping and partitions + let path = "./tests/data/table_with_column_mapping"; + let table = crate::open_table(path).await.unwrap(); + let actions = table.get_state().add_actions_table(true).unwrap(); + let expected_columns: Vec<(&str, ArrayRef)> = vec![ + ( + "path", + Arc::new(array::StringArray::from(vec![ + "BH/part-00000-4d6e745c-8e04-48d9-aa60-438228358f1a.c000.zstd.parquet", + "8v/part-00001-69b4a452-aeac-4ffa-bf5c-a0c2833d05eb.c000.zstd.parquet", + ])), + ), + ( + "size_bytes", + Arc::new(array::Int64Array::from(vec![890, 810])), + ), + ( + "modification_time", + Arc::new(arrow::array::TimestampMillisecondArray::from(vec![ + 1699946088000, + 1699946088000, + ])), + ), + ( + "data_change", + Arc::new(array::BooleanArray::from(vec![true, true])), + ), + ( + "partition.Company Very Short", + Arc::new(array::StringArray::from(vec!["BMS", "BME"])), + ), + ("num_records", Arc::new(array::Int64Array::from(vec![4, 1]))), + ( + "null_count.Company Very Short", + Arc::new(array::NullArray::new(2)), + ), + ("min.Company Very Short", Arc::new(array::NullArray::new(2))), + ("max.Company Very Short", Arc::new(array::NullArray::new(2))), + ("null_count.Super Name", Arc::new(array::NullArray::new(2))), + ("min.Super Name", Arc::new(array::NullArray::new(2))), + ("max.Super Name", Arc::new(array::NullArray::new(2))), + ( + "tags.INSERTION_TIME", + Arc::new(array::StringArray::from(vec![ + "1699946088000000", + "1699946088000001", + ])), + ), + ( + "tags.MAX_INSERTION_TIME", + Arc::new(array::StringArray::from(vec![ + "1699946088000000", + "1699946088000001", + ])), + ), + ( + "tags.MIN_INSERTION_TIME", + Arc::new(array::StringArray::from(vec![ + "1699946088000000", + "1699946088000001", + ])), + ), + ( + "tags.OPTIMIZE_TARGET_SIZE", + Arc::new(array::StringArray::from(vec!["33554432", "33554432"])), + ), + ]; + let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap(); + + assert_eq!(expected, actions); + } + #[tokio::test] async fn test_with_stats() { // test table with stats diff --git a/crates/deltalake-core/src/table/config.rs b/crates/deltalake-core/src/table/config.rs index 5b82b401b6..79130de028 100644 --- a/crates/deltalake-core/src/table/config.rs +++ b/crates/deltalake-core/src/table/config.rs @@ -294,6 +294,14 @@ impl<'a> TableConfig<'a> { .and_then(|o| o.as_ref().and_then(|v| v.parse().ok())) .unwrap_or_default() } + + /// Return the column mapping mode according to delta.columnMapping.mode + pub fn column_mapping_mode(&self) -> ColumnMappingMode { + self.0 + .get(DeltaConfigKey::ColumnMappingMode.as_ref()) + .and_then(|o| o.as_ref().and_then(|v| v.parse().ok())) + .unwrap_or_default() + } } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] @@ -394,6 +402,49 @@ impl FromStr for CheckpointPolicy { } } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +/// The Column Mapping modes used for reading and writing data +#[serde(rename_all = "camelCase")] +pub enum ColumnMappingMode { + /// No column mapping is applied + None, + /// Columns are mapped by their field_id in parquet + Id, + /// Columns are mapped to a physical name + Name, +} + +impl Default for ColumnMappingMode { + fn default() -> Self { + Self::None + } +} + +impl AsRef for ColumnMappingMode { + fn as_ref(&self) -> &str { + match self { + Self::None => "none", + Self::Id => "id", + Self::Name => "name", + } + } +} + +impl FromStr for ColumnMappingMode { + type Err = DeltaTableError; + + fn from_str(s: &str) -> Result { + match s.to_ascii_lowercase().as_str() { + "none" => Ok(Self::None), + "id" => Ok(Self::Id), + "name" => Ok(Self::Name), + _ => Err(DeltaTableError::Generic( + "Invalid string for ColumnMappingMode".into(), + )), + } + } +} + const SECONDS_PER_MINUTE: u64 = 60; const SECONDS_PER_HOUR: u64 = 60 * SECONDS_PER_MINUTE; const SECONDS_PER_DAY: u64 = 24 * SECONDS_PER_HOUR; diff --git a/crates/deltalake-core/src/table/state_arrow.rs b/crates/deltalake-core/src/table/state_arrow.rs index 9d82c87326..3dbb460879 100644 --- a/crates/deltalake-core/src/table/state_arrow.rs +++ b/crates/deltalake-core/src/table/state_arrow.rs @@ -16,6 +16,7 @@ use arrow_array::{ use arrow_schema::{DataType, Field, Fields, TimeUnit}; use itertools::Itertools; +use super::config::ColumnMappingMode; use super::state::DeltaTableState; use crate::errors::DeltaTableError; use crate::kernel::{DataType as DeltaDataType, StructType}; @@ -145,7 +146,7 @@ impl DeltaTableState { flatten: bool, ) -> Result { let metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?; - + let column_mapping_mode = self.table_config().column_mapping_mode(); let partition_column_types: Vec = metadata .partition_columns .iter() @@ -167,13 +168,44 @@ impl DeltaTableState { }) .collect::>(); + let physical_name_to_logical_name = match column_mapping_mode { + ColumnMappingMode::None => HashMap::with_capacity(0), // No column mapping, no need for this HashMap + ColumnMappingMode::Id | ColumnMappingMode::Name => metadata + .partition_columns + .iter() + .map(|name| -> Result<_, DeltaTableError> { + let physical_name = metadata + .schema + .field_with_name(name) + .or(Err(DeltaTableError::MetadataError(format!( + "Invalid partition column {0}", + name + ))))? + .physical_name() + .map_err(|e| DeltaTableError::Kernel { source: e })?; + Ok((physical_name, name.as_str())) + }) + .collect::, DeltaTableError>>()?, + }; // Append values for action in self.files() { for (name, maybe_value) in action.partition_values.iter() { + let logical_name = match column_mapping_mode { + ColumnMappingMode::None => name.as_str(), + ColumnMappingMode::Id | ColumnMappingMode::Name => { + physical_name_to_logical_name.get(name.as_str()).ok_or( + DeltaTableError::MetadataError(format!( + "Invalid partition column {0}", + name + )), + )? + } + }; if let Some(value) = maybe_value { - builders.get_mut(name.as_str()).unwrap().append_value(value); + builders.get_mut(logical_name).unwrap().append_value(value); + // Unwrap is safe here since the name exists in the mapping where we check validity already } else { - builders.get_mut(name.as_str()).unwrap().append_null(); + builders.get_mut(logical_name).unwrap().append_null(); } } } diff --git a/crates/deltalake-core/tests/data/table_with_column_mapping/8v/part-00001-69b4a452-aeac-4ffa-bf5c-a0c2833d05eb.c000.zstd.parquet b/crates/deltalake-core/tests/data/table_with_column_mapping/8v/part-00001-69b4a452-aeac-4ffa-bf5c-a0c2833d05eb.c000.zstd.parquet new file mode 100644 index 0000000000..f8aadf03ba Binary files /dev/null and b/crates/deltalake-core/tests/data/table_with_column_mapping/8v/part-00001-69b4a452-aeac-4ffa-bf5c-a0c2833d05eb.c000.zstd.parquet differ diff --git a/crates/deltalake-core/tests/data/table_with_column_mapping/BH/part-00000-4d6e745c-8e04-48d9-aa60-438228358f1a.c000.zstd.parquet b/crates/deltalake-core/tests/data/table_with_column_mapping/BH/part-00000-4d6e745c-8e04-48d9-aa60-438228358f1a.c000.zstd.parquet new file mode 100644 index 0000000000..4fd4d407c9 Binary files /dev/null and b/crates/deltalake-core/tests/data/table_with_column_mapping/BH/part-00000-4d6e745c-8e04-48d9-aa60-438228358f1a.c000.zstd.parquet differ diff --git a/crates/deltalake-core/tests/data/table_with_column_mapping/_delta_log/00000000000000000000.crc b/crates/deltalake-core/tests/data/table_with_column_mapping/_delta_log/00000000000000000000.crc new file mode 100644 index 0000000000..cb4c652bec --- /dev/null +++ b/crates/deltalake-core/tests/data/table_with_column_mapping/_delta_log/00000000000000000000.crc @@ -0,0 +1 @@ +{"txnId":"0e8eece8-347f-4c77-bc4f-daf3a5985dc9","tableSizeBytes":1700,"numFiles":2,"numMetadata":1,"numProtocol":1,"setTransactions":[],"domainMetadata":[],"metadata":{"id":"592de637-dd77-4aaa-af00-97d723a7f1f1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"Company Very Short\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":1,\"delta.columnMapping.physicalName\":\"col-173b4db9-b5ad-427f-9e75-516aae37fbbb\"}},{\"name\":\"Super Name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":2,\"delta.columnMapping.physicalName\":\"col-3877fd94-0973-4941-ac6b-646849a1ff65\"}}]}","partitionColumns":["Company Very Short"],"configuration":{"delta.columnMapping.mode":"name","delta.autoOptimize.optimizeWrite":"true","delta.columnMapping.maxColumnId":"2","delta.targetFileSize":"33554432","delta.tuneFileSizesForRewrites":"true"},"createdTime":1699946083038},"protocol":{"minReaderVersion":2,"minWriterVersion":5},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[1700,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"allFiles":[{"path":"8v/part-00001-69b4a452-aeac-4ffa-bf5c-a0c2833d05eb.c000.zstd.parquet","partitionValues":{"col-173b4db9-b5ad-427f-9e75-516aae37fbbb":"BME"},"size":810,"modificationTime":1699946088000,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Timothy Lamb\"},\"maxValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Timothy Lamb\"},\"nullCount\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":0}}","tags":{"INSERTION_TIME":"1699946088000001","MIN_INSERTION_TIME":"1699946088000001","MAX_INSERTION_TIME":"1699946088000001","OPTIMIZE_TARGET_SIZE":"33554432"}},{"path":"BH/part-00000-4d6e745c-8e04-48d9-aa60-438228358f1a.c000.zstd.parquet","partitionValues":{"col-173b4db9-b5ad-427f-9e75-516aae37fbbb":"BMS"},"size":890,"modificationTime":1699946088000,"dataChange":false,"stats":"{\"numRecords\":4,\"minValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Anthony Johnson\"},\"maxValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Stephanie Mcgrath\"},\"nullCount\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":0}}","tags":{"INSERTION_TIME":"1699946088000000","MIN_INSERTION_TIME":"1699946088000000","MAX_INSERTION_TIME":"1699946088000000","OPTIMIZE_TARGET_SIZE":"33554432"}}]} diff --git a/crates/deltalake-core/tests/data/table_with_column_mapping/_delta_log/00000000000000000000.json b/crates/deltalake-core/tests/data/table_with_column_mapping/_delta_log/00000000000000000000.json new file mode 100644 index 0000000000..c7f52c72d3 --- /dev/null +++ b/crates/deltalake-core/tests/data/table_with_column_mapping/_delta_log/00000000000000000000.json @@ -0,0 +1,5 @@ +{"commitInfo":{"timestamp":1699946089972,"userId":"2797914831036774","userName":"censoredmail@bmsuisse.ch","operation":"WRITE","operationParameters":{"mode":"Overwrite","statsOnLoad":false,"partitionBy":"[\"Company Very Short\"]"},"notebook":{"notebookId":"3271485675102593"},"clusterId":"0428-070410-lm8e9giw","isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numFiles":"2","numOutputRows":"5","numOutputBytes":"1700"},"tags":{"restoresDeletedRows":"false"},"engineInfo":"Databricks-Runtime/13.3.x-photon-scala2.12","txnId":"0e8eece8-347f-4c77-bc4f-daf3a5985dc9"}} +{"metaData":{"id":"592de637-dd77-4aaa-af00-97d723a7f1f1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"Company Very Short\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":1,\"delta.columnMapping.physicalName\":\"col-173b4db9-b5ad-427f-9e75-516aae37fbbb\"}},{\"name\":\"Super Name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":2,\"delta.columnMapping.physicalName\":\"col-3877fd94-0973-4941-ac6b-646849a1ff65\"}}]}","partitionColumns":["Company Very Short"],"configuration":{"delta.columnMapping.mode":"name","delta.autoOptimize.optimizeWrite":"true","delta.columnMapping.maxColumnId":"2","delta.targetFileSize":"33554432","delta.tuneFileSizesForRewrites":"true"},"createdTime":1699946083038}} +{"protocol":{"minReaderVersion":2,"minWriterVersion":5}} +{"add":{"path":"BH/part-00000-4d6e745c-8e04-48d9-aa60-438228358f1a.c000.zstd.parquet","partitionValues":{"col-173b4db9-b5ad-427f-9e75-516aae37fbbb":"BMS"},"size":890,"modificationTime":1699946088000,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Anthony Johnson\"},\"maxValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Stephanie Mcgrath\"},\"nullCount\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":0}}","tags":{"INSERTION_TIME":"1699946088000000","MIN_INSERTION_TIME":"1699946088000000","MAX_INSERTION_TIME":"1699946088000000","OPTIMIZE_TARGET_SIZE":"33554432"}}} +{"add":{"path":"8v/part-00001-69b4a452-aeac-4ffa-bf5c-a0c2833d05eb.c000.zstd.parquet","partitionValues":{"col-173b4db9-b5ad-427f-9e75-516aae37fbbb":"BME"},"size":810,"modificationTime":1699946088000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Timothy Lamb\"},\"maxValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Timothy Lamb\"},\"nullCount\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":0}}","tags":{"INSERTION_TIME":"1699946088000001","MIN_INSERTION_TIME":"1699946088000001","MAX_INSERTION_TIME":"1699946088000001","OPTIMIZE_TARGET_SIZE":"33554432"}}}