Skip to content

Commit

Permalink
More
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed May 6, 2024
1 parent 5453c57 commit abe7afc
Showing 1 changed file with 36 additions and 18 deletions.
54 changes: 36 additions & 18 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ impl ArrowReader {

// The converter that converts `BoundPredicates` to `ArrowPredicates`
let mut converter = PredicateConverter {
parquet_schema: &parquet_schema,
column_map: &field_id_map,
column_indices: &column_indices,
};
Expand Down Expand Up @@ -442,6 +443,8 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor {

/// A visitor to convert Iceberg bound predicates to Arrow predicates.
struct PredicateConverter<'a> {
/// The Parquet schema descriptor.
pub parquet_schema: &'a SchemaDescriptor,
/// The map between field id and leaf column index in Parquet schema.
pub column_map: &'a HashMap<i32, usize>,
/// The required column indices in Parquet schema for the predicates.
Expand All @@ -453,17 +456,32 @@ impl PredicateConverter<'_> {
/// required column indices which is used to project the column in the record batch.
/// Return None if the field id is not found in the column map, which is possible
/// due to schema evolution.
fn bound_reference(&mut self, reference: &BoundReference) -> Option<usize> {
fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
// The leaf column's index in Parquet schema.
let column_idx = self.column_map.get(&reference.field().id)?;
if let Some(column_idx) = self.column_map.get(&reference.field().id) {
if self.parquet_schema.get_column_root_idx(*column_idx) != *column_idx {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Leave column `{}` in predicates isn't a root column in Parquet schema.",
reference.field().name
),
));
}

// The leaf column's index in the required column indices.
let index = self
.column_indices
.iter()
.position(|&idx| idx == *column_idx)?;
// The leaf column's index in the required column indices.
let index = self
.column_indices
.iter()
.position(|&idx| idx == *column_idx).ok_or(Error::new(ErrorKind::DataInvalid, format!(
"Leave column `{}` in predicates cannot be found in the required column indices.",
reference.field().name
)))?;

Some(index)
Ok(Some(index))
} else {
Ok(None)
}
}

/// Build an Arrow predicate that always returns true.
Expand Down Expand Up @@ -547,7 +565,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
reference: &BoundReference,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference) {
if let Some(idx) = self.bound_reference(reference)? {
Ok(Box::new(move |batch| {
let column = project_column(&batch, idx)?;
is_null(&column)
Expand All @@ -563,7 +581,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
reference: &BoundReference,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference) {
if let Some(idx) = self.bound_reference(reference)? {
Ok(Box::new(move |batch| {
let column = project_column(&batch, idx)?;
is_not_null(&column)
Expand All @@ -579,7 +597,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
reference: &BoundReference,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if self.bound_reference(reference).is_some() {
if self.bound_reference(reference)?.is_some() {
self.build_always_true()
} else {
// A missing column, treating it as null.
Expand All @@ -592,7 +610,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
reference: &BoundReference,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if self.bound_reference(reference).is_some() {
if self.bound_reference(reference)?.is_some() {
self.build_always_false()
} else {
// A missing column, treating it as null.
Expand All @@ -606,7 +624,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference) {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;

Ok(Box::new(move |batch| {
Expand All @@ -625,7 +643,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference) {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;

Ok(Box::new(move |batch| {
Expand All @@ -644,7 +662,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference) {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;

Ok(Box::new(move |batch| {
Expand All @@ -663,7 +681,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference) {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;

Ok(Box::new(move |batch| {
Expand All @@ -682,7 +700,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference) {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;

Ok(Box::new(move |batch| {
Expand All @@ -701,7 +719,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference) {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;

Ok(Box::new(move |batch| {
Expand Down

0 comments on commit abe7afc

Please sign in to comment.