diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index a7ff949f5..facf8688f 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -240,6 +240,7 @@ impl ArrowReader { // The converter that converts `BoundPredicates` to `ArrowPredicates` let mut converter = PredicateConverter { + parquet_schema: &parquet_schema, column_map: &field_id_map, column_indices: &column_indices, }; @@ -442,6 +443,8 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { /// A visitor to convert Iceberg bound predicates to Arrow predicates. struct PredicateConverter<'a> { + /// The Parquet schema descriptor. + pub parquet_schema: &'a SchemaDescriptor, /// The map between field id and leaf column index in Parquet schema. pub column_map: &'a HashMap, /// The required column indices in Parquet schema for the predicates. @@ -453,17 +456,32 @@ impl PredicateConverter<'_> { /// required column indices which is used to project the column in the record batch. /// Return None if the field id is not found in the column map, which is possible /// due to schema evolution. - fn bound_reference(&mut self, reference: &BoundReference) -> Option { + fn bound_reference(&mut self, reference: &BoundReference) -> Result> { // The leaf column's index in Parquet schema. - let column_idx = self.column_map.get(&reference.field().id)?; + if let Some(column_idx) = self.column_map.get(&reference.field().id) { + if self.parquet_schema.get_column_root_idx(*column_idx) != *column_idx { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column `{}` in predicates isn't a root column in Parquet schema.", + reference.field().name + ), + )); + } - // The leaf column's index in the required column indices. - let index = self - .column_indices - .iter() - .position(|&idx| idx == *column_idx)?; + // The leaf column's index in the required column indices. + let index = self + .column_indices + .iter() + .position(|&idx| idx == *column_idx).ok_or(Error::new(ErrorKind::DataInvalid, format!( + "Leave column `{}` in predicates cannot be found in the required column indices.", + reference.field().name + )))?; - Some(index) + Ok(Some(index)) + } else { + Ok(None) + } } /// Build an Arrow predicate that always returns true. @@ -547,7 +565,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { Ok(Box::new(move |batch| { let column = project_column(&batch, idx)?; is_null(&column) @@ -563,7 +581,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { Ok(Box::new(move |batch| { let column = project_column(&batch, idx)?; is_not_null(&column) @@ -579,7 +597,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - if self.bound_reference(reference).is_some() { + if self.bound_reference(reference)?.is_some() { self.build_always_true() } else { // A missing column, treating it as null. @@ -592,7 +610,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - if self.bound_reference(reference).is_some() { + if self.bound_reference(reference)?.is_some() { self.build_always_false() } else { // A missing column, treating it as null. @@ -606,7 +624,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { let literal = get_arrow_datum(literal)?; Ok(Box::new(move |batch| { @@ -625,7 +643,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { let literal = get_arrow_datum(literal)?; Ok(Box::new(move |batch| { @@ -644,7 +662,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { let literal = get_arrow_datum(literal)?; Ok(Box::new(move |batch| { @@ -663,7 +681,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { let literal = get_arrow_datum(literal)?; Ok(Box::new(move |batch| { @@ -682,7 +700,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { let literal = get_arrow_datum(literal)?; Ok(Box::new(move |batch| { @@ -701,7 +719,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { let literal = get_arrow_datum(literal)?; Ok(Box::new(move |batch| {