Skip to content

Commit

Permalink
Simplify column mapping mode handling
Browse files Browse the repository at this point in the history
  • Loading branch information
scovich committed Nov 26, 2024
1 parent 6eb2dc4 commit f938c7a
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 68 deletions.
3 changes: 1 addition & 2 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use std::collections::HashMap;
use std::sync::LazyLock;

use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
use crate::expressions::{column_name, ColumnName};
use crate::schema::{ColumnNamesAndTypes, DataType};
use crate::schema::{column_name, ColumnName, ColumnNamesAndTypes, DataType};
use crate::utils::require;
use crate::{DeltaResult, Error};

Expand Down
3 changes: 1 addition & 2 deletions kernel/src/engine/arrow_data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::engine_data::{EngineData, EngineList, EngineMap, GetData, RowVisitor};
use crate::expressions::ColumnName;
use crate::schema::{DataType};
use crate::schema::{ColumnName, DataType};
use crate::{DeltaResult, Error};

use arrow_array::cast::AsArray;
Expand Down
3 changes: 1 addition & 2 deletions kernel/src/engine_data.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Traits that engines need to implement in order to pass data between themselves and kernel.
use crate::expressions::ColumnName;
use crate::schema::DataType;
use crate::schema::{ColumnName, DataType};
use crate::{AsAny, DeltaResult, Error};

use tracing::debug;
Expand Down
7 changes: 2 additions & 5 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ impl ScanBuilder {
let (all_fields, read_fields, have_partition_cols) = get_state_info(
logical_schema.as_ref(),
&self.snapshot.metadata().partition_columns,
self.snapshot.column_mapping_mode,
)?;
let physical_schema = Arc::new(StructType::new(read_fields));

Expand Down Expand Up @@ -399,7 +398,6 @@ fn parse_partition_value(raw: Option<&String>, data_type: &DataType) -> DeltaRes
fn get_state_info(
logical_schema: &Schema,
partition_columns: &[String],
column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<(Vec<ColumnType>, Vec<StructField>, bool)> {
let mut have_partition_cols = false;
let mut read_fields = Vec::with_capacity(logical_schema.fields.len());
Expand All @@ -419,7 +417,7 @@ fn get_state_info(
} else {
// Add to read schema, store field so we can build a `Column` expression later
// if needed (i.e. if we have partition columns)
let physical_field = logical_field.make_physical(column_mapping_mode)?;
let physical_field = logical_field.make_physical();
debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n");
let physical_name = physical_field.name.clone();
read_fields.push(physical_field);
Expand Down Expand Up @@ -451,7 +449,6 @@ pub fn transform_to_logical(
let (all_fields, _read_fields, have_partition_cols) = get_state_info(
&global_state.logical_schema,
&global_state.partition_columns,
global_state.column_mapping_mode,
)?;
transform_to_logical_internal(
engine,
Expand Down Expand Up @@ -488,7 +485,7 @@ fn transform_to_logical_internal(
"logical schema did not contain expected field, can't transform data",
));
};
let name = field.physical_name(global_state.column_mapping_mode)?;
let name = field.physical_name();
let value_expression =
parse_partition_value(partition_values.get(name), field.data_type())?;
Ok(value_expression.into())
Expand Down
3 changes: 1 addition & 2 deletions kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
use std::collections::HashMap;
use std::sync::LazyLock;

use crate::expressions::ColumnName;
use crate::utils::require;
use crate::{
actions::{
deletion_vector::{treemap_to_bools, DeletionVectorDescriptor},
visitors::visit_deletion_vector_at,
},
engine_data::{GetData, RowVisitor, TypedGetData as _},
schema::{ColumnNamesAndTypes, DataType, SchemaRef},
schema::{ColumnName, ColumnNamesAndTypes, DataType, SchemaRef},
table_features::ColumnMappingMode,
DeltaResult, Engine, EngineData, Error,
};
Expand Down
67 changes: 30 additions & 37 deletions kernel/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use indexmap::IndexMap;
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use crate::expressions::ColumnName;
use crate::table_features::ColumnMappingMode;
// re-export because many call sites that use schemas do not necessarily use expressions
pub(crate) use crate::expressions::{column_name, ColumnName};
use crate::utils::require;
use crate::{DeltaResult, Error};

Expand Down Expand Up @@ -131,20 +131,18 @@ impl StructField {
self.metadata.get(key.as_ref())
}

/// Get the physical name for this field as it should be read from parquet, based on the
/// specified column mapping mode.
pub fn physical_name(&self, mapping_mode: ColumnMappingMode) -> DeltaResult<&str> {
let physical_name_key = ColumnMetadataKey::ColumnMappingPhysicalName.as_ref();
let name_mapped_name = self.metadata.get(physical_name_key);
match (mapping_mode, name_mapped_name) {
(ColumnMappingMode::None, _) => Ok(self.name.as_str()),
(ColumnMappingMode::Name, Some(MetadataValue::String(name))) => Ok(name),
(ColumnMappingMode::Name, invalid) => Err(Error::generic(format!(
"Missing or invalid {physical_name_key}: {invalid:?}"
))),
(ColumnMappingMode::Id, _) => {
Err(Error::generic("Don't support id column mapping yet"))
}
/// Get the physical name for this field as it should be read from parquet.
///
/// NOTE: Caller affirms that the schema was already validated by
/// [`crate::table_features::validate_column_mapping_schema`], to ensure that
/// annotations are always and only present when column mapping mode is enabled.
pub fn physical_name(&self) -> &str {
match self
.metadata
.get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref())
{
Some(MetadataValue::String(physical_name)) => physical_name,
_ => &self.name,
}
}

Expand Down Expand Up @@ -188,32 +186,27 @@ impl StructField {
.collect()
}

pub fn make_physical(&self, mapping_mode: ColumnMappingMode) -> DeltaResult<Self> {
use ColumnMappingMode::*;
match mapping_mode {
Id => return Err(Error::generic("Column ID mapping mode not supported")),
None => return Ok(self.clone()),
Name => {} // fall out
}

struct ApplyNameMapping;
impl<'a> SchemaTransform<'a> for ApplyNameMapping {
/// Applies physical name mappings to this field
///
/// NOTE: Caller affirms that the schema was alreasdy validated by
/// [`crate::table_features::validate_column_mapping_schema`], to ensure that
/// annotations are always and only present when column mapping mode is enabled.
pub fn make_physical(&self) -> Self {
struct MakePhysical;
impl<'a> SchemaTransform<'a> for MakePhysical {
fn transform_struct_field(
&mut self,
field: &'a StructField,
) -> Option<Cow<'a, StructField>> {
let field = self.recurse_into_struct_field(field)?;
match field.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName) {
Some(MetadataValue::String(physical_name)) => {
Some(Cow::Owned(field.with_name(physical_name)))
}
_ => Some(field),
}
Some(Cow::Owned(field.with_name(field.physical_name())))
}
}

let field = ApplyNameMapping.transform_struct_field(self);
Ok(field.unwrap().into_owned())
// NOTE: unwrap is safe because the transformer is incapable of returning None
MakePhysical
.transform_struct_field(self)
.unwrap()
.into_owned()
}
}

Expand Down Expand Up @@ -1034,10 +1027,10 @@ mod tests {
.unwrap();
assert!(matches!(col_id, MetadataValue::Number(num) if *num == 4));
assert_eq!(
field.physical_name(ColumnMappingMode::Name).unwrap(),
field.physical_name(),
"col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
);
let physical_field = field.make_physical(ColumnMappingMode::Name).unwrap();
let physical_field = field.make_physical();
assert_eq!(
physical_field.name,
"col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
Expand Down
7 changes: 6 additions & 1 deletion kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use crate::actions::{Metadata, Protocol};
use crate::log_segment::LogSegment;
use crate::scan::ScanBuilder;
use crate::schema::Schema;
use crate::table_features::{column_mapping_mode, ColumnMappingMode};
use crate::table_features::{
column_mapping_mode, validate_schema_column_mapping, ColumnMappingMode,
};
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Engine, Error, FileSystemClient, Version};

Expand Down Expand Up @@ -82,9 +84,12 @@ impl Snapshot {
// important! before a read/write to the table we must check it is supported
protocol.ensure_read_supported()?;

// validate column mapping mode -- all schema fields should be correctly (un)annotated
let schema = metadata.schema()?;
let table_properties = metadata.parse_table_properties();
let column_mapping_mode = column_mapping_mode(&protocol, &table_properties);
validate_schema_column_mapping(&schema, column_mapping_mode)?;

Ok(Self {
table_root: location,
log_segment,
Expand Down
6 changes: 0 additions & 6 deletions kernel/src/table_changes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::log_segment::LogSegment;
use crate::path::AsUrl;
use crate::schema::{DataType, Schema, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::table_features::ColumnMappingMode;
use crate::{DeltaResult, Engine, Error, Version};

pub mod scan;
Expand Down Expand Up @@ -164,11 +163,6 @@ impl TableChanges {
pub(crate) fn partition_columns(&self) -> &Vec<String> {
&self.end_snapshot.metadata().partition_columns
}
/// The column mapping mode at the end schema.
#[allow(unused)]
pub(crate) fn column_mapping_mode(&self) -> &ColumnMappingMode {
&self.end_snapshot.column_mapping_mode
}

/// Create a [`TableChangesScanBuilder`] for an `Arc<TableChanges>`.
pub fn scan_builder(self: Arc<Self>) -> TableChangesScanBuilder {
Expand Down
3 changes: 1 addition & 2 deletions kernel/src/table_changes/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ impl TableChangesScanBuilder {
} else {
// Add to read schema, store field so we can build a `Column` expression later
// if needed (i.e. if we have partition columns)
let physical_field =
logical_field.make_physical(*self.table_changes.column_mapping_mode())?;
let physical_field = logical_field.make_physical();
debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n");
let physical_name = physical_field.name.clone();
read_fields.push(physical_field);
Expand Down
137 changes: 129 additions & 8 deletions kernel/src/table_features/column_mapping.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
//! Code to handle column mapping, including modes and schema transforms
use super::ReaderFeatures;
use crate::actions::Protocol;
use crate::schema::{ColumnName, DataType, MetadataValue, Schema, SchemaTransform, StructField};
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Error};

use std::borrow::Cow;

use serde::{Deserialize, Serialize};
use strum::EnumString;
Expand All @@ -25,18 +29,135 @@ pub(crate) fn column_mapping_mode(
protocol: &Protocol,
table_properties: &TableProperties,
) -> ColumnMappingMode {
match table_properties.column_mapping_mode {
Some(mode) if protocol.min_reader_version() == 2 => mode,
Some(mode)
if protocol.min_reader_version() == 3
&& protocol.has_reader_feature(&ReaderFeatures::ColumnMapping) =>
{
mode
}
match (
table_properties.column_mapping_mode,
protocol.min_reader_version(),
) {
(Some(mode), 2) => mode,
(Some(mode), 3) if protocol.has_reader_feature(&ReaderFeatures::ColumnMapping) => mode,
_ => ColumnMappingMode::None,
}
}

/// When column mapping mode is enabled, verify that each field in the schema is annotated with a
/// physical name and field_id; when not enabled, verify that no fields are annotated.
pub(crate) fn validate_schema_column_mapping(
schema: &Schema,
mode: ColumnMappingMode,
) -> DeltaResult<()> {
if mode == ColumnMappingMode::Id {
// TODO: Support column mapping ID mode
return Err(Error::unsupported("Column mapping ID mode not supported"));
}

let mut validator = ValidateColumnMappings {
mode,
path: vec![],
err: None,
};
let _ = validator.transform_struct(schema);
match validator.err {
Some(err) => Err(err),
None => Ok(()),
}
}

struct ValidateColumnMappings<'a> {
mode: ColumnMappingMode,
path: Vec<&'a str>,
err: Option<Error>,
}

impl<'a> ValidateColumnMappings<'a> {
fn transform_inner_type(
&mut self,
data_type: &'a DataType,
name: &'a str,
) -> Option<Cow<'a, DataType>> {
if self.err.is_none() {
self.path.push(name);
let _ = self.transform(data_type);
self.path.pop();
}
None
}
fn check_annotations(&mut self, field: &StructField) {
// The iterator yields `&&str` but `ColumnName::new` needs `&str`
let column_name = || ColumnName::new(self.path.iter().copied());
let annotation = "delta.columnMapping.physicalName";
match (self.mode, field.metadata.get(annotation)) {
// Both Id and Name modes require a physical name annotation; None mode forbids it.
(ColumnMappingMode::None, None) => {}
(ColumnMappingMode::Name | ColumnMappingMode::Id, Some(MetadataValue::String(_))) => {}
(ColumnMappingMode::Name | ColumnMappingMode::Id, Some(_)) => {
self.err = Some(Error::invalid_column_mapping_mode(format!(
"The {annotation} annotation on field '{}' must be a string",
column_name()
)));
}
(ColumnMappingMode::None, Some(_)) => {
self.err = Some(Error::invalid_column_mapping_mode(format!(
"Column mapping is not enabled but field '{annotation}' is annotated with {}",
column_name()
)));
}
(ColumnMappingMode::Name | ColumnMappingMode::Id, None) => {
self.err = Some(Error::invalid_column_mapping_mode(format!(
"Column mapping is enabled but field '{}' lacks the {annotation} annotation",
column_name()
)));
}
}

let annotation = "delta.columnMapping.id";
match (self.mode, field.metadata.get(annotation)) {
// Both Id and Name modes require a field ID annotation; None mode forbids it.
(ColumnMappingMode::None, None) => {}
(ColumnMappingMode::Name | ColumnMappingMode::Id, Some(MetadataValue::Number(_))) => {}
(ColumnMappingMode::Name | ColumnMappingMode::Id, Some(_)) => {
self.err = Some(Error::invalid_column_mapping_mode(format!(
"The {annotation} annotation on field '{}' must be a number",
column_name()
)));
}
(ColumnMappingMode::None, Some(_)) => {
self.err = Some(Error::invalid_column_mapping_mode(format!(
"Column mapping is not enabled but field '{}' is annotated with {annotation}",
column_name()
)));
}
(ColumnMappingMode::Name | ColumnMappingMode::Id, None) => {
self.err = Some(Error::invalid_column_mapping_mode(format!(
"Column mapping is enabled but field '{}' lacks the {annotation} annotation",
column_name()
)));
}
}
}
}

impl<'a> SchemaTransform<'a> for ValidateColumnMappings<'a> {
// Override array element and map key/value for better error messages
fn transform_array_element(&mut self, etype: &'a DataType) -> Option<Cow<'a, DataType>> {
self.transform_inner_type(etype, "<array element>")
}
fn transform_map_key(&mut self, ktype: &'a DataType) -> Option<Cow<'a, DataType>> {
self.transform_inner_type(ktype, "<map key>")
}
fn transform_map_value(&mut self, vtype: &'a DataType) -> Option<Cow<'a, DataType>> {
self.transform_inner_type(vtype, "<map value>")
}
fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
if self.err.is_none() {
self.path.push(&field.name);
self.check_annotations(field);
let _ = self.recurse_into_struct_field(field);
self.path.pop();
}
None
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit f938c7a

Please sign in to comment.