-
Notifications
You must be signed in to change notification settings - Fork 347
feat(core): Add support for _file column
#1824
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 21 commits
aab78d6
ee21cab
37b52e2
44463a0
b5449f6
e034009
4f0a4f1
51f76d3
d84e16b
984dacd
bd478cb
8593db0
9b186c7
30ae5fb
adf0da0
f4336a8
ef3a965
534490b
04bf463
9e88edf
060b45d
8572dae
f273add
5aa92ae
c05b886
33bb0ad
42167ff
cbc6b17
977c813
83443aa
35aba12
830e462
4eb8a63
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,15 +20,17 @@ use std::sync::Arc; | |
|
|
||
| use arrow_array::{ | ||
| Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array, | ||
| Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray, | ||
| Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, RunArray, | ||
| StringArray, | ||
| }; | ||
| use arrow_cast::cast; | ||
| use arrow_schema::{ | ||
| DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, | ||
| DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, | ||
| }; | ||
| use parquet::arrow::PARQUET_FIELD_ID_META_KEY; | ||
|
|
||
| use crate::arrow::schema_to_arrow_schema; | ||
| use crate::metadata_columns::get_metadata_column_name; | ||
| use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema}; | ||
| use crate::{Error, ErrorKind, Result}; | ||
|
|
||
|
|
@@ -111,6 +113,8 @@ enum SchemaComparison { | |
| pub(crate) struct RecordBatchTransformer { | ||
| snapshot_schema: Arc<IcebergSchema>, | ||
| projected_iceberg_field_ids: Vec<i32>, | ||
| // Map from field ID to constant value for virtual/metadata fields | ||
| constants_map: HashMap<i32, PrimitiveLiteral>, | ||
|
|
||
| // BatchTransform gets lazily constructed based on the schema of | ||
| // the first RecordBatch we receive from the file | ||
|
|
@@ -129,10 +133,22 @@ impl RecordBatchTransformer { | |
| Self { | ||
| snapshot_schema, | ||
| projected_iceberg_field_ids, | ||
| constants_map: HashMap::new(), | ||
| batch_transform: None, | ||
| } | ||
| } | ||
|
|
||
| /// Add a constant value for a specific field ID. | ||
| /// This is used for virtual/metadata fields like _file that have constant values per batch. | ||
| /// | ||
| /// # Arguments | ||
| /// * `field_id` - The field ID to associate with the constant | ||
| /// * `value` - The constant value for this field | ||
| pub(crate) fn with_constant(mut self, field_id: i32, value: PrimitiveLiteral) -> Self { | ||
| self.constants_map.insert(field_id, value); | ||
| self | ||
| } | ||
|
|
||
| pub(crate) fn process_record_batch( | ||
| &mut self, | ||
| record_batch: RecordBatch, | ||
|
|
@@ -167,6 +183,7 @@ impl RecordBatchTransformer { | |
| record_batch.schema_ref(), | ||
| self.snapshot_schema.as_ref(), | ||
| &self.projected_iceberg_field_ids, | ||
| &self.constants_map, | ||
| )?); | ||
|
|
||
| self.process_record_batch(record_batch)? | ||
|
|
@@ -185,6 +202,7 @@ impl RecordBatchTransformer { | |
| source_schema: &ArrowSchemaRef, | ||
| snapshot_schema: &IcebergSchema, | ||
| projected_iceberg_field_ids: &[i32], | ||
| constants_map: &HashMap<i32, PrimitiveLiteral>, | ||
| ) -> Result<BatchTransform> { | ||
| let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?); | ||
| let field_id_to_mapped_schema_map = | ||
|
|
@@ -195,11 +213,24 @@ impl RecordBatchTransformer { | |
| let fields: Result<Vec<_>> = projected_iceberg_field_ids | ||
| .iter() | ||
| .map(|field_id| { | ||
| Ok(field_id_to_mapped_schema_map | ||
| .get(field_id) | ||
| .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? | ||
| .0 | ||
| .clone()) | ||
| // Check if this is a constant/virtual field | ||
| if let Some(constant_value) = constants_map.get(field_id) { | ||
| // Create a field for the virtual column based on the constant type | ||
| let arrow_type = Self::primitive_literal_to_arrow_type(constant_value)?; | ||
| let field_name = get_metadata_column_name(*field_id)?; | ||
| Ok(Arc::new( | ||
| Field::new(field_name, arrow_type, false).with_metadata(HashMap::from([( | ||
| PARQUET_FIELD_ID_META_KEY.to_string(), | ||
| field_id.to_string(), | ||
| )])), | ||
| )) | ||
| } else { | ||
| Ok(field_id_to_mapped_schema_map | ||
| .get(field_id) | ||
| .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? | ||
| .0 | ||
| .clone()) | ||
| } | ||
| }) | ||
| .collect(); | ||
|
|
||
|
|
@@ -214,6 +245,7 @@ impl RecordBatchTransformer { | |
| snapshot_schema, | ||
| projected_iceberg_field_ids, | ||
| field_id_to_mapped_schema_map, | ||
| constants_map, | ||
| )?, | ||
| target_schema, | ||
| }), | ||
|
|
@@ -270,11 +302,21 @@ impl RecordBatchTransformer { | |
| snapshot_schema: &IcebergSchema, | ||
| projected_iceberg_field_ids: &[i32], | ||
| field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>, | ||
| constants_map: &HashMap<i32, PrimitiveLiteral>, | ||
| ) -> Result<Vec<ColumnSource>> { | ||
| let field_id_to_source_schema_map = | ||
| Self::build_field_id_to_arrow_schema_map(source_schema)?; | ||
|
|
||
| projected_iceberg_field_ids.iter().map(|field_id|{ | ||
| // Check if this is a constant/virtual field first | ||
| if let Some(constant_value) = constants_map.get(field_id) { | ||
| // This is a virtual field - add it with the constant value | ||
| return Ok(ColumnSource::Add { | ||
| value: Some(constant_value.clone()), | ||
| target_type: Self::primitive_literal_to_arrow_type(constant_value)?, | ||
| }); | ||
| } | ||
|
|
||
| let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or( | ||
| Error::new(ErrorKind::Unexpected, "could not find field in schema") | ||
| )?; | ||
|
|
@@ -429,6 +471,27 @@ impl RecordBatchTransformer { | |
| let vals: Vec<Option<f64>> = vec![None; num_rows]; | ||
| Arc::new(Float64Array::from(vals)) | ||
| } | ||
| (DataType::RunEndEncoded(_, _), Some(PrimitiveLiteral::String(value))) => { | ||
|
||
| // Create Run-End Encoded array for constant string values (e.g., file paths) | ||
| // This is more memory-efficient than repeating the same value for every row | ||
| let run_ends = if num_rows == 0 { | ||
| Int32Array::from(Vec::<i32>::new()) | ||
| } else { | ||
| Int32Array::from(vec![num_rows as i32]) | ||
| }; | ||
| let values = if num_rows == 0 { | ||
| StringArray::from(Vec::<&str>::new()) | ||
| } else { | ||
| StringArray::from(vec![value.as_str()]) | ||
| }; | ||
| Arc::new(RunArray::try_new(&run_ends, &values).map_err(|e| { | ||
| Error::new( | ||
| ErrorKind::Unexpected, | ||
| "Failed to create RunArray for constant string", | ||
| ) | ||
| .with_source(e) | ||
| })?) | ||
| } | ||
| (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => { | ||
| Arc::new(StringArray::from(vec![value.clone(); num_rows])) | ||
| } | ||
|
|
@@ -452,6 +515,33 @@ impl RecordBatchTransformer { | |
| } | ||
| }) | ||
| } | ||
|
|
||
| /// Converts a PrimitiveLiteral to its corresponding Arrow DataType. | ||
| /// This is used for virtual fields to determine the Arrow type based on the constant value. | ||
| fn primitive_literal_to_arrow_type(literal: &PrimitiveLiteral) -> Result<DataType> { | ||
| Ok(match literal { | ||
| PrimitiveLiteral::Boolean(_) => DataType::Boolean, | ||
| PrimitiveLiteral::Int(_) => DataType::Int32, | ||
| PrimitiveLiteral::Long(_) => DataType::Int64, | ||
| PrimitiveLiteral::Float(_) => DataType::Float32, | ||
| PrimitiveLiteral::Double(_) => DataType::Float64, | ||
| PrimitiveLiteral::String(_) => { | ||
| // Use Run-End Encoding for constant strings (memory efficient) | ||
| let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); | ||
| let values_field = Arc::new(Field::new("values", DataType::Utf8, true)); | ||
gbrgr marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| DataType::RunEndEncoded(run_ends_field, values_field) | ||
| } | ||
| PrimitiveLiteral::Binary(_) => DataType::Binary, | ||
| PrimitiveLiteral::Int128(_) => DataType::Decimal128(38, 0), | ||
| PrimitiveLiteral::UInt128(_) => DataType::Decimal128(38, 0), | ||
| PrimitiveLiteral::AboveMax | PrimitiveLiteral::BelowMin => { | ||
| return Err(Error::new( | ||
| ErrorKind::Unexpected, | ||
| "Cannot create arrow type for AboveMax/BelowMin literal", | ||
| )); | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -96,4 +96,5 @@ mod utils; | |
| pub mod writer; | ||
|
|
||
| mod delete_vector; | ||
| pub mod metadata_columns; | ||
| pub mod puffin; | ||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
| @@ -0,0 +1,89 @@ | ||||
| // Licensed to the Apache Software Foundation (ASF) under one | ||||
| // or more contributor license agreements. See the NOTICE file | ||||
| // distributed with this work for additional information | ||||
| // regarding copyright ownership. The ASF licenses this file | ||||
| // to you under the Apache License, Version 2.0 (the | ||||
| // "License"); you may not use this file except in compliance | ||||
| // with the License. You may obtain a copy of the License at | ||||
| // | ||||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||||
| // | ||||
| // Unless required by applicable law or agreed to in writing, | ||||
| // software distributed under the License is distributed on an | ||||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||
| // KIND, either express or implied. See the License for the | ||||
| // specific language governing permissions and limitations | ||||
| // under the License. | ||||
|
|
||||
| //! Metadata columns (virtual/reserved fields) for Iceberg tables. | ||||
| //! | ||||
| //! This module defines metadata columns that can be requested in projections | ||||
| //! but are not stored in data files. Instead, they are computed on-the-fly | ||||
| //! during reading. Examples include the _file column (file path) and future | ||||
| //! columns like partition values or row numbers. | ||||
| use crate::{Error, ErrorKind, Result}; | ||||
|
|
||||
| /// Reserved field ID for the file path (_file) column per Iceberg spec | ||||
| pub const RESERVED_FIELD_ID_FILE: i32 = 2147483646; | ||||
|
||||
|
|
||||
| /// Reserved column name for the file path metadata column | ||||
| pub const RESERVED_COL_NAME_FILE: &str = "_file"; | ||||
|
|
||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please create a lazy field for
Also please don't expose the static field directly, use a method to expose the field reference.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I created an Arrow lazy field (not an Iceberg field) in metadata_columns.rs |
||||
| /// Returns the column name for a metadata field ID. | ||||
| /// | ||||
| /// # Arguments | ||||
| /// * `field_id` - The metadata field ID | ||||
| /// | ||||
| /// # Returns | ||||
| /// The name of the metadata column, or an error if the field ID is not recognized | ||||
| pub fn get_metadata_column_name(field_id: i32) -> Result<&'static str> { | ||||
| match field_id { | ||||
| RESERVED_FIELD_ID_FILE => Ok(RESERVED_COL_NAME_FILE), | ||||
| _ => Err(Error::new( | ||||
| ErrorKind::Unexpected, | ||||
| format!("Unknown metadata field ID: {field_id}"), | ||||
| )), | ||||
| } | ||||
| } | ||||
|
|
||||
| /// Returns the field ID for a metadata column name. | ||||
| /// | ||||
| /// # Arguments | ||||
| /// * `column_name` - The metadata column name | ||||
| /// | ||||
| /// # Returns | ||||
| /// The field ID of the metadata column, or an error if the column name is not recognized | ||||
| pub fn get_metadata_field_id(column_name: &str) -> Result<i32> { | ||||
| match column_name { | ||||
| RESERVED_COL_NAME_FILE => Ok(RESERVED_FIELD_ID_FILE), | ||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wish that we could somehow reuse the mapping from
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Possible, but I don't think it pays off for a hand full of fields. |
||||
| _ => Err(Error::new( | ||||
| ErrorKind::Unexpected, | ||||
| format!("Unknown metadata column name: {column_name}"), | ||||
| )), | ||||
| } | ||||
| } | ||||
|
|
||||
| /// Checks if a field ID is a metadata field. | ||||
| /// | ||||
| /// # Arguments | ||||
| /// * `field_id` - The field ID to check | ||||
| /// | ||||
| /// # Returns | ||||
| /// `true` if the field ID is a metadata field, `false` otherwise | ||||
| pub fn is_metadata_field(field_id: i32) -> bool { | ||||
| field_id == RESERVED_FIELD_ID_FILE | ||||
gbrgr marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
| // Additional metadata fields can be checked here in the future | ||||
| } | ||||
|
|
||||
| /// Checks if a column name is a metadata column. | ||||
| /// | ||||
| /// # Arguments | ||||
| /// * `column_name` - The column name to check | ||||
| /// | ||||
| /// # Returns | ||||
| /// `true` if the column name is a metadata column, `false` otherwise | ||||
gbrgr marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
| pub fn is_metadata_column_name(column_name: &str) -> bool { | ||||
| column_name == RESERVED_COL_NAME_FILE | ||||
gbrgr marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||
| // Additional metadata column names can be checked here in the future | ||||
| } | ||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feels a bit redundant to have to not only lookup
constants_maptwice, but callprimitive_literal_to_arrow_typetwice (once here and once ingenerate_batch_transform)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I store fields now instead of constants, the double lookup is not easily avoidable in the current grand scheme of things, and should not hurt.