apply a schema to fix column names #331
@@ -4,7 +4,7 @@ use std::sync::Arc;
 use arrow_arith::boolean::{and_kleene, is_null, not, or_kleene};
 use arrow_arith::numeric::{add, div, mul, sub};
 use arrow_array::cast::AsArray;
-use arrow_array::types::*;
+use arrow_array::{types::*, MapArray};
 use arrow_array::{
     Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Datum, Decimal128Array, Float32Array,
     Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, RecordBatch,
@@ -21,6 +21,7 @@ use arrow_select::concat::concat;
 use itertools::Itertools;

+use super::arrow_conversion::LIST_ARRAY_ROOT;
 use super::arrow_utils::make_arrow_error;
 use crate::engine::arrow_data::ArrowEngineData;
 use crate::engine::arrow_utils::ensure_data_types;
 use crate::engine::arrow_utils::prim_array_cmp;
@@ -214,10 +215,9 @@ fn evaluate_expression(
         let output_fields: Vec<ArrowField> = output_cols
             .iter()
             .zip(schema.fields())
-            .map(|(array, input_field)| -> DeltaResult<_> {
-                ensure_data_types(input_field.data_type(), array.data_type())?;
+            .map(|(array, output_field)| -> DeltaResult<_> {
                 Ok(ArrowField::new(
-                    input_field.name(),
+                    output_field.name(),
                     array.data_type().clone(),
                     array.is_nullable(),
                 ))
@@ -366,6 +366,153 @@ fn evaluate_expression(
    }
}

// return a RecordBatch where the names of fields in `sa` have been transformed to match those in
// the schema specified by `output_type`
fn apply_schema(sa: &StructArray, output_type: &DataType) -> DeltaResult<RecordBatch> {
    let applied = apply_to_col(sa.data_type(), sa, output_type)?.ok_or(Error::generic(
        "apply_to_col at top-level should return something",
    ))?;
    let applied_sa = applied.as_struct_opt().ok_or(Error::generic(
        "apply_to_col at top-level should return a struct array",
    ))?;
    Ok(applied_sa.into())
}
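To make the intent concrete, here is a minimal usage sketch (not from the PR; it assumes the imports already in this file, that `StructType::new` takes a `Vec<StructField>`, and that `DataType::LONG` is the kernel's long-type constant, all inside a function returning `DeltaResult<()>`): a one-column `StructArray` carrying a hypothetical physical column name is renamed to the logical name from the kernel schema.

```rust
// Hypothetical example: rename physical column "col-5f42" to logical "id".
let values: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
let physical_field = ArrowField::new("col-5f42", ArrowDataType::Int64, true);
let sa = StructArray::from(vec![(Arc::new(physical_field), values)]);

// the kernel schema carries the logical name
let output_type = DataType::Struct(Box::new(StructType::new(vec![
    StructField::new("id", DataType::LONG, true),
])));

let batch = apply_schema(&sa, &output_type)?;
assert_eq!(batch.schema().field(0).name(), "id");
```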
// make column `col` with type `arrow_type` look like `kernel_type`. For now this only handles name
// transforms. If the actual data types don't match, this will return an error.

Review thread:
- What about nullability? IIRC we have to read parquet with everything nullable, because parquet can't express the concept of a non-nullable field nesting inside a nullable field. Or did we handle that already by just making everything nullable in our action schema?
- We made all the top-level structs nullable in the action schema (which is correct). But we do have non-nullable things inside. So I suspect the final things don't match on nullability if the parquet schema really says everything is nullable. I will test, and if we get a mismatch we can fix that here too.
- If we do decide to allow non-nullable fields nested inside nullable fields, we'll have to verify that our default engine's row visitor handles it gracefully?
- We do indeed have nullability mismatches, in that the arrow schema says things can be null and our schema says they cannot. I feel like this is something that might need a little thought to fix, as it'll slow down metadata parsing if we have to go through and fix up the schema every time with nullability changes, although that will never be exposed to the connector. There's also the question of whether we should adjust the metadata of each schema element, although that's less semantically important. Thoughts?
- The code now really makes the schema match the target, both for nullability and for metadata. This isn't cheap, so we probably need to discuss it a bit.
- nit: can we add to this comment and give the `Result<Option<...>>` semantics? That is (AFAICT), we return `None` when no transformation was needed.
- See my other comment -- I think we can simplify the method contract.
fn apply_to_col(
    arrow_type: &ArrowDataType,
    col: &dyn Array,
    kernel_type: &DataType,
) -> DeltaResult<Option<Arc<dyn Array>>> {
    match (kernel_type, arrow_type) {
        (DataType::Struct(kernel_fields), ArrowDataType::Struct(arrow_fields)) => {
            if kernel_fields.fields.len() != arrow_fields.len() {
                return Err(make_arrow_error(format!(
                    "Kernel schema had {} fields, but data has {}",
                    kernel_fields.fields.len(),
                    arrow_fields.len()
                )));
            }
            let sa = col.as_struct_opt().ok_or(make_arrow_error(
                "Arrow claimed to be a struct but isn't a StructArray".to_string(),
            ))?;
            let (fields, sa_cols, sa_nulls) = sa.clone().into_parts();
            let result_iter = fields
                .into_iter()
                .zip(sa_cols)
                .zip(kernel_fields.fields())
                .map(
                    |((sa_field, sa_col), kernel_field)| -> DeltaResult<(ArrowField, Arc<dyn Array>)> {
                        let transformed_col =
                            apply_to_col(sa_field.data_type(), &sa_col, kernel_field.data_type())?
                                .unwrap_or(sa_col);
                        let transformed_field = sa_field
                            .as_ref()
                            .clone()
                            .with_name(kernel_field.name.clone())
                            .with_data_type(transformed_col.data_type().clone());
                        Ok((transformed_field, transformed_col))
                    },
                );
            let (transformed_fields, transformed_cols): (Vec<ArrowField>, Vec<Arc<dyn Array>>) =
                result_iter.process_results(|iter| iter.unzip())?;
            let transformed_array =
                StructArray::try_new(transformed_fields.into(), transformed_cols, sa_nulls)?;
            Ok(Some(Arc::new(transformed_array)))
        }
        (DataType::Array(inner_type), ArrowDataType::List(_arrow_list_type)) => {
            // deconstruct the array, then rebuild the mapped version
            let la = col.as_list_opt().ok_or(make_arrow_error(
                "Arrow claimed to be a list but isn't a ListArray".to_string(),
            ))?;
            let (field, offset_buffer, values, nulls) = la.clone().into_parts();
            let transformed_values =
                apply_to_col(field.data_type(), &values, &inner_type.element_type)?
                    .unwrap_or(values);
            let transformed_field = Arc::new(
                field
                    .as_ref()
                    .clone()
                    .with_data_type(transformed_values.data_type().clone()),
            );
            let transformed_array =
                ListArray::try_new(transformed_field, offset_buffer, transformed_values, nulls)?;
            Ok(Some(Arc::new(transformed_array)))
        }
        (DataType::Map(kernel_map_type), ArrowDataType::Map(arrow_map_type, _)) => {
            let ma = col.as_map_opt().ok_or(make_arrow_error(
                "Arrow claimed to be a map but isn't a MapArray".to_string(),
            ))?;
            let (map_field, offset_buffer, map_struct_array, nulls, ordered) =
                ma.clone().into_parts();
            if let ArrowDataType::Struct(_) = arrow_map_type.data_type() {
                let (fields, msa_cols, msa_nulls) = map_struct_array.clone().into_parts();
                let mut fields = fields.into_iter();
                let key_field = fields.next().ok_or(make_arrow_error(
                    "Arrow map struct didn't have a key field".to_string(),
                ))?;
                let value_field = fields.next().ok_or(make_arrow_error(
                    "Arrow map struct didn't have a value field".to_string(),
                ))?;
                if fields.next().is_some() {
                    return Err(Error::generic("map fields had more than 2 members"));
                }
                let transformed_key = apply_to_col(
                    key_field.data_type(),
                    msa_cols[0].as_ref(),
                    &kernel_map_type.key_type,
                )?
                .unwrap_or(msa_cols[0].clone());
                let transformed_values = apply_to_col(
                    value_field.data_type(),
                    msa_cols[1].as_ref(),
                    &kernel_map_type.value_type,
                )?
                .unwrap_or(msa_cols[1].clone());
                let transformed_struct_fields = vec![
                    key_field
                        .as_ref()
                        .clone()
                        .with_data_type(transformed_key.data_type().clone()),
                    value_field
                        .as_ref()
                        .clone()
                        .with_data_type(transformed_values.data_type().clone()),
                ];
                let transformed_struct_cols = vec![transformed_key, transformed_values];
                let transformed_map_struct_array = StructArray::try_new(
                    transformed_struct_fields.into(),
                    transformed_struct_cols,
                    msa_nulls,
                )?;
                let transformed_map_field = Arc::new(
                    map_field
                        .as_ref()
                        .clone()
                        .with_data_type(transformed_map_struct_array.data_type().clone()),
                );
                let transformed_map = MapArray::try_new(
                    transformed_map_field,
                    offset_buffer,
                    transformed_map_struct_array,
                    nulls,
                    ordered,
                )?;
                Ok(Some(Arc::new(transformed_map)))
            } else {
                Err(make_arrow_error(
                    "Arrow map type wasn't a struct.".to_string(),
                ))
            }
        }
        _ => {
            // leaf types: just validate; returning `None` signals that no transform was needed
            ensure_data_types(kernel_type, arrow_type)?;
            Ok(None)
        }
    }
}
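A quick illustration of the `Result<Option<...>>` contract discussed above (hypothetical, inside a function returning `DeltaResult<()>`, and assuming `DataType::INTEGER` is the kernel's integer-type constant): a primitive column never needs a name transform, so the call yields `Ok(None)` and the caller keeps the original column via `unwrap_or`.

```rust
// Hypothetical: types match and there is nothing to rename, so we get Ok(None).
let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
let unchanged = apply_to_col(&ArrowDataType::Int32, col.as_ref(), &DataType::INTEGER)?;
assert!(unchanged.is_none());
```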
#[derive(Debug)]
pub struct ArrowExpressionHandler;
@@ -410,10 +557,13 @@ impl ExpressionEvaluator for DefaultExpressionEvaluator {
         let array_ref = evaluate_expression(&self.expression, batch, Some(&self.output_type))?;
         let arrow_type: ArrowDataType = ArrowDataType::try_from(&self.output_type)?;
         let batch: RecordBatch = if let DataType::Struct(_) = self.output_type {
-            array_ref
+            let sa: &StructArray = array_ref
                 .as_struct_opt()
-                .ok_or(Error::unexpected_column_type("Expected a struct array"))?
-                .into()
+                .ok_or(Error::unexpected_column_type("Expected a struct array"))?;
+            match ensure_data_types(&self.output_type, sa.data_type()) {
+                Ok(_) => sa.into(),
+                Err(_) => apply_schema(sa, &self.output_type)?,
+            }
         } else {
             let schema = ArrowSchema::new(vec![ArrowField::new("output", arrow_type, true)]);
             RecordBatch::try_new(Arc::new(schema), vec![array_ref])?

Review thread:
- Doesn't … I wonder if it's possible to somehow implement this as a bottom-up recursion, so that we don't have to deconstruct a given array until we've proven at least one field name changed somewhere "below" it?
- Maybe we could add an initial traversal that recurses through the two schemas looking for "stopping points"? Basically, any time we traverse through a complex type we do (pseudocode):

```python
child_name_changes = [has_name_change(arrow_child, kernel_child)
                      for (arrow_child, kernel_child) in ...]
if any(child_name_changes):
    # track individual children that don't need name changes
    for (i, name_change) in enumerate(child_name_changes):
        if not name_change:
            terminal_nodes.add(... arrow field i ...)
    return True
# report that we have no name change; parent decides whether to track us or not
return False
```

  With the set of terminal nodes in hand, the "real" rewrites can avoid descending deeper than is necessary. I verified that arrow does impl Hash for Field, so we can build the necessary hash set.
- Probably not worth tackling this optimization now, but maybe worth tracking as a TODO? Good first issue even?
- Yep, added: #396 (will mark as good first once this merges)
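If that precheck were ever implemented, a rough Rust rendering might look like the sketch below (not part of the PR or of #396). For simplicity it compares the actual arrow schema against a target arrow schema and handles only structs; the real thing would compare against the kernel schema and also recurse through list and map entry fields.

```rust
use std::collections::HashSet;
use std::sync::Arc;

use arrow_schema::{DataType as ArrowDataType, FieldRef};

/// Walk two arrow struct schemas in lockstep. Returns true if any (possibly
/// nested) field name differs; fields whose whole subtree already matches are
/// collected into `terminal_nodes` so a later rewrite pass can stop descending
/// there.
fn has_name_change(
    actual: &FieldRef,
    target: &FieldRef,
    terminal_nodes: &mut HashSet<FieldRef>,
) -> bool {
    let self_changed = actual.name() != target.name();
    let children_changed = match (actual.data_type(), target.data_type()) {
        (ArrowDataType::Struct(actual_fields), ArrowDataType::Struct(target_fields)) => {
            let changes: Vec<bool> = actual_fields
                .iter()
                .zip(target_fields.iter())
                .map(|(a, t)| has_name_change(a, t, terminal_nodes))
                .collect();
            if changes.iter().any(|c| *c) {
                // track individual children that don't need name changes
                for (child, changed) in actual_fields.iter().zip(&changes) {
                    if !*changed {
                        terminal_nodes.insert(Arc::clone(child));
                    }
                }
                true
            } else {
                false
            }
        }
        // leaf types have no children to rename
        _ => false,
    };
    self_changed || children_changed
}
```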
@@ -12,7 +12,7 @@ use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME};
 use crate::expressions::{Expression, Scalar};
 use crate::features::ColumnMappingMode;
 use crate::scan::state::{DvInfo, Stats};
-use crate::schema::{DataType, Schema, SchemaRef, StructField, StructType};
+use crate::schema::{ArrayType, DataType, MapType, Schema, SchemaRef, StructField, StructType};
 use crate::snapshot::Snapshot;
 use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};
@@ -384,6 +384,78 @@ fn parse_partition_value(raw: Option<&String>, data_type: &DataType) -> DeltaRes
    }
}

/// Transform a logical field into the physical form. Currently just renames things for 'name'
/// column mapping.
fn make_field_physical(
    logical_field: &StructField,
    column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<StructField> {
    match column_mapping_mode {
        ColumnMappingMode::None => Ok(logical_field.clone()),
        ColumnMappingMode::Name => {
            let physical_name = logical_field.physical_name(column_mapping_mode)?;
            let field_data_type = logical_field.data_type();
            let mapped_data_type = make_data_type_physical(field_data_type, column_mapping_mode)?;
            Ok(StructField {
                name: physical_name.to_string(),
                data_type: mapped_data_type,
                nullable: logical_field.nullable,
                metadata: logical_field.metadata.clone(),
            })
        }
        ColumnMappingMode::Id => panic!("No id"),
    }
}
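For illustration, a hedged sketch of what this does (not part of the diff; it assumes `StructField` exposes a `with_metadata` builder as in the kernel's schema module, and the hypothetical physical name is invented): under 'name' mapping the physical name comes from the field's `delta.columnMapping.physicalName` metadata, per the Delta column-mapping spec.

```rust
// Hypothetical field: the physical name lives in the field metadata under
// the Delta column-mapping key; type, nullability, and metadata carry over.
let logical = StructField::new("id", DataType::LONG, true)
    .with_metadata([("delta.columnMapping.physicalName", "col-173b4db4")]);
let physical = make_field_physical(&logical, ColumnMappingMode::Name)?;
assert_eq!(physical.name, "col-173b4db4");
assert_eq!(physical.data_type(), logical.data_type());
```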
/// Transform a DataType into the physical form. Currently just renames anything in a nested type
/// for 'name' column mapping.

Review thread:
- FYI: Normally, the field's field id and physical name are stored in the field's metadata... but Iceberg requires field ids even for the internal columns used by Map and Array, and there's no way to associate metadata with those. So, when the IcebergCompatV2 table feature is enabled, we have to remember the most recently-seen field, as well as the column path we descended through since then, so we can fetch the field ids out of that parent field's metadata.
- Actually, I don't think Delta cares about those Iceberg field ids -- even in column mapping name mode -- so maybe we can ignore all of this on the read path.
- I'm going to close my eyes and hope you're right ;)

fn make_data_type_physical(
    logical_dt: &DataType,
    column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<DataType> {
    match column_mapping_mode {
        ColumnMappingMode::None => Ok(logical_dt.clone()),
        ColumnMappingMode::Name => {
            // we don't need to rename at this level, just need to keep the recursion going
            // because there might be structs below us
            match logical_dt {
                DataType::Array(array_type) => {
                    let new_type =
                        make_data_type_physical(&array_type.element_type, column_mapping_mode)?;
                    Ok(DataType::Array(Box::new(ArrayType::new(
                        new_type,
                        array_type.contains_null,
                    ))))
                }
                DataType::Map(map_type) => {
                    let new_key_type =
                        make_data_type_physical(&map_type.key_type, column_mapping_mode)?;
                    let new_value_type =
                        make_data_type_physical(&map_type.value_type, column_mapping_mode)?;
                    Ok(DataType::Map(Box::new(MapType::new(
                        new_key_type,
                        new_value_type,
                        map_type.value_contains_null,
                    ))))
                }
                DataType::Struct(struct_type) => {
                    // build up the mapped child fields
                    let children = struct_type
                        .fields()
                        .map(|field| make_field_physical(field, column_mapping_mode))
                        .try_collect()?;
                    Ok(DataType::Struct(Box::new(StructType::new(children))))
                }
                _ => {
                    // types with no children don't change
                    Ok(logical_dt.clone())
                }
            }
        }
        ColumnMappingMode::Id => panic!("No id"),
    }
}
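To show the recursion through a nested type, here is a hedged sketch (not from the diff; the metadata key is the Delta spec's, the physical name is invented, and the same `with_metadata` builder is assumed): an `array<struct<id>>` has its element struct's field renamed while the array shape and nullability flags are preserved.

```rust
// Hypothetical nested type: array<struct<id>> mapping to array<struct<col-ab12>>.
let element = StructType::new(vec![StructField::new("id", DataType::LONG, true)
    .with_metadata([("delta.columnMapping.physicalName", "col-ab12")])]);
let logical_dt = DataType::Array(Box::new(ArrayType::new(
    DataType::Struct(Box::new(element)),
    true,
)));
// only names inside the element struct change; the array itself is rebuilt as-is
let physical_dt = make_data_type_physical(&logical_dt, ColumnMappingMode::Name)?;
```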
/// Get the state needed to process a scan. In particular this returns a triple of
/// (all_fields_in_query, fields_to_read_from_parquet, have_partition_cols) where:
/// - all_fields_in_query - all fields in the query as [`ColumnType`] enums
@@ -413,10 +485,11 @@
             } else {
                 // Add to read schema, store field so we can build a `Column` expression later
                 // if needed (i.e. if we have partition columns)
-                let physical_name = logical_field.physical_name(column_mapping_mode)?;
-                let physical_field = logical_field.with_name(physical_name);
+                let physical_field = make_field_physical(logical_field, column_mapping_mode)?;
+                debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n");
+                let name = physical_field.name.clone();
                 read_fields.push(physical_field);
-                Ok(ColumnType::Selected(physical_name.to_string()))
+                Ok(ColumnType::Selected(name))
             }
         })
         .try_collect()?;
Review thread:
- Rescuing #331 (comment): Down to 7% now...
- Yep, I've added some more tests and it's up now.