From 92627746cf298857c8ce45f89e29929e63e7a422 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 23 Mar 2024 21:33:45 -0700 Subject: [PATCH 01/17] feat: Convert predicate to arrow filter and push down to parquet reader --- Cargo.toml | 1 + crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/arrow.rs | 431 ++++++++++++++++++++++++++- crates/iceberg/src/expr/predicate.rs | 45 ++- crates/iceberg/src/scan.rs | 150 +++++++++- crates/iceberg/src/spec/values.rs | 5 + 6 files changed, 615 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c1cdcdf5f..2106f39f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ apache-avro = "0.16" array-init = "2" arrow-arith = { version = "51" } arrow-array = { version = "51" } +arrow-ord = { version = "51" } arrow-schema = { version = "51" } async-stream = "0.3.5" async-trait = "0.1" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 5aea856fe..31139552d 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -34,6 +34,7 @@ apache-avro = { workspace = true } array-init = { workspace = true } arrow-arith = { workspace = true } arrow-array = { workspace = true } +arrow-ord = { workspace = true } arrow-schema = { workspace = true } async-stream = { workspace = true } async-trait = { workspace = true } diff --git a/crates/iceberg/src/arrow.rs b/crates/iceberg/src/arrow.rs index 527fb1917..1859b6649 100644 --- a/crates/iceberg/src/arrow.rs +++ b/crates/iceberg/src/arrow.rs @@ -20,17 +20,30 @@ use async_stream::try_stream; use futures::stream::StreamExt; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; +use std::collections::HashMap; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::SchemaRef; +use crate::spec::{Datum, PrimitiveLiteral, SchemaRef}; use crate::error::Result; +use crate::expr::{ + BinaryExpression, BoundPredicate, BoundReference, PredicateOperator, SetExpression, + UnaryExpression, +}; use crate::spec::{ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type, }; use crate::{Error, ErrorKind}; +use arrow_arith::boolean::{and, is_not_null, is_null, not, or}; +use arrow_array::{ + BooleanArray, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array, Int64Array, +}; +use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; +use bitvec::macros::internal::funty::Fundamental; +use parquet::arrow::arrow_reader::{ArrowPredicate, ArrowPredicateFn, RowFilter}; +use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; use std::sync::Arc; /// Builder to create ArrowReader @@ -38,6 +51,7 @@ pub struct ArrowReaderBuilder { batch_size: Option, file_io: FileIO, schema: SchemaRef, + predicates: Option>, } impl ArrowReaderBuilder { @@ -47,6 +61,7 @@ impl ArrowReaderBuilder { batch_size: None, file_io, schema, + predicates: None, } } @@ -57,12 +72,19 @@ impl ArrowReaderBuilder { self } + /// Sets the predicates to apply to the scan. + pub fn with_predicates(mut self, predicates: Vec) -> Self { + self.predicates = Some(predicates); + self + } + /// Build the ArrowReader. pub fn build(self) -> ArrowReader { ArrowReader { batch_size: self.batch_size, schema: self.schema, file_io: self.file_io, + predicates: self.predicates, } } } @@ -73,6 +95,7 @@ pub struct ArrowReader { #[allow(dead_code)] schema: SchemaRef, file_io: FileIO, + predicates: Option>, } impl ArrowReader { @@ -95,6 +118,13 @@ impl ArrowReader { .await? .with_projection(projection_mask); + let parquet_schema = batch_stream_builder.parquet_schema(); + let row_filter = self.get_row_filter(parquet_schema)?; + + if let Some(row_filter) = row_filter { + batch_stream_builder = batch_stream_builder.with_row_filter(row_filter); + } + if let Some(batch_size) = self.batch_size { batch_stream_builder = batch_stream_builder.with_batch_size(batch_size); } @@ -113,6 +143,405 @@ impl ArrowReader { // TODO: full implementation ProjectionMask::all() } + + fn get_row_filter(&self, parquet_schema: &SchemaDescriptor) -> Result> { + if let Some(predicates) = &self.predicates { + let field_id_map = self.build_field_id_map(parquet_schema)?; + + // Collect Parquet column indices from field ids + let column_indices = predicates + .iter() + .map(|predicate| { + let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; + collector.visit_predicate(predicate).unwrap(); + collector + .field_ids + .iter() + .map(|field_id| { + field_id_map.get(field_id).cloned().ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "Field id not found in schema") + }) + }) + .collect::>>() + }) + .collect::>>()?; + + // Convert BoundPredicates to ArrowPredicates + let mut arrow_predicates = vec![]; + for (predicate, columns) in predicates.iter().zip(column_indices.iter()) { + let mut converter = PredicateConverter { + columns, + projection_mask: ProjectionMask::leaves(parquet_schema, columns.clone()), + parquet_schema, + column_map: &field_id_map, + }; + let arrow_predicate = converter.visit_predicate(predicate)?; + arrow_predicates.push(arrow_predicate); + } + Ok(Some(RowFilter::new(arrow_predicates))) + } else { + Ok(None) + } + } + + /// Build the map of field id to Parquet column index in the schema. + fn build_field_id_map(&self, parquet_schema: &SchemaDescriptor) -> Result> { + let mut column_map = HashMap::new(); + for (idx, field) in parquet_schema.columns().iter().enumerate() { + let field_type = field.self_type(); + match field_type { + ParquetType::PrimitiveType { basic_info, .. } => { + if !basic_info.has_id() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column {:?} in schema doesn't have field id", + field_type + ), + )); + } + column_map.insert(basic_info.id(), idx); + } + ParquetType::GroupType { .. } => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column in schema should be primitive type but got {:?}", + field_type + ), + )); + } + }; + } + + Ok(column_map) + } +} + +/// A visitor to collect field ids from bound predicates. +struct CollectFieldIdVisitor { + field_ids: Vec, +} + +impl BoundPredicateVisitor for CollectFieldIdVisitor { + type T = (); + type U = (); + + fn and(&mut self, _predicates: Vec) -> Result { + Ok(()) + } + + fn or(&mut self, _predicates: Vec) -> Result { + Ok(()) + } + + fn not(&mut self, _predicate: Self::T) -> Result { + Ok(()) + } + + fn visit_always_true(&mut self) -> Result { + Ok(()) + } + + fn visit_always_false(&mut self) -> Result { + Ok(()) + } + + fn visit_unary(&mut self, predicate: &UnaryExpression) -> Result { + self.bound_reference(predicate.term())?; + Ok(()) + } + + fn visit_binary(&mut self, predicate: &BinaryExpression) -> Result { + self.bound_reference(predicate.term())?; + Ok(()) + } + + fn visit_set(&mut self, predicate: &SetExpression) -> Result { + self.bound_reference(predicate.term())?; + Ok(()) + } + + fn bound_reference(&mut self, reference: &BoundReference) -> Result { + self.field_ids.push(reference.field().id); + Ok(()) + } +} + +struct PredicateConverter<'a> { + pub columns: &'a Vec, + pub projection_mask: ProjectionMask, + pub parquet_schema: &'a SchemaDescriptor, + pub column_map: &'a HashMap, +} + +fn get_arrow_datum(datum: &Datum) -> Box { + match datum.literal() { + PrimitiveLiteral::Boolean(value) => Box::new(BooleanArray::new_scalar(*value)), + PrimitiveLiteral::Int(value) => Box::new(Int32Array::new_scalar(*value)), + PrimitiveLiteral::Long(value) => Box::new(Int64Array::new_scalar(*value)), + PrimitiveLiteral::Float(value) => Box::new(Float32Array::new_scalar(value.as_f32())), + PrimitiveLiteral::Double(value) => Box::new(Float64Array::new_scalar(value.as_f64())), + _ => todo!("Unsupported literal type"), + } +} + +impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { + type T = Box; + type U = usize; + + fn visit_always_true(&mut self) -> Result { + Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), + ))) + } + + fn visit_always_false(&mut self) -> Result { + Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + |batch| Ok(BooleanArray::from(vec![false; batch.num_rows()])), + ))) + } + + fn visit_unary(&mut self, predicate: &UnaryExpression) -> Result { + let term_index = self.bound_reference(predicate.term())?; + + match predicate.op() { + PredicateOperator::IsNull => Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let column = batch.column(term_index); + is_null(column) + }, + ))), + PredicateOperator::NotNull => Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let column = batch.column(term_index); + is_not_null(column) + }, + ))), + PredicateOperator::IsNan => { + todo!("IsNan is not supported yet") + } + PredicateOperator::NotNan => { + todo!("NotNan is not supported yet") + } + op => Err(Error::new( + ErrorKind::DataInvalid, + format!("Unsupported unary operator: {op}"), + )), + } + } + + fn visit_binary(&mut self, predicate: &BinaryExpression) -> Result { + let term_index = self.bound_reference(predicate.term())?; + let literal = predicate.literal().clone(); + + match predicate.op() { + PredicateOperator::LessThan => Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let left = batch.column(term_index); + let literal = get_arrow_datum(&literal); + lt(left, literal.as_ref()) + }, + ))), + PredicateOperator::LessThanOrEq => Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let left = batch.column(term_index); + let literal = get_arrow_datum(&literal); + lt_eq(left, literal.as_ref()) + }, + ))), + PredicateOperator::GreaterThan => Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let left = batch.column(term_index); + let literal = get_arrow_datum(&literal); + gt(left, literal.as_ref()) + }, + ))), + PredicateOperator::GreaterThanOrEq => Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let left = batch.column(term_index); + let literal = get_arrow_datum(&literal); + gt_eq(left, literal.as_ref()) + }, + ))), + PredicateOperator::Eq => Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let left = batch.column(term_index); + let literal = get_arrow_datum(&literal); + eq(left, literal.as_ref()) + }, + ))), + PredicateOperator::NotEq => Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let left = batch.column(term_index); + let literal = get_arrow_datum(&literal); + neq(left, literal.as_ref()) + }, + ))), + op => Err(Error::new( + ErrorKind::DataInvalid, + format!("Unsupported binary operator: {op}"), + )), + } + } + + fn visit_set(&mut self, predicate: &SetExpression) -> Result { + match predicate.op() { + PredicateOperator::In => { + todo!("In is not supported yet") + } + PredicateOperator::NotIn => { + todo!("NotIn is not supported yet") + } + op => Err(Error::new( + ErrorKind::DataInvalid, + format!("Unsupported set operator: {op}"), + )), + } + } + + fn and(&mut self, mut predicates: Vec) -> Result { + Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let left = predicates.get_mut(0).unwrap().evaluate(batch.clone())?; + let right = predicates.get_mut(1).unwrap().evaluate(batch)?; + and(&left, &right) + }, + ))) + } + + fn or(&mut self, mut predicates: Vec) -> Result { + Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let left = predicates.get_mut(0).unwrap().evaluate(batch.clone())?; + let right = predicates.get_mut(1).unwrap().evaluate(batch)?; + or(&left, &right) + }, + ))) + } + + fn not(&mut self, mut predicate: Self::T) -> Result { + Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let evaluated = predicate.evaluate(batch.clone())?; + not(&evaluated) + }, + ))) + } + + fn bound_reference(&mut self, reference: &BoundReference) -> Result { + let column_idx = self.column_map.get(&reference.field().id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Field id {} not found in schema", reference.field().id), + ) + })?; + + let root_col_index = self.parquet_schema.get_column_root_idx(*column_idx); + + // Find the column index in projection mask. + let column_idx = self + .columns + .iter() + .position(|&x| x == root_col_index) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Column index {} not found in schema", root_col_index), + ) + })?; + + Ok(column_idx) + } +} + +/// A visitor for bound predicates. +pub trait BoundPredicateVisitor { + /// Return type of this visitor on bound predicate. + type T; + + /// Return type of this visitor on bound reference. + type U; + + /// Visit a bound predicate. + fn visit_predicate(&mut self, predicate: &BoundPredicate) -> Result { + match predicate { + BoundPredicate::And(predicates) => self.visit_and(predicates.inputs()), + BoundPredicate::Or(predicates) => self.visit_or(predicates.inputs()), + BoundPredicate::Not(predicate) => self.visit_not(predicate.inputs()), + BoundPredicate::AlwaysTrue => self.visit_always_true(), + BoundPredicate::AlwaysFalse => self.visit_always_false(), + BoundPredicate::Unary(unary) => self.visit_unary(unary), + BoundPredicate::Binary(binary) => self.visit_binary(binary), + BoundPredicate::Set(set) => self.visit_set(set), + } + } + + /// Visit an AND predicate. + fn visit_and(&mut self, predicates: [&BoundPredicate; 2]) -> Result { + let mut results = Vec::with_capacity(predicates.len()); + for predicate in predicates { + let result = self.visit_predicate(predicate)?; + results.push(result); + } + self.and(results) + } + + /// Visit an OR predicate. + fn visit_or(&mut self, predicates: [&BoundPredicate; 2]) -> Result { + let mut results = Vec::with_capacity(predicates.len()); + for predicate in predicates { + let result = self.visit_predicate(predicate)?; + results.push(result); + } + self.or(results) + } + + /// Visit a NOT predicate. + fn visit_not(&mut self, predicate: [&BoundPredicate; 1]) -> Result { + let result = self.visit_predicate(predicate.first().unwrap())?; + self.not(result) + } + + /// Visit an always true predicate. + fn visit_always_true(&mut self) -> Result; + + /// Visit an always false predicate. + fn visit_always_false(&mut self) -> Result; + + /// Visit a unary predicate. + fn visit_unary(&mut self, predicate: &UnaryExpression) -> Result; + + /// Visit a binary predicate. + fn visit_binary(&mut self, predicate: &BinaryExpression) -> Result; + + /// Visit a set predicate. + fn visit_set(&mut self, predicate: &SetExpression) -> Result; + + /// Called after visiting predicates of AND. + fn and(&mut self, predicates: Vec) -> Result; + + /// Called after visiting predicates of OR. + fn or(&mut self, predicates: Vec) -> Result; + + /// Called after visiting predicates of NOT. + fn not(&mut self, predicate: Self::T) -> Result; + + /// Visit a bound reference. + fn bound_reference(&mut self, reference: &BoundReference) -> Result; } /// A post order arrow schema visitor. diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index f8bcffe70..995b10be9 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -32,7 +32,7 @@ use crate::spec::{Datum, SchemaRef}; use crate::{Error, ErrorKind}; /// Logical expression, such as `AND`, `OR`, `NOT`. -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub struct LogicalExpression { inputs: [Box; N], } @@ -79,7 +79,7 @@ where } /// Unary predicate, for example, `a IS NULL`. -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub struct UnaryExpression { /// Operator of this predicate, must be single operand operator. op: PredicateOperator, @@ -116,10 +116,20 @@ impl UnaryExpression { debug_assert!(op.is_unary()); Self { op, term } } + + /// Return the term of this predicate. + pub fn term(&self) -> &T { + &self.term + } + + /// Return the operator of this predicate. + pub fn op(&self) -> &PredicateOperator { + &self.op + } } /// Binary predicate, for example, `a > 10`. -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub struct BinaryExpression { /// Operator of this predicate, must be binary operator, such as `=`, `>`, `<`, etc. op: PredicateOperator, @@ -144,6 +154,21 @@ impl BinaryExpression { debug_assert!(op.is_binary()); Self { op, term, literal } } + + /// Return the term of this predicate. + pub fn term(&self) -> &T { + &self.term + } + + /// Return the operator of this predicate. + pub fn op(&self) -> &PredicateOperator { + &self.op + } + + /// Return the literal of this predicate. + pub fn literal(&self) -> &Datum { + &self.literal + } } impl Display for BinaryExpression { @@ -166,7 +191,7 @@ impl Bind for BinaryExpression { } /// Set predicates, for example, `a in (1, 2, 3)`. -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub struct SetExpression { /// Operator of this predicate, must be set operator, such as `IN`, `NOT IN`, etc. op: PredicateOperator, @@ -191,6 +216,16 @@ impl SetExpression { debug_assert!(op.is_set()); Self { op, term, literals } } + + /// Return the term of this predicate. + pub fn term(&self) -> &T { + &self.term + } + + /// Return the operator of this predicate. + pub fn op(&self) -> &PredicateOperator { + &self.op + } } impl Bind for SetExpression { @@ -556,7 +591,7 @@ impl Not for Predicate { } /// Bound predicate expression after binding to a schema. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum BoundPredicate { /// An expression always evaluates to true. AlwaysTrue, diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 852bcafbb..ca93029a1 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -18,6 +18,7 @@ //! Table scan api. use crate::arrow::ArrowReaderBuilder; +use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef}; use crate::table::Table; @@ -32,6 +33,7 @@ pub struct TableScanBuilder<'a> { table: &'a Table, // Empty column names means to select all columns column_names: Vec, + predicates: Vec, snapshot_id: Option, batch_size: Option, } @@ -41,6 +43,7 @@ impl<'a> TableScanBuilder<'a> { Self { table, column_names: vec![], + predicates: vec![], snapshot_id: None, batch_size: None, } @@ -59,6 +62,12 @@ impl<'a> TableScanBuilder<'a> { self } + /// Add a predicate to the scan. The scan will only return rows that match the predicate. + pub fn filter(mut self, predicate: Predicate) -> Self { + self.predicates.push(predicate); + self + } + /// Select some columns of the table. pub fn select(mut self, column_names: impl IntoIterator) -> Self { self.column_names = column_names @@ -115,11 +124,17 @@ impl<'a> TableScanBuilder<'a> { } } + let mut bound_predicates = Vec::new(); + for predicate in self.predicates { + bound_predicates.push(predicate.bind(schema.clone(), true)?); + } + Ok(TableScan { snapshot, file_io: self.table.file_io().clone(), table_metadata: self.table.metadata_ref(), column_names: self.column_names, + bound_predicates, schema, batch_size: self.batch_size, }) @@ -134,6 +149,7 @@ pub struct TableScan { table_metadata: TableMetadataRef, file_io: FileIO, column_names: Vec, + bound_predicates: Vec, schema: SchemaRef, batch_size: Option, } @@ -191,6 +207,8 @@ impl TableScan { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); } + arrow_reader_builder = arrow_reader_builder.with_predicates(self.bound_predicates.clone()); + arrow_reader_builder.build().read(self.plan_files().await?) } } @@ -216,9 +234,10 @@ impl FileScanTask { #[cfg(test)] mod tests { + use crate::expr::Reference; use crate::io::{FileIO, OutputFile}; use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest, + DataContentType, DataFileBuilder, DataFileFormat, Datum, FormatVersion, Literal, Manifest, ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus, ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID, }; @@ -390,18 +409,39 @@ mod tests { // prepare data let schema = { - let fields = - vec![ - arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "0".to_string(), - )])), - ]; + let fields = vec![ + arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + ]; Arc::new(arrow_schema::Schema::new(fields)) }; - let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; - let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap(); + let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; + + let mut values = vec![2; 512]; + values.append(vec![3; 200].as_mut()); + values.append(vec![4; 300].as_mut()); + values.append(vec![5; 12].as_mut()); + + let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; + + let mut values = vec![3; 512]; + values.append(vec![4; 512].as_mut()); + + let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; + let to_write = RecordBatch::try_new(schema.clone(), vec![col1, col2, col3]).unwrap(); // Write the Parquet files let props = WriterProperties::builder() @@ -531,9 +571,95 @@ mod tests { let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); - let col = batches[0].column_by_name("col").unwrap(); + let col = batches[0].column_by_name("x").unwrap(); + + let int64_arr = col.as_any().downcast_ref::().unwrap(); + assert_eq!(int64_arr.value(0), 1); + } + + #[tokio::test] + async fn test_filter_on_arrow_lt() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Filter: y < 3 + let mut builder = fixture.table.scan(); + let predicate = Reference::new("y").less_than(Datum::long(3)); + builder = builder.filter(predicate); + let table_scan = builder.build().unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_rows(), 512); + + let col = batches[0].column_by_name("x").unwrap(); + let int64_arr = col.as_any().downcast_ref::().unwrap(); + assert_eq!(int64_arr.value(0), 1); + + let col = batches[0].column_by_name("y").unwrap(); + let int64_arr = col.as_any().downcast_ref::().unwrap(); + assert_eq!(int64_arr.value(0), 2); + } + + #[tokio::test] + async fn test_filter_on_arrow_gt_eq() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Filter: y >= 5 + let mut builder = fixture.table.scan(); + let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5)); + builder = builder.filter(predicate); + let table_scan = builder.build().unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_rows(), 12); + let col = batches[0].column_by_name("x").unwrap(); let int64_arr = col.as_any().downcast_ref::().unwrap(); assert_eq!(int64_arr.value(0), 1); + + let col = batches[0].column_by_name("y").unwrap(); + let int64_arr = col.as_any().downcast_ref::().unwrap(); + assert_eq!(int64_arr.value(0), 5); + } + + #[tokio::test] + async fn test_filter_on_arrow_is_null() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Filter: y is null + let mut builder = fixture.table.scan(); + let predicate = Reference::new("y").is_null(); + builder = builder.filter(predicate); + let table_scan = builder.build().unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 0); + } + + #[tokio::test] + async fn test_filter_on_arrow_is_not_null() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Filter: y is not null + let mut builder = fixture.table.scan(); + let predicate = Reference::new("y").is_not_null(); + builder = builder.filter(predicate); + let table_scan = builder.build().unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches[0].num_rows(), 1024); } } diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index f31d64779..3fe5b5eee 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -673,6 +673,11 @@ impl Datum { )), } } + + /// Returns the literal of the datum. + pub fn literal(&self) -> &PrimitiveLiteral { + &self.literal + } } /// Values present in iceberg type From a537092af1a884413d201e1a029b74b21c123a27 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 3 Apr 2024 14:32:35 -0700 Subject: [PATCH 02/17] For review --- crates/iceberg/src/arrow/reader.rs | 123 +++++---------------------- crates/iceberg/src/expr/predicate.rs | 88 +++++++++++++++++++ 2 files changed, 107 insertions(+), 104 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 9bc307792..8f11784d5 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -35,8 +35,8 @@ use std::str::FromStr; use crate::arrow::arrow_schema_to_schema; use crate::expr::{ - BinaryExpression, BoundPredicate, BoundReference, PredicateOperator, SetExpression, - UnaryExpression, + visit_predicate, BinaryExpression, BoundPredicate, BoundPredicateVisitor, BoundReference, + PredicateOperator, SetExpression, UnaryExpression, }; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream}; @@ -225,7 +225,7 @@ impl ArrowReader { .iter() .map(|predicate| { let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; - collector.visit_predicate(predicate).unwrap(); + visit_predicate(&mut collector, predicate).unwrap(); collector .field_ids .iter() @@ -247,7 +247,7 @@ impl ArrowReader { parquet_schema, column_map: &field_id_map, }; - let arrow_predicate = converter.visit_predicate(predicate)?; + let arrow_predicate = visit_predicate(&mut converter, predicate)?; arrow_predicates.push(arrow_predicate); } Ok(Some(RowFilter::new(arrow_predicates))) @@ -394,16 +394,11 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { is_not_null(column) }, ))), - PredicateOperator::IsNan => { - todo!("IsNan is not supported yet") - } - PredicateOperator::NotNan => { - todo!("NotNan is not supported yet") - } - op => Err(Error::new( - ErrorKind::DataInvalid, - format!("Unsupported unary operator: {op}"), - )), + // Unsupported operators, return always true. + _ => Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), + ))), } } @@ -460,25 +455,21 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { neq(left, literal.as_ref()) }, ))), - op => Err(Error::new( - ErrorKind::DataInvalid, - format!("Unsupported binary operator: {op}"), - )), + // Unsupported operators, return always true. + _ => Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), + ))), } } fn visit_set(&mut self, predicate: &SetExpression) -> Result { match predicate.op() { - PredicateOperator::In => { - todo!("In is not supported yet") - } - PredicateOperator::NotIn => { - todo!("NotIn is not supported yet") - } - op => Err(Error::new( - ErrorKind::DataInvalid, - format!("Unsupported set operator: {op}"), - )), + // Unsupported operators, return always true. + _ => Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), + ))), } } @@ -539,79 +530,3 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { Ok(column_idx) } } - -/// A visitor for bound predicates. -pub trait BoundPredicateVisitor { - /// Return type of this visitor on bound predicate. - type T; - - /// Return type of this visitor on bound reference. - type U; - - /// Visit a bound predicate. - fn visit_predicate(&mut self, predicate: &BoundPredicate) -> Result { - match predicate { - BoundPredicate::And(predicates) => self.visit_and(predicates.inputs()), - BoundPredicate::Or(predicates) => self.visit_or(predicates.inputs()), - BoundPredicate::Not(predicate) => self.visit_not(predicate.inputs()), - BoundPredicate::AlwaysTrue => self.visit_always_true(), - BoundPredicate::AlwaysFalse => self.visit_always_false(), - BoundPredicate::Unary(unary) => self.visit_unary(unary), - BoundPredicate::Binary(binary) => self.visit_binary(binary), - BoundPredicate::Set(set) => self.visit_set(set), - } - } - - /// Visit an AND predicate. - fn visit_and(&mut self, predicates: [&BoundPredicate; 2]) -> Result { - let mut results = Vec::with_capacity(predicates.len()); - for predicate in predicates { - let result = self.visit_predicate(predicate)?; - results.push(result); - } - self.and(results) - } - - /// Visit an OR predicate. - fn visit_or(&mut self, predicates: [&BoundPredicate; 2]) -> Result { - let mut results = Vec::with_capacity(predicates.len()); - for predicate in predicates { - let result = self.visit_predicate(predicate)?; - results.push(result); - } - self.or(results) - } - - /// Visit a NOT predicate. - fn visit_not(&mut self, predicate: [&BoundPredicate; 1]) -> Result { - let result = self.visit_predicate(predicate.first().unwrap())?; - self.not(result) - } - - /// Visit an always true predicate. - fn visit_always_true(&mut self) -> Result; - - /// Visit an always false predicate. - fn visit_always_false(&mut self) -> Result; - - /// Visit a unary predicate. - fn visit_unary(&mut self, predicate: &UnaryExpression) -> Result; - - /// Visit a binary predicate. - fn visit_binary(&mut self, predicate: &BinaryExpression) -> Result; - - /// Visit a set predicate. - fn visit_set(&mut self, predicate: &SetExpression) -> Result; - - /// Called after visiting predicates of AND. - fn and(&mut self, predicates: Vec) -> Result; - - /// Called after visiting predicates of OR. - fn or(&mut self, predicates: Vec) -> Result; - - /// Called after visiting predicates of NOT. - fn not(&mut self, predicate: Self::T) -> Result; - - /// Visit a bound reference. - fn bound_reference(&mut self, reference: &BoundReference) -> Result; -} diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index 4e81e579f..f69abb106 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -642,6 +642,94 @@ impl Display for BoundPredicate { } } +/// Visit a bound predicate. +pub fn visit_predicate( + visitor: &mut V, + predicate: &BoundPredicate, +) -> Result { + match predicate { + BoundPredicate::And(predicates) => visit_and(visitor, predicates.inputs()), + BoundPredicate::Or(predicates) => visit_or(visitor, predicates.inputs()), + BoundPredicate::Not(predicate) => visit_not(visitor, predicate.inputs()), + BoundPredicate::AlwaysTrue => visitor.visit_always_true(), + BoundPredicate::AlwaysFalse => visitor.visit_always_false(), + BoundPredicate::Unary(unary) => visitor.visit_unary(unary), + BoundPredicate::Binary(binary) => visitor.visit_binary(binary), + BoundPredicate::Set(set) => visitor.visit_set(set), + } +} + +/// Visit an AND predicate. +fn visit_and( + visitor: &mut V, + predicates: [&BoundPredicate; 2], +) -> Result { + let mut results = Vec::with_capacity(predicates.len()); + for predicate in predicates { + let result = visit_predicate(visitor, predicate)?; + results.push(result); + } + visitor.and(results) +} + +/// Visit an OR predicate. +fn visit_or( + visitor: &mut V, + predicates: [&BoundPredicate; 2], +) -> Result { + let mut results = Vec::with_capacity(predicates.len()); + for predicate in predicates { + let result = visit_predicate(visitor, predicate)?; + results.push(result); + } + visitor.or(results) +} + +/// Visit a NOT predicate. +fn visit_not( + visitor: &mut V, + predicate: [&BoundPredicate; 1], +) -> Result { + let result = visit_predicate(visitor, predicate.first().unwrap())?; + visitor.not(result) +} + +/// A visitor for bound predicates. +pub trait BoundPredicateVisitor { + /// Return type of this visitor on bound predicate. + type T; + + /// Return type of this visitor on bound reference. + type U; + + /// Visit an always true predicate. + fn visit_always_true(&mut self) -> Result; + + /// Visit an always false predicate. + fn visit_always_false(&mut self) -> Result; + + /// Visit a unary predicate. + fn visit_unary(&mut self, predicate: &UnaryExpression) -> Result; + + /// Visit a binary predicate. + fn visit_binary(&mut self, predicate: &BinaryExpression) -> Result; + + /// Visit a set predicate. + fn visit_set(&mut self, predicate: &SetExpression) -> Result; + + /// Called after visiting predicates of AND. + fn and(&mut self, predicates: Vec) -> Result; + + /// Called after visiting predicates of OR. + fn or(&mut self, predicates: Vec) -> Result; + + /// Called after visiting predicates of NOT. + fn not(&mut self, predicate: Self::T) -> Result; + + /// Visit a bound reference. + fn bound_reference(&mut self, reference: &BoundReference) -> Result; +} + #[cfg(test)] mod tests { use std::ops::Not; From d612d711f14cc46efdff11d6c9c52ae8748fc503 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 3 Apr 2024 16:07:13 -0700 Subject: [PATCH 03/17] Fix clippy --- crates/iceberg/src/arrow/reader.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 8f11784d5..25c449c05 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -464,6 +464,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { } fn visit_set(&mut self, predicate: &SetExpression) -> Result { + #[allow(clippy::match_single_binding)] match predicate.op() { // Unsupported operators, return always true. _ => Ok(Box::new(ArrowPredicateFn::new( From 86bde1956dab5e7b6310db5a2940535753b244a3 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 7 Apr 2024 14:15:05 -0700 Subject: [PATCH 04/17] Change from vector of BoundPredicate to BoundPredicate --- crates/iceberg/src/arrow/reader.rs | 71 +++++++++++++----------------- crates/iceberg/src/scan.rs | 21 +++++---- 2 files changed, 42 insertions(+), 50 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 25c449c05..1541b878b 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -49,7 +49,7 @@ pub struct ArrowReaderBuilder { field_ids: Vec, file_io: FileIO, schema: SchemaRef, - predicates: Option>, + predicates: Option, } impl ArrowReaderBuilder { @@ -78,7 +78,7 @@ impl ArrowReaderBuilder { } /// Sets the predicates to apply to the scan. - pub fn with_predicates(mut self, predicates: Vec) -> Self { + pub fn with_predicates(mut self, predicates: BoundPredicate) -> Self { self.predicates = Some(predicates); self } @@ -102,7 +102,7 @@ pub struct ArrowReader { #[allow(dead_code)] schema: SchemaRef, file_io: FileIO, - predicates: Option>, + predicates: Option, } impl ArrowReader { @@ -221,36 +221,27 @@ impl ArrowReader { let field_id_map = self.build_field_id_map(parquet_schema)?; // Collect Parquet column indices from field ids - let column_indices = predicates + let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; + visit_predicate(&mut collector, predicates).unwrap(); + let column_indices = collector + .field_ids .iter() - .map(|predicate| { - let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; - visit_predicate(&mut collector, predicate).unwrap(); - collector - .field_ids - .iter() - .map(|field_id| { - field_id_map.get(field_id).cloned().ok_or_else(|| { - Error::new(ErrorKind::DataInvalid, "Field id not found in schema") - }) - }) - .collect::>>() + .map(|field_id| { + field_id_map.get(field_id).cloned().ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "Field id not found in schema") + }) }) .collect::>>()?; // Convert BoundPredicates to ArrowPredicates - let mut arrow_predicates = vec![]; - for (predicate, columns) in predicates.iter().zip(column_indices.iter()) { - let mut converter = PredicateConverter { - columns, - projection_mask: ProjectionMask::leaves(parquet_schema, columns.clone()), - parquet_schema, - column_map: &field_id_map, - }; - let arrow_predicate = visit_predicate(&mut converter, predicate)?; - arrow_predicates.push(arrow_predicate); - } - Ok(Some(RowFilter::new(arrow_predicates))) + let mut converter = PredicateConverter { + columns: &column_indices, + projection_mask: ProjectionMask::leaves(parquet_schema, column_indices.clone()), + parquet_schema, + column_map: &field_id_map, + }; + let arrow_predicate = visit_predicate(&mut converter, predicates)?; + Ok(Some(RowFilter::new(vec![arrow_predicate]))) } else { Ok(None) } @@ -347,14 +338,17 @@ struct PredicateConverter<'a> { pub column_map: &'a HashMap, } -fn get_arrow_datum(datum: &Datum) -> Box { +fn get_arrow_datum(datum: &Datum) -> Result> { match datum.literal() { - PrimitiveLiteral::Boolean(value) => Box::new(BooleanArray::new_scalar(*value)), - PrimitiveLiteral::Int(value) => Box::new(Int32Array::new_scalar(*value)), - PrimitiveLiteral::Long(value) => Box::new(Int64Array::new_scalar(*value)), - PrimitiveLiteral::Float(value) => Box::new(Float32Array::new_scalar(value.as_f32())), - PrimitiveLiteral::Double(value) => Box::new(Float64Array::new_scalar(value.as_f64())), - _ => todo!("Unsupported literal type"), + PrimitiveLiteral::Boolean(value) => Ok(Box::new(BooleanArray::new_scalar(*value))), + PrimitiveLiteral::Int(value) => Ok(Box::new(Int32Array::new_scalar(*value))), + PrimitiveLiteral::Long(value) => Ok(Box::new(Int64Array::new_scalar(*value))), + PrimitiveLiteral::Float(value) => Ok(Box::new(Float32Array::new_scalar(value.as_f32()))), + PrimitiveLiteral::Double(value) => Ok(Box::new(Float64Array::new_scalar(value.as_f64()))), + l => Err(Error::new( + ErrorKind::DataInvalid, + format!("Unsupported literal type: {:?}", l), + )), } } @@ -405,13 +399,13 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { fn visit_binary(&mut self, predicate: &BinaryExpression) -> Result { let term_index = self.bound_reference(predicate.term())?; let literal = predicate.literal().clone(); + let literal = get_arrow_datum(&literal)?; match predicate.op() { PredicateOperator::LessThan => Ok(Box::new(ArrowPredicateFn::new( self.projection_mask.clone(), move |batch| { let left = batch.column(term_index); - let literal = get_arrow_datum(&literal); lt(left, literal.as_ref()) }, ))), @@ -419,7 +413,6 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { self.projection_mask.clone(), move |batch| { let left = batch.column(term_index); - let literal = get_arrow_datum(&literal); lt_eq(left, literal.as_ref()) }, ))), @@ -427,7 +420,6 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { self.projection_mask.clone(), move |batch| { let left = batch.column(term_index); - let literal = get_arrow_datum(&literal); gt(left, literal.as_ref()) }, ))), @@ -435,7 +427,6 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { self.projection_mask.clone(), move |batch| { let left = batch.column(term_index); - let literal = get_arrow_datum(&literal); gt_eq(left, literal.as_ref()) }, ))), @@ -443,7 +434,6 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { self.projection_mask.clone(), move |batch| { let left = batch.column(term_index); - let literal = get_arrow_datum(&literal); eq(left, literal.as_ref()) }, ))), @@ -451,7 +441,6 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { self.projection_mask.clone(), move |batch| { let left = batch.column(term_index); - let literal = get_arrow_datum(&literal); neq(left, literal.as_ref()) }, ))), diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index eb32ece39..6eea9eaf7 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -33,7 +33,7 @@ pub struct TableScanBuilder<'a> { table: &'a Table, // Empty column names means to select all columns column_names: Vec, - predicates: Vec, + predicates: Option, snapshot_id: Option, batch_size: Option, } @@ -43,7 +43,7 @@ impl<'a> TableScanBuilder<'a> { Self { table, column_names: vec![], - predicates: vec![], + predicates: None, snapshot_id: None, batch_size: None, } @@ -64,7 +64,7 @@ impl<'a> TableScanBuilder<'a> { /// Add a predicate to the scan. The scan will only return rows that match the predicate. pub fn filter(mut self, predicate: Predicate) -> Self { - self.predicates.push(predicate); + self.predicates = Some(predicate); self } @@ -127,10 +127,11 @@ impl<'a> TableScanBuilder<'a> { } } - let mut bound_predicates = Vec::new(); - for predicate in self.predicates { - bound_predicates.push(predicate.bind(schema.clone(), true)?); - } + let bound_predicates = if let Some(ref predicates) = self.predicates { + Some(predicates.bind(schema.clone(), true)?) + } else { + None + }; Ok(TableScan { snapshot, @@ -152,7 +153,7 @@ pub struct TableScan { table_metadata: TableMetadataRef, file_io: FileIO, column_names: Vec, - bound_predicates: Vec, + bound_predicates: Option, schema: SchemaRef, batch_size: Option, } @@ -250,7 +251,9 @@ impl TableScan { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); } - arrow_reader_builder = arrow_reader_builder.with_predicates(self.bound_predicates.clone()); + if let Some(ref bound_predicates) = self.bound_predicates { + arrow_reader_builder = arrow_reader_builder.with_predicates(bound_predicates.clone()); + } arrow_reader_builder.build().read(self.plan_files().await?) } From f68a556d5e18547b7b8d850bd3465f2e20ad6d06 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 7 Apr 2024 14:29:49 -0700 Subject: [PATCH 05/17] Add test for CollectFieldIdVisitor --- crates/iceberg/src/arrow/reader.rs | 64 ++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 1541b878b..e5e83b709 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -520,3 +520,67 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { Ok(column_idx) } } + +#[cfg(test)] +mod tests { + use crate::arrow::reader::CollectFieldIdVisitor; + use crate::expr::{visit_predicate, Bind, Reference}; + use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type}; + use std::sync::Arc; + + fn table_schema_simple() -> SchemaRef { + Arc::new( + Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(), + ]) + .build() + .unwrap(), + ) + } + + #[test] + fn test_collect_field_id() { + let schema = table_schema_simple(); + let expr = Reference::new("qux").is_null(); + let bound_expr = expr.bind(schema, true).unwrap(); + + let mut visitor = CollectFieldIdVisitor { field_ids: vec![] }; + visit_predicate(&mut visitor, &bound_expr).unwrap(); + + assert_eq!(visitor.field_ids, vec![4]); + } + + #[test] + fn test_collect_field_id_with_and() { + let schema = table_schema_simple(); + let expr = Reference::new("qux") + .is_null() + .and(Reference::new("baz").is_null()); + let bound_expr = expr.bind(schema, true).unwrap(); + + let mut visitor = CollectFieldIdVisitor { field_ids: vec![] }; + visit_predicate(&mut visitor, &bound_expr).unwrap(); + + assert_eq!(visitor.field_ids, vec![4, 3]); + } + + #[test] + fn test_collect_field_id_with_or() { + let schema = table_schema_simple(); + let expr = Reference::new("qux") + .is_null() + .or(Reference::new("baz").is_null()); + let bound_expr = expr.bind(schema, true).unwrap(); + + let mut visitor = CollectFieldIdVisitor { field_ids: vec![] }; + visit_predicate(&mut visitor, &bound_expr).unwrap(); + + assert_eq!(visitor.field_ids, vec![4, 3]); + } +} From 733e0ca2dcc8f3660e569a721be0a8c92ac19334 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 7 Apr 2024 17:27:29 -0700 Subject: [PATCH 06/17] Return projection_mask for leaf column --- crates/iceberg/src/arrow/reader.rs | 104 ++++++++++++++++++----------- 1 file changed, 66 insertions(+), 38 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index e5e83b709..1a6051459 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -20,10 +20,11 @@ use crate::error::Result; use arrow_arith::boolean::{and, is_not_null, is_null, not, or}; use arrow_array::{ - BooleanArray, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array, Int64Array, + ArrayRef, BooleanArray, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array, + Int64Array, StructArray, }; use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; -use arrow_schema::SchemaRef as ArrowSchemaRef; +use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef}; use async_stream::try_stream; use bitvec::macros::internal::funty::Fundamental; use futures::stream::StreamExt; @@ -331,10 +332,15 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { } } +/// A visitor to convert Iceberg bound predicates to Arrow predicates. struct PredicateConverter<'a> { + /// The leaf column indices used in the predicates. pub columns: &'a Vec, + /// The projection mask for the Arrow predicates. pub projection_mask: ProjectionMask, + /// The Parquet schema descriptor. pub parquet_schema: &'a SchemaDescriptor, + /// The map between field id and leaf column index in Parquet schema. pub column_map: &'a HashMap, } @@ -352,9 +358,28 @@ fn get_arrow_datum(datum: &Datum) -> Result> { } } +/// Recursively get the leaf column from the record batch. Assume that the nested columns in +/// struct is projected to a single column. +fn get_leaf_column(column: &ArrayRef) -> std::result::Result { + match column.data_type() { + DataType::Struct(fields) => { + if fields.len() != 1 { + return Err(ArrowError::SchemaError( + "Struct column should have only one field after projection" + .parse() + .unwrap(), + )); + } + let struct_array = column.as_any().downcast_ref::().unwrap(); + get_leaf_column(struct_array.column(0)) + } + _ => Ok(column.clone()), + } +} + impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { type T = Box; - type U = usize; + type U = ProjectionMask; fn visit_always_true(&mut self) -> Result { Ok(Box::new(ArrowPredicateFn::new( @@ -371,77 +396,76 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { } fn visit_unary(&mut self, predicate: &UnaryExpression) -> Result { - let term_index = self.bound_reference(predicate.term())?; + let projected_mask = self.bound_reference(predicate.term())?; match predicate.op() { PredicateOperator::IsNull => Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), + projected_mask, move |batch| { - let column = batch.column(term_index); - is_null(column) + let column = get_leaf_column(batch.column(0))?; + is_null(&column) }, ))), PredicateOperator::NotNull => Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), + projected_mask, move |batch| { - let column = batch.column(term_index); - is_not_null(column) + let column = get_leaf_column(batch.column(0))?; + is_not_null(&column) }, ))), // Unsupported operators, return always true. - _ => Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), - |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), - ))), + _ => Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| { + Ok(BooleanArray::from(vec![true; batch.num_rows()])) + }))), } } fn visit_binary(&mut self, predicate: &BinaryExpression) -> Result { - let term_index = self.bound_reference(predicate.term())?; + let projected_mask = self.bound_reference(predicate.term())?; let literal = predicate.literal().clone(); let literal = get_arrow_datum(&literal)?; match predicate.op() { PredicateOperator::LessThan => Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), + projected_mask, move |batch| { - let left = batch.column(term_index); - lt(left, literal.as_ref()) + let left = get_leaf_column(batch.column(0))?; + lt(&left, literal.as_ref()) }, ))), PredicateOperator::LessThanOrEq => Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), + projected_mask, move |batch| { - let left = batch.column(term_index); - lt_eq(left, literal.as_ref()) + let left = get_leaf_column(batch.column(0))?; + lt_eq(&left, literal.as_ref()) }, ))), PredicateOperator::GreaterThan => Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), + projected_mask, move |batch| { - let left = batch.column(term_index); - gt(left, literal.as_ref()) + let left = get_leaf_column(batch.column(0))?; + gt(&left, literal.as_ref()) }, ))), PredicateOperator::GreaterThanOrEq => Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), + projected_mask, move |batch| { - let left = batch.column(term_index); - gt_eq(left, literal.as_ref()) + let left = get_leaf_column(batch.column(0))?; + gt_eq(&left, literal.as_ref()) }, ))), PredicateOperator::Eq => Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), + projected_mask, move |batch| { - let left = batch.column(term_index); - eq(left, literal.as_ref()) + let left = get_leaf_column(batch.column(0))?; + eq(&left, literal.as_ref()) }, ))), PredicateOperator::NotEq => Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), + projected_mask, move |batch| { - let left = batch.column(term_index); - neq(left, literal.as_ref()) + let left = get_leaf_column(batch.column(0))?; + neq(&left, literal.as_ref()) }, ))), // Unsupported operators, return always true. @@ -495,7 +519,10 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { ))) } + /// When visiting a bound reference, we return the projection mask for the leaf column + /// which is used to project the column in the record batch. fn bound_reference(&mut self, reference: &BoundReference) -> Result { + // The leaf column's index in Parquet schema. let column_idx = self.column_map.get(&reference.field().id).ok_or_else(|| { Error::new( ErrorKind::DataInvalid, @@ -503,21 +530,22 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { ) })?; - let root_col_index = self.parquet_schema.get_column_root_idx(*column_idx); - // Find the column index in projection mask. let column_idx = self .columns .iter() - .position(|&x| x == root_col_index) + .position(|&x| x == *column_idx) .ok_or_else(|| { Error::new( ErrorKind::DataInvalid, - format!("Column index {} not found in schema", root_col_index), + format!("Column index {} not found in schema", *column_idx), ) })?; - Ok(column_idx) + Ok(ProjectionMask::leaves( + self.parquet_schema, + vec![self.columns[column_idx]], + )) } } From 732d43fddd9f4cc92975b7a29032dfe3295d7be4 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 27 Apr 2024 18:45:27 -0700 Subject: [PATCH 07/17] Update --- crates/iceberg/src/arrow/reader.rs | 588 +++++++++++++++++++-------- crates/iceberg/src/expr/predicate.rs | 88 ---- 2 files changed, 412 insertions(+), 264 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 1a6051459..768f4abfa 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -27,6 +27,7 @@ use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef}; use async_stream::try_stream; use bitvec::macros::internal::funty::Fundamental; +use fnv::FnvHashSet; use futures::stream::StreamExt; use parquet::arrow::arrow_reader::{ArrowPredicate, ArrowPredicateFn, RowFilter}; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY}; @@ -35,10 +36,8 @@ use std::collections::HashMap; use std::str::FromStr; use crate::arrow::arrow_schema_to_schema; -use crate::expr::{ - visit_predicate, BinaryExpression, BoundPredicate, BoundPredicateVisitor, BoundReference, - PredicateOperator, SetExpression, UnaryExpression, -}; +use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; +use crate::expr::{BoundPredicate, BoundReference}; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream}; use crate::spec::{Datum, PrimitiveLiteral, SchemaRef}; @@ -219,11 +218,11 @@ impl ArrowReader { fn get_row_filter(&self, parquet_schema: &SchemaDescriptor) -> Result> { if let Some(predicates) = &self.predicates { - let field_id_map = self.build_field_id_map(parquet_schema)?; + let field_id_map = build_field_id_map(parquet_schema)?; // Collect Parquet column indices from field ids let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; - visit_predicate(&mut collector, predicates).unwrap(); + visit(&mut collector, predicates).unwrap(); let column_indices = collector .field_ids .iter() @@ -241,45 +240,45 @@ impl ArrowReader { parquet_schema, column_map: &field_id_map, }; - let arrow_predicate = visit_predicate(&mut converter, predicates)?; + let arrow_predicate = visit(&mut converter, predicates)?; Ok(Some(RowFilter::new(vec![arrow_predicate]))) } else { Ok(None) } } +} - /// Build the map of field id to Parquet column index in the schema. - fn build_field_id_map(&self, parquet_schema: &SchemaDescriptor) -> Result> { - let mut column_map = HashMap::new(); - for (idx, field) in parquet_schema.columns().iter().enumerate() { - let field_type = field.self_type(); - match field_type { - ParquetType::PrimitiveType { basic_info, .. } => { - if !basic_info.has_id() { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Leave column {:?} in schema doesn't have field id", - field_type - ), - )); - } - column_map.insert(basic_info.id(), idx); - } - ParquetType::GroupType { .. } => { +/// Build the map of field id to Parquet column index in the schema. +fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result> { + let mut column_map = HashMap::new(); + for (idx, field) in parquet_schema.columns().iter().enumerate() { + let field_type = field.self_type(); + match field_type { + ParquetType::PrimitiveType { basic_info, .. } => { + if !basic_info.has_id() { return Err(Error::new( ErrorKind::DataInvalid, format!( - "Leave column in schema should be primitive type but got {:?}", + "Leave column {:?} in schema doesn't have field id", field_type ), )); } - }; - } - - Ok(column_map) + column_map.insert(basic_info.id(), idx); + } + ParquetType::GroupType { .. } => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column in schema should be primitive type but got {:?}", + field_type + ), + )); + } + }; } + + Ok(column_map) } /// A visitor to collect field ids from bound predicates. @@ -289,44 +288,159 @@ struct CollectFieldIdVisitor { impl BoundPredicateVisitor for CollectFieldIdVisitor { type T = (); - type U = (); - fn and(&mut self, _predicates: Vec) -> Result { + fn always_true(&mut self) -> Result { + Ok(()) + } + + fn always_false(&mut self) -> Result { + Ok(()) + } + + fn and(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result { + Ok(()) + } + + fn or(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result { + Ok(()) + } + + fn not(&mut self, _inner: Self::T) -> Result { + Ok(()) + } + + fn is_null( + &mut self, + reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> Result { + self.field_ids.push(reference.field().id); + Ok(()) + } + + fn not_null( + &mut self, + reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> Result { + self.field_ids.push(reference.field().id); + Ok(()) + } + + fn is_nan( + &mut self, + reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> Result { + self.field_ids.push(reference.field().id); + Ok(()) + } + + fn not_nan( + &mut self, + reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> Result { + self.field_ids.push(reference.field().id); + Ok(()) + } + + fn less_than( + &mut self, + reference: &BoundReference, + _literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + self.field_ids.push(reference.field().id); + Ok(()) + } + + fn less_than_or_eq( + &mut self, + reference: &BoundReference, + _literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + self.field_ids.push(reference.field().id); Ok(()) } - fn or(&mut self, _predicates: Vec) -> Result { + fn greater_than( + &mut self, + reference: &BoundReference, + _literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + self.field_ids.push(reference.field().id); Ok(()) } - fn not(&mut self, _predicate: Self::T) -> Result { + fn greater_than_or_eq( + &mut self, + reference: &BoundReference, + _literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + self.field_ids.push(reference.field().id); Ok(()) } - fn visit_always_true(&mut self) -> Result { + fn eq( + &mut self, + reference: &BoundReference, + _literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + self.field_ids.push(reference.field().id); Ok(()) } - fn visit_always_false(&mut self) -> Result { + fn not_eq( + &mut self, + reference: &BoundReference, + _literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + self.field_ids.push(reference.field().id); Ok(()) } - fn visit_unary(&mut self, predicate: &UnaryExpression) -> Result { - self.bound_reference(predicate.term())?; + fn starts_with( + &mut self, + reference: &BoundReference, + _literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + self.field_ids.push(reference.field().id); Ok(()) } - fn visit_binary(&mut self, predicate: &BinaryExpression) -> Result { - self.bound_reference(predicate.term())?; + fn not_starts_with( + &mut self, + reference: &BoundReference, + _literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + self.field_ids.push(reference.field().id); Ok(()) } - fn visit_set(&mut self, predicate: &SetExpression) -> Result { - self.bound_reference(predicate.term())?; + fn r#in( + &mut self, + reference: &BoundReference, + _literals: &FnvHashSet, + _predicate: &BoundPredicate, + ) -> Result { + self.field_ids.push(reference.field().id); Ok(()) } - fn bound_reference(&mut self, reference: &BoundReference) -> Result { + fn not_in( + &mut self, + reference: &BoundReference, + _literals: &FnvHashSet, + _predicate: &BoundPredicate, + ) -> Result { self.field_ids.push(reference.field().id); Ok(()) } @@ -344,6 +458,37 @@ struct PredicateConverter<'a> { pub column_map: &'a HashMap, } +impl PredicateConverter<'_> { + /// When visiting a bound reference, we return the projection mask for the leaf column + /// which is used to project the column in the record batch. + fn bound_reference(&mut self, reference: &BoundReference) -> Result { + // The leaf column's index in Parquet schema. + let column_idx = self.column_map.get(&reference.field().id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Field id {} not found in schema", reference.field().id), + ) + })?; + + // Find the column index in projection mask. + let column_idx = self + .columns + .iter() + .position(|&x| x == *column_idx) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Column index {} not found in schema", *column_idx), + ) + })?; + + Ok(ProjectionMask::leaves( + self.parquet_schema, + vec![self.columns[column_idx]], + )) + } +} + fn get_arrow_datum(datum: &Datum) -> Result> { match datum.literal() { PrimitiveLiteral::Boolean(value) => Ok(Box::new(BooleanArray::new_scalar(*value))), @@ -379,180 +524,271 @@ fn get_leaf_column(column: &ArrayRef) -> std::result::Result BoundPredicateVisitor for PredicateConverter<'a> { type T = Box; - type U = ProjectionMask; - fn visit_always_true(&mut self) -> Result { + fn always_true(&mut self) -> Result { Ok(Box::new(ArrowPredicateFn::new( self.projection_mask.clone(), |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), ))) } - fn visit_always_false(&mut self) -> Result { + fn always_false(&mut self) -> Result { Ok(Box::new(ArrowPredicateFn::new( self.projection_mask.clone(), |batch| Ok(BooleanArray::from(vec![false; batch.num_rows()])), ))) } - fn visit_unary(&mut self, predicate: &UnaryExpression) -> Result { - let projected_mask = self.bound_reference(predicate.term())?; - - match predicate.op() { - PredicateOperator::IsNull => Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let column = get_leaf_column(batch.column(0))?; - is_null(&column) - }, - ))), - PredicateOperator::NotNull => Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let column = get_leaf_column(batch.column(0))?; - is_not_null(&column) - }, - ))), - // Unsupported operators, return always true. - _ => Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| { - Ok(BooleanArray::from(vec![true; batch.num_rows()])) - }))), - } + fn and(&mut self, mut lhs: Self::T, mut rhs: Self::T) -> Result { + Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let left = lhs.evaluate(batch.clone())?; + let right = rhs.evaluate(batch)?; + and(&left, &right) + }, + ))) } - fn visit_binary(&mut self, predicate: &BinaryExpression) -> Result { - let projected_mask = self.bound_reference(predicate.term())?; - let literal = predicate.literal().clone(); - let literal = get_arrow_datum(&literal)?; - - match predicate.op() { - PredicateOperator::LessThan => Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - lt(&left, literal.as_ref()) - }, - ))), - PredicateOperator::LessThanOrEq => Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - lt_eq(&left, literal.as_ref()) - }, - ))), - PredicateOperator::GreaterThan => Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - gt(&left, literal.as_ref()) - }, - ))), - PredicateOperator::GreaterThanOrEq => Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - gt_eq(&left, literal.as_ref()) - }, - ))), - PredicateOperator::Eq => Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - eq(&left, literal.as_ref()) - }, - ))), - PredicateOperator::NotEq => Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - neq(&left, literal.as_ref()) - }, - ))), - // Unsupported operators, return always true. - _ => Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), - |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), - ))), - } + fn or(&mut self, mut lhs: Self::T, mut rhs: Self::T) -> Result { + Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let left = lhs.evaluate(batch.clone())?; + let right = rhs.evaluate(batch)?; + or(&left, &right) + }, + ))) } - fn visit_set(&mut self, predicate: &SetExpression) -> Result { - #[allow(clippy::match_single_binding)] - match predicate.op() { - // Unsupported operators, return always true. - _ => Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), - |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), - ))), - } + fn not(&mut self, mut inner: Self::T) -> Result { + Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + move |batch| { + let pred_ret = inner.evaluate(batch)?; + not(&pred_ret) + }, + ))) } - fn and(&mut self, mut predicates: Vec) -> Result { + fn is_null( + &mut self, + reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> Result { + let projected_mask = self.bound_reference(reference)?; + Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), + projected_mask, move |batch| { - let left = predicates.get_mut(0).unwrap().evaluate(batch.clone())?; - let right = predicates.get_mut(1).unwrap().evaluate(batch)?; - and(&left, &right) + let column = get_leaf_column(batch.column(0))?; + is_null(&column) }, ))) } - fn or(&mut self, mut predicates: Vec) -> Result { + fn not_null( + &mut self, + reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> Result { + let projected_mask = self.bound_reference(reference)?; + Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), + projected_mask, move |batch| { - let left = predicates.get_mut(0).unwrap().evaluate(batch.clone())?; - let right = predicates.get_mut(1).unwrap().evaluate(batch)?; - or(&left, &right) + let column = get_leaf_column(batch.column(0))?; + is_not_null(&column) }, ))) } - fn not(&mut self, mut predicate: Self::T) -> Result { + fn is_nan( + &mut self, + reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> Result { + let projected_mask = self.bound_reference(reference)?; + + Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| { + Ok(BooleanArray::from(vec![true; batch.num_rows()])) + }))) + } + + fn not_nan( + &mut self, + reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> Result { + let projected_mask = self.bound_reference(reference)?; + + Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| { + Ok(BooleanArray::from(vec![true; batch.num_rows()])) + }))) + } + + fn less_than( + &mut self, + reference: &BoundReference, + literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + let projected_mask = self.bound_reference(reference)?; + let literal = get_arrow_datum(literal)?; + Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), + projected_mask, move |batch| { - let evaluated = predicate.evaluate(batch.clone())?; - not(&evaluated) + let left = get_leaf_column(batch.column(0))?; + lt(&left, literal.as_ref()) }, ))) } - /// When visiting a bound reference, we return the projection mask for the leaf column - /// which is used to project the column in the record batch. - fn bound_reference(&mut self, reference: &BoundReference) -> Result { - // The leaf column's index in Parquet schema. - let column_idx = self.column_map.get(&reference.field().id).ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Field id {} not found in schema", reference.field().id), - ) - })?; + fn less_than_or_eq( + &mut self, + reference: &BoundReference, + literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + let projected_mask = self.bound_reference(reference)?; + let literal = get_arrow_datum(literal)?; - // Find the column index in projection mask. - let column_idx = self - .columns - .iter() - .position(|&x| x == *column_idx) - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Column index {} not found in schema", *column_idx), - ) - })?; + Ok(Box::new(ArrowPredicateFn::new( + projected_mask, + move |batch| { + let left = get_leaf_column(batch.column(0))?; + lt_eq(&left, literal.as_ref()) + }, + ))) + } - Ok(ProjectionMask::leaves( - self.parquet_schema, - vec![self.columns[column_idx]], - )) + fn greater_than( + &mut self, + reference: &BoundReference, + literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + let projected_mask = self.bound_reference(reference)?; + let literal = get_arrow_datum(literal)?; + + Ok(Box::new(ArrowPredicateFn::new( + projected_mask, + move |batch| { + let left = get_leaf_column(batch.column(0))?; + gt(&left, literal.as_ref()) + }, + ))) + } + + fn greater_than_or_eq( + &mut self, + reference: &BoundReference, + literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + let projected_mask = self.bound_reference(reference)?; + let literal = get_arrow_datum(literal)?; + + Ok(Box::new(ArrowPredicateFn::new( + projected_mask, + move |batch| { + let left = get_leaf_column(batch.column(0))?; + gt_eq(&left, literal.as_ref()) + }, + ))) + } + + fn eq( + &mut self, + reference: &BoundReference, + literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + let projected_mask = self.bound_reference(reference)?; + let literal = get_arrow_datum(literal)?; + + Ok(Box::new(ArrowPredicateFn::new( + projected_mask, + move |batch| { + let left = get_leaf_column(batch.column(0))?; + eq(&left, literal.as_ref()) + }, + ))) + } + + fn not_eq( + &mut self, + reference: &BoundReference, + literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + let projected_mask = self.bound_reference(reference)?; + let literal = get_arrow_datum(literal)?; + + Ok(Box::new(ArrowPredicateFn::new( + projected_mask, + move |batch| { + let left = get_leaf_column(batch.column(0))?; + neq(&left, literal.as_ref()) + }, + ))) + } + + fn starts_with( + &mut self, + _reference: &BoundReference, + _literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), + ))) + } + + fn not_starts_with( + &mut self, + _reference: &BoundReference, + _literal: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), + ))) + } + + fn r#in( + &mut self, + _reference: &BoundReference, + _literals: &FnvHashSet, + _predicate: &BoundPredicate, + ) -> Result { + Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), + ))) + } + + fn not_in( + &mut self, + _reference: &BoundReference, + _literals: &FnvHashSet, + _predicate: &BoundPredicate, + ) -> Result { + Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), + ))) } } #[cfg(test)] mod tests { use crate::arrow::reader::CollectFieldIdVisitor; - use crate::expr::{visit_predicate, Bind, Reference}; + use crate::expr::visitors::bound_predicate_visitor::visit; + use crate::expr::{Bind, Reference}; use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type}; use std::sync::Arc; @@ -579,7 +815,7 @@ mod tests { let bound_expr = expr.bind(schema, true).unwrap(); let mut visitor = CollectFieldIdVisitor { field_ids: vec![] }; - visit_predicate(&mut visitor, &bound_expr).unwrap(); + visit(&mut visitor, &bound_expr).unwrap(); assert_eq!(visitor.field_ids, vec![4]); } @@ -593,7 +829,7 @@ mod tests { let bound_expr = expr.bind(schema, true).unwrap(); let mut visitor = CollectFieldIdVisitor { field_ids: vec![] }; - visit_predicate(&mut visitor, &bound_expr).unwrap(); + visit(&mut visitor, &bound_expr).unwrap(); assert_eq!(visitor.field_ids, vec![4, 3]); } @@ -607,7 +843,7 @@ mod tests { let bound_expr = expr.bind(schema, true).unwrap(); let mut visitor = CollectFieldIdVisitor { field_ids: vec![] }; - visit_predicate(&mut visitor, &bound_expr).unwrap(); + visit(&mut visitor, &bound_expr).unwrap(); assert_eq!(visitor.field_ids, vec![4, 3]); } diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index bf8e40219..158ab135b 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -673,94 +673,6 @@ impl Display for BoundPredicate { } } -/// Visit a bound predicate. -pub fn visit_predicate( - visitor: &mut V, - predicate: &BoundPredicate, -) -> Result { - match predicate { - BoundPredicate::And(predicates) => visit_and(visitor, predicates.inputs()), - BoundPredicate::Or(predicates) => visit_or(visitor, predicates.inputs()), - BoundPredicate::Not(predicate) => visit_not(visitor, predicate.inputs()), - BoundPredicate::AlwaysTrue => visitor.visit_always_true(), - BoundPredicate::AlwaysFalse => visitor.visit_always_false(), - BoundPredicate::Unary(unary) => visitor.visit_unary(unary), - BoundPredicate::Binary(binary) => visitor.visit_binary(binary), - BoundPredicate::Set(set) => visitor.visit_set(set), - } -} - -/// Visit an AND predicate. -fn visit_and( - visitor: &mut V, - predicates: [&BoundPredicate; 2], -) -> Result { - let mut results = Vec::with_capacity(predicates.len()); - for predicate in predicates { - let result = visit_predicate(visitor, predicate)?; - results.push(result); - } - visitor.and(results) -} - -/// Visit an OR predicate. -fn visit_or( - visitor: &mut V, - predicates: [&BoundPredicate; 2], -) -> Result { - let mut results = Vec::with_capacity(predicates.len()); - for predicate in predicates { - let result = visit_predicate(visitor, predicate)?; - results.push(result); - } - visitor.or(results) -} - -/// Visit a NOT predicate. -fn visit_not( - visitor: &mut V, - predicate: [&BoundPredicate; 1], -) -> Result { - let result = visit_predicate(visitor, predicate.first().unwrap())?; - visitor.not(result) -} - -/// A visitor for bound predicates. -pub trait BoundPredicateVisitor { - /// Return type of this visitor on bound predicate. - type T; - - /// Return type of this visitor on bound reference. - type U; - - /// Visit an always true predicate. - fn visit_always_true(&mut self) -> Result; - - /// Visit an always false predicate. - fn visit_always_false(&mut self) -> Result; - - /// Visit a unary predicate. - fn visit_unary(&mut self, predicate: &UnaryExpression) -> Result; - - /// Visit a binary predicate. - fn visit_binary(&mut self, predicate: &BinaryExpression) -> Result; - - /// Visit a set predicate. - fn visit_set(&mut self, predicate: &SetExpression) -> Result; - - /// Called after visiting predicates of AND. - fn and(&mut self, predicates: Vec) -> Result; - - /// Called after visiting predicates of OR. - fn or(&mut self, predicates: Vec) -> Result; - - /// Called after visiting predicates of NOT. - fn not(&mut self, predicate: Self::T) -> Result; - - /// Visit a bound reference. - fn bound_reference(&mut self, reference: &BoundReference) -> Result; -} - #[cfg(test)] mod tests { use std::ops::Not; From 89e3aa66412b02fdbc547f77a13b929894455e0a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 27 Apr 2024 22:08:43 -0700 Subject: [PATCH 08/17] For review --- crates/iceberg/src/arrow/reader.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 768f4abfa..b69677111 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -111,6 +111,12 @@ impl ArrowReader { pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result { let file_io = self.file_io.clone(); + // Collect Parquet column indices from field ids + let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; + if let Some(predicates) = &self.predicates { + visit(&mut collector, predicates)?; + } + Ok(try_stream! { while let Some(Ok(task)) = tasks.next().await { let parquet_reader = file_io @@ -127,7 +133,7 @@ impl ArrowReader { batch_stream_builder = batch_stream_builder.with_projection(projection_mask); let parquet_schema = batch_stream_builder.parquet_schema(); - let row_filter = self.get_row_filter(parquet_schema)?; + let row_filter = self.get_row_filter(parquet_schema, &collector)?; if let Some(row_filter) = row_filter { batch_stream_builder = batch_stream_builder.with_row_filter(row_filter); @@ -216,13 +222,14 @@ impl ArrowReader { } } - fn get_row_filter(&self, parquet_schema: &SchemaDescriptor) -> Result> { + fn get_row_filter( + &self, + parquet_schema: &SchemaDescriptor, + collector: &CollectFieldIdVisitor, + ) -> Result> { if let Some(predicates) = &self.predicates { let field_id_map = build_field_id_map(parquet_schema)?; - // Collect Parquet column indices from field ids - let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; - visit(&mut collector, predicates).unwrap(); let column_indices = collector .field_ids .iter() From e06a5b900a1b9c6510bc8939e2982e983feb4501 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 28 Apr 2024 12:02:58 -0700 Subject: [PATCH 09/17] For review --- crates/iceberg/src/arrow/reader.rs | 194 ++++++++++++++--------------- crates/iceberg/src/arrow/schema.rs | 26 +++- 2 files changed, 119 insertions(+), 101 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index b69677111..f735a84c7 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -19,28 +19,24 @@ use crate::error::Result; use arrow_arith::boolean::{and, is_not_null, is_null, not, or}; -use arrow_array::{ - ArrayRef, BooleanArray, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array, - Int64Array, StructArray, -}; +use arrow_array::{ArrayRef, BooleanArray, StructArray}; use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef}; use async_stream::try_stream; -use bitvec::macros::internal::funty::Fundamental; use fnv::FnvHashSet; use futures::stream::StreamExt; use parquet::arrow::arrow_reader::{ArrowPredicate, ArrowPredicateFn, RowFilter}; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY}; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; -use crate::arrow::arrow_schema_to_schema; +use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::{BoundPredicate, BoundReference}; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream}; -use crate::spec::{Datum, PrimitiveLiteral, SchemaRef}; +use crate::spec::{Datum, SchemaRef}; use crate::{Error, ErrorKind}; /// Builder to create ArrowReader @@ -112,7 +108,9 @@ impl ArrowReader { let file_io = self.file_io.clone(); // Collect Parquet column indices from field ids - let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; + let mut collector = CollectFieldIdVisitor { + field_ids: HashSet::default(), + }; if let Some(predicates) = &self.predicates { visit(&mut collector, predicates)?; } @@ -266,7 +264,9 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result Result, + field_ids: HashSet, } impl BoundPredicateVisitor for CollectFieldIdVisitor { type T = (); - fn always_true(&mut self) -> Result { + fn always_true(&mut self) -> Result<()> { Ok(()) } - fn always_false(&mut self) -> Result { + fn always_false(&mut self) -> Result<()> { Ok(()) } - fn and(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result { + fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> { Ok(()) } - fn or(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result { + fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> { Ok(()) } - fn not(&mut self, _inner: Self::T) -> Result { + fn not(&mut self, _inner: ()) -> Result<()> { Ok(()) } - fn is_null( - &mut self, - reference: &BoundReference, - _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } - fn not_null( - &mut self, - reference: &BoundReference, - _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } - fn is_nan( - &mut self, - reference: &BoundReference, - _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } - fn not_nan( - &mut self, - reference: &BoundReference, - _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } @@ -357,8 +341,8 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { reference: &BoundReference, _literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + ) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } @@ -367,8 +351,8 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { reference: &BoundReference, _literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + ) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } @@ -377,8 +361,8 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { reference: &BoundReference, _literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + ) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } @@ -387,8 +371,8 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { reference: &BoundReference, _literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + ) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } @@ -397,8 +381,8 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { reference: &BoundReference, _literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + ) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } @@ -407,8 +391,8 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { reference: &BoundReference, _literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + ) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } @@ -417,8 +401,8 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { reference: &BoundReference, _literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + ) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } @@ -427,8 +411,8 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { reference: &BoundReference, _literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + ) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } @@ -437,8 +421,8 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { reference: &BoundReference, _literals: &FnvHashSet, _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + ) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } @@ -447,8 +431,8 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { reference: &BoundReference, _literals: &FnvHashSet, _predicate: &BoundPredicate, - ) -> Result { - self.field_ids.push(reference.field().id); + ) -> Result<()> { + self.field_ids.insert(reference.field().id); Ok(()) } } @@ -496,20 +480,6 @@ impl PredicateConverter<'_> { } } -fn get_arrow_datum(datum: &Datum) -> Result> { - match datum.literal() { - PrimitiveLiteral::Boolean(value) => Ok(Box::new(BooleanArray::new_scalar(*value))), - PrimitiveLiteral::Int(value) => Ok(Box::new(Int32Array::new_scalar(*value))), - PrimitiveLiteral::Long(value) => Ok(Box::new(Int64Array::new_scalar(*value))), - PrimitiveLiteral::Float(value) => Ok(Box::new(Float32Array::new_scalar(value.as_f32()))), - PrimitiveLiteral::Double(value) => Ok(Box::new(Float64Array::new_scalar(value.as_f64()))), - l => Err(Error::new( - ErrorKind::DataInvalid, - format!("Unsupported literal type: {:?}", l), - )), - } -} - /// Recursively get the leaf column from the record batch. Assume that the nested columns in /// struct is projected to a single column. fn get_leaf_column(column: &ArrayRef) -> std::result::Result { @@ -532,21 +502,25 @@ fn get_leaf_column(column: &ArrayRef) -> std::result::Result BoundPredicateVisitor for PredicateConverter<'a> { type T = Box; - fn always_true(&mut self) -> Result { + fn always_true(&mut self) -> Result> { Ok(Box::new(ArrowPredicateFn::new( self.projection_mask.clone(), |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), ))) } - fn always_false(&mut self) -> Result { + fn always_false(&mut self) -> Result> { Ok(Box::new(ArrowPredicateFn::new( self.projection_mask.clone(), |batch| Ok(BooleanArray::from(vec![false; batch.num_rows()])), ))) } - fn and(&mut self, mut lhs: Self::T, mut rhs: Self::T) -> Result { + fn and( + &mut self, + mut lhs: Box, + mut rhs: Box, + ) -> Result> { Ok(Box::new(ArrowPredicateFn::new( self.projection_mask.clone(), move |batch| { @@ -557,7 +531,11 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { ))) } - fn or(&mut self, mut lhs: Self::T, mut rhs: Self::T) -> Result { + fn or( + &mut self, + mut lhs: Box, + mut rhs: Box, + ) -> Result> { Ok(Box::new(ArrowPredicateFn::new( self.projection_mask.clone(), move |batch| { @@ -568,7 +546,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { ))) } - fn not(&mut self, mut inner: Self::T) -> Result { + fn not(&mut self, mut inner: Box) -> Result> { Ok(Box::new(ArrowPredicateFn::new( self.projection_mask.clone(), move |batch| { @@ -582,7 +560,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { &mut self, reference: &BoundReference, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { let projected_mask = self.bound_reference(reference)?; Ok(Box::new(ArrowPredicateFn::new( @@ -598,7 +576,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { &mut self, reference: &BoundReference, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { let projected_mask = self.bound_reference(reference)?; Ok(Box::new(ArrowPredicateFn::new( @@ -614,7 +592,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { &mut self, reference: &BoundReference, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { let projected_mask = self.bound_reference(reference)?; Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| { @@ -626,7 +604,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { &mut self, reference: &BoundReference, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { let projected_mask = self.bound_reference(reference)?; Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| { @@ -639,7 +617,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { let projected_mask = self.bound_reference(reference)?; let literal = get_arrow_datum(literal)?; @@ -657,7 +635,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { let projected_mask = self.bound_reference(reference)?; let literal = get_arrow_datum(literal)?; @@ -675,7 +653,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { let projected_mask = self.bound_reference(reference)?; let literal = get_arrow_datum(literal)?; @@ -693,7 +671,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { let projected_mask = self.bound_reference(reference)?; let literal = get_arrow_datum(literal)?; @@ -711,7 +689,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { let projected_mask = self.bound_reference(reference)?; let literal = get_arrow_datum(literal)?; @@ -729,7 +707,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { let projected_mask = self.bound_reference(reference)?; let literal = get_arrow_datum(literal)?; @@ -747,7 +725,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { _reference: &BoundReference, _literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { Ok(Box::new(ArrowPredicateFn::new( self.projection_mask.clone(), |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), @@ -759,7 +737,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { _reference: &BoundReference, _literal: &Datum, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { Ok(Box::new(ArrowPredicateFn::new( self.projection_mask.clone(), |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), @@ -771,7 +749,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { _reference: &BoundReference, _literals: &FnvHashSet, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { Ok(Box::new(ArrowPredicateFn::new( self.projection_mask.clone(), |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), @@ -783,7 +761,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { _reference: &BoundReference, _literals: &FnvHashSet, _predicate: &BoundPredicate, - ) -> Result { + ) -> Result> { Ok(Box::new(ArrowPredicateFn::new( self.projection_mask.clone(), |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), @@ -797,6 +775,7 @@ mod tests { use crate::expr::visitors::bound_predicate_visitor::visit; use crate::expr::{Bind, Reference}; use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type}; + use std::collections::HashSet; use std::sync::Arc; fn table_schema_simple() -> SchemaRef { @@ -821,10 +800,15 @@ mod tests { let expr = Reference::new("qux").is_null(); let bound_expr = expr.bind(schema, true).unwrap(); - let mut visitor = CollectFieldIdVisitor { field_ids: vec![] }; + let mut visitor = CollectFieldIdVisitor { + field_ids: HashSet::default(), + }; visit(&mut visitor, &bound_expr).unwrap(); - assert_eq!(visitor.field_ids, vec![4]); + let mut expected = HashSet::default(); + expected.insert(4_i32); + + assert_eq!(visitor.field_ids, expected); } #[test] @@ -835,10 +819,16 @@ mod tests { .and(Reference::new("baz").is_null()); let bound_expr = expr.bind(schema, true).unwrap(); - let mut visitor = CollectFieldIdVisitor { field_ids: vec![] }; + let mut visitor = CollectFieldIdVisitor { + field_ids: HashSet::default(), + }; visit(&mut visitor, &bound_expr).unwrap(); - assert_eq!(visitor.field_ids, vec![4, 3]); + let mut expected = HashSet::default(); + expected.insert(4_i32); + expected.insert(3); + + assert_eq!(visitor.field_ids, expected); } #[test] @@ -849,9 +839,15 @@ mod tests { .or(Reference::new("baz").is_null()); let bound_expr = expr.bind(schema, true).unwrap(); - let mut visitor = CollectFieldIdVisitor { field_ids: vec![] }; + let mut visitor = CollectFieldIdVisitor { + field_ids: HashSet::default(), + }; visit(&mut visitor, &bound_expr).unwrap(); - assert_eq!(visitor.field_ids, vec![4, 3]); + let mut expected = HashSet::default(); + expected.insert(4_i32); + expected.insert(3); + + assert_eq!(visitor.field_ids, expected); } } diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index c7e870096..172d4bb79 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -19,12 +19,16 @@ use crate::error::Result; use crate::spec::{ - ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, SchemaVisitor, - StructType, Type, + Datum, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, PrimitiveType, Schema, + SchemaVisitor, StructType, Type, }; use crate::{Error, ErrorKind}; use arrow_array::types::{validate_decimal_precision_and_scale, Decimal128Type}; +use arrow_array::{ + BooleanArray, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array, Int64Array, +}; use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; +use bitvec::macros::internal::funty::Fundamental; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use rust_decimal::prelude::ToPrimitive; use std::collections::HashMap; @@ -593,6 +597,24 @@ pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result Result> { + match datum.literal() { + PrimitiveLiteral::Boolean(value) => Ok(Box::new(BooleanArray::new_scalar(*value))), + PrimitiveLiteral::Int(value) => Ok(Box::new(Int32Array::new_scalar(*value))), + PrimitiveLiteral::Long(value) => Ok(Box::new(Int64Array::new_scalar(*value))), + PrimitiveLiteral::Float(value) => Ok(Box::new(Float32Array::new_scalar(value.as_f32()))), + PrimitiveLiteral::Double(value) => Ok(Box::new(Float64Array::new_scalar(value.as_f64()))), + l => Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Converting datum from type {:?} to arrow not supported yet.", + l + ), + )), + } +} + impl TryFrom<&ArrowSchema> for crate::spec::Schema { type Error = Error; From 78f35e61e6c3ebb82d48e3dcc759bbd84e5891d1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 28 Apr 2024 12:24:20 -0700 Subject: [PATCH 10/17] For review --- crates/iceberg/src/arrow/reader.rs | 279 ++++++++++++++--------------- 1 file changed, 139 insertions(+), 140 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index f735a84c7..c8dbc663e 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -228,20 +228,18 @@ impl ArrowReader { if let Some(predicates) = &self.predicates { let field_id_map = build_field_id_map(parquet_schema)?; + // Collect Parquet column indices from field ids. + // If the field id is not found in Parquet schema, it will be ignored due to schema evolution. let column_indices = collector .field_ids .iter() - .map(|field_id| { - field_id_map.get(field_id).cloned().ok_or_else(|| { - Error::new(ErrorKind::DataInvalid, "Field id not found in schema") - }) - }) - .collect::>>()?; + .map(|field_id| field_id_map.get(field_id).cloned()) + .flatten() + .collect::>(); // Convert BoundPredicates to ArrowPredicates let mut converter = PredicateConverter { - columns: &column_indices, - projection_mask: ProjectionMask::leaves(parquet_schema, column_indices.clone()), + projection_mask: ProjectionMask::leaves(parquet_schema, column_indices), parquet_schema, column_map: &field_id_map, }; @@ -439,8 +437,6 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { /// A visitor to convert Iceberg bound predicates to Arrow predicates. struct PredicateConverter<'a> { - /// The leaf column indices used in the predicates. - pub columns: &'a Vec, /// The projection mask for the Arrow predicates. pub projection_mask: ProjectionMask, /// The Parquet schema descriptor. @@ -451,33 +447,25 @@ struct PredicateConverter<'a> { impl PredicateConverter<'_> { /// When visiting a bound reference, we return the projection mask for the leaf column - /// which is used to project the column in the record batch. - fn bound_reference(&mut self, reference: &BoundReference) -> Result { + /// which is used to project the column in the record batch. Return None if the field id + /// is not found in the column map, which is possible due to schema evolution. + fn bound_reference(&mut self, reference: &BoundReference) -> Option { // The leaf column's index in Parquet schema. - let column_idx = self.column_map.get(&reference.field().id).ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Field id {} not found in schema", reference.field().id), - ) - })?; - - // Find the column index in projection mask. - let column_idx = self - .columns - .iter() - .position(|&x| x == *column_idx) - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Column index {} not found in schema", *column_idx), - ) - })?; + let column_idx = self.column_map.get(&reference.field().id)?; - Ok(ProjectionMask::leaves( + Some(ProjectionMask::leaves( self.parquet_schema, - vec![self.columns[column_idx]], + vec![*column_idx], )) } + + /// Build an Arrow predicate that always returns true. + fn build_always_true(&self) -> Result> { + Ok(Box::new(ArrowPredicateFn::new( + self.projection_mask.clone(), + |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), + ))) + } } /// Recursively get the leaf column from the record batch. Assume that the nested columns in @@ -503,10 +491,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { type T = Box; fn always_true(&mut self) -> Result> { - Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), - |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), - ))) + self.build_always_true() } fn always_false(&mut self) -> Result> { @@ -561,15 +546,17 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - let projected_mask = self.bound_reference(reference)?; - - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let column = get_leaf_column(batch.column(0))?; - is_null(&column) - }, - ))) + if let Some(projected_mask) = self.bound_reference(reference) { + Ok(Box::new(ArrowPredicateFn::new( + projected_mask, + move |batch| { + let column = get_leaf_column(batch.column(0))?; + is_null(&column) + }, + ))) + } else { + self.build_always_true() + } } fn not_null( @@ -577,15 +564,17 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - let projected_mask = self.bound_reference(reference)?; - - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let column = get_leaf_column(batch.column(0))?; - is_not_null(&column) - }, - ))) + if let Some(projected_mask) = self.bound_reference(reference) { + Ok(Box::new(ArrowPredicateFn::new( + projected_mask, + move |batch| { + let column = get_leaf_column(batch.column(0))?; + is_not_null(&column) + }, + ))) + } else { + self.build_always_true() + } } fn is_nan( @@ -593,11 +582,13 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - let projected_mask = self.bound_reference(reference)?; - - Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| { - Ok(BooleanArray::from(vec![true; batch.num_rows()])) - }))) + if let Some(projected_mask) = self.bound_reference(reference) { + Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| { + Ok(BooleanArray::from(vec![true; batch.num_rows()])) + }))) + } else { + self.build_always_true() + } } fn not_nan( @@ -605,11 +596,13 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - let projected_mask = self.bound_reference(reference)?; - - Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| { - Ok(BooleanArray::from(vec![true; batch.num_rows()])) - }))) + if let Some(projected_mask) = self.bound_reference(reference) { + Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| { + Ok(BooleanArray::from(vec![true; batch.num_rows()])) + }))) + } else { + self.build_always_true() + } } fn less_than( @@ -618,16 +611,19 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - let projected_mask = self.bound_reference(reference)?; - let literal = get_arrow_datum(literal)?; - - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - lt(&left, literal.as_ref()) - }, - ))) + if let Some(projected_mask) = self.bound_reference(reference) { + let literal = get_arrow_datum(literal)?; + + Ok(Box::new(ArrowPredicateFn::new( + projected_mask, + move |batch| { + let left = get_leaf_column(batch.column(0))?; + lt(&left, literal.as_ref()) + }, + ))) + } else { + self.build_always_true() + } } fn less_than_or_eq( @@ -636,16 +632,19 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - let projected_mask = self.bound_reference(reference)?; - let literal = get_arrow_datum(literal)?; - - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - lt_eq(&left, literal.as_ref()) - }, - ))) + if let Some(projected_mask) = self.bound_reference(reference) { + let literal = get_arrow_datum(literal)?; + + Ok(Box::new(ArrowPredicateFn::new( + projected_mask, + move |batch| { + let left = get_leaf_column(batch.column(0))?; + lt_eq(&left, literal.as_ref()) + }, + ))) + } else { + self.build_always_true() + } } fn greater_than( @@ -654,16 +653,19 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - let projected_mask = self.bound_reference(reference)?; - let literal = get_arrow_datum(literal)?; - - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - gt(&left, literal.as_ref()) - }, - ))) + if let Some(projected_mask) = self.bound_reference(reference) { + let literal = get_arrow_datum(literal)?; + + Ok(Box::new(ArrowPredicateFn::new( + projected_mask, + move |batch| { + let left = get_leaf_column(batch.column(0))?; + gt(&left, literal.as_ref()) + }, + ))) + } else { + self.build_always_true() + } } fn greater_than_or_eq( @@ -672,16 +674,19 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - let projected_mask = self.bound_reference(reference)?; - let literal = get_arrow_datum(literal)?; - - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - gt_eq(&left, literal.as_ref()) - }, - ))) + if let Some(projected_mask) = self.bound_reference(reference) { + let literal = get_arrow_datum(literal)?; + + Ok(Box::new(ArrowPredicateFn::new( + projected_mask, + move |batch| { + let left = get_leaf_column(batch.column(0))?; + gt_eq(&left, literal.as_ref()) + }, + ))) + } else { + self.build_always_true() + } } fn eq( @@ -690,16 +695,19 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - let projected_mask = self.bound_reference(reference)?; - let literal = get_arrow_datum(literal)?; - - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - eq(&left, literal.as_ref()) - }, - ))) + if let Some(projected_mask) = self.bound_reference(reference) { + let literal = get_arrow_datum(literal)?; + + Ok(Box::new(ArrowPredicateFn::new( + projected_mask, + move |batch| { + let left = get_leaf_column(batch.column(0))?; + eq(&left, literal.as_ref()) + }, + ))) + } else { + self.build_always_true() + } } fn not_eq( @@ -708,16 +716,19 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - let projected_mask = self.bound_reference(reference)?; - let literal = get_arrow_datum(literal)?; - - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - neq(&left, literal.as_ref()) - }, - ))) + if let Some(projected_mask) = self.bound_reference(reference) { + let literal = get_arrow_datum(literal)?; + + Ok(Box::new(ArrowPredicateFn::new( + projected_mask, + move |batch| { + let left = get_leaf_column(batch.column(0))?; + neq(&left, literal.as_ref()) + }, + ))) + } else { + self.build_always_true() + } } fn starts_with( @@ -726,10 +737,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { _literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), - |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), - ))) + self.build_always_true() } fn not_starts_with( @@ -738,10 +746,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { _literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), - |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), - ))) + self.build_always_true() } fn r#in( @@ -750,10 +755,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { _literals: &FnvHashSet, _predicate: &BoundPredicate, ) -> Result> { - Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), - |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), - ))) + self.build_always_true() } fn not_in( @@ -762,10 +764,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { _literals: &FnvHashSet, _predicate: &BoundPredicate, ) -> Result> { - Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), - |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), - ))) + self.build_always_true() } } From 8e1a3529491c0f86e7569b15306bcf79524bb0c5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 5 May 2024 22:26:25 -0700 Subject: [PATCH 11/17] For review --- crates/iceberg/src/arrow/reader.rs | 336 ++++++++++++++--------------- crates/iceberg/src/scan.rs | 75 +++++++ 2 files changed, 231 insertions(+), 180 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index c8dbc663e..c50c20136 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -19,13 +19,13 @@ use crate::error::Result; use arrow_arith::boolean::{and, is_not_null, is_null, not, or}; -use arrow_array::{ArrayRef, BooleanArray, StructArray}; +use arrow_array::{ArrayRef, BooleanArray, RecordBatch}; use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef}; use async_stream::try_stream; use fnv::FnvHashSet; use futures::stream::StreamExt; -use parquet::arrow::arrow_reader::{ArrowPredicate, ArrowPredicateFn, RowFilter}; +use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter}; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY}; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; use std::collections::{HashMap, HashSet}; @@ -230,21 +230,27 @@ impl ArrowReader { // Collect Parquet column indices from field ids. // If the field id is not found in Parquet schema, it will be ignored due to schema evolution. - let column_indices = collector + let mut column_indices = collector .field_ids .iter() .map(|field_id| field_id_map.get(field_id).cloned()) .flatten() .collect::>(); - // Convert BoundPredicates to ArrowPredicates + column_indices.sort(); + + // The converter that converts `BoundPredicates` to `ArrowPredicates` let mut converter = PredicateConverter { - projection_mask: ProjectionMask::leaves(parquet_schema, column_indices), - parquet_schema, column_map: &field_id_map, + column_indices: &column_indices, }; - let arrow_predicate = visit(&mut converter, predicates)?; - Ok(Some(RowFilter::new(vec![arrow_predicate]))) + + // After collecting required leaf column indices used in the predicate, + // creates the projection mask for the Arrow predicates. + let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone()); + let predicate_func = visit(&mut converter, predicates)?; + let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func); + Ok(Some(RowFilter::new(vec![Box::new(arrow_predicate)]))) } else { Ok(None) } @@ -437,125 +443,118 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { /// A visitor to convert Iceberg bound predicates to Arrow predicates. struct PredicateConverter<'a> { - /// The projection mask for the Arrow predicates. - pub projection_mask: ProjectionMask, - /// The Parquet schema descriptor. - pub parquet_schema: &'a SchemaDescriptor, /// The map between field id and leaf column index in Parquet schema. pub column_map: &'a HashMap, + /// The required column indices in Parquet schema for the predicates. + pub column_indices: &'a Vec, } impl PredicateConverter<'_> { - /// When visiting a bound reference, we return the projection mask for the leaf column - /// which is used to project the column in the record batch. Return None if the field id - /// is not found in the column map, which is possible due to schema evolution. - fn bound_reference(&mut self, reference: &BoundReference) -> Option { + /// When visiting a bound reference, we return index of the leaf column in the + /// required column indices which is used to project the column in the record batch. + /// Return None if the field id is not found in the column map, which is possible + /// due to schema evolution. + fn bound_reference(&mut self, reference: &BoundReference) -> Option { // The leaf column's index in Parquet schema. let column_idx = self.column_map.get(&reference.field().id)?; - Some(ProjectionMask::leaves( - self.parquet_schema, - vec![*column_idx], - )) + // The leaf column's index in the required column indices. + let index = self + .column_indices + .iter() + .position(|&idx| idx == *column_idx)?; + + Some(index) } /// Build an Arrow predicate that always returns true. - fn build_always_true(&self) -> Result> { - Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), - |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])), - ))) + fn build_always_true(&self) -> Result> { + Ok(Box::new(|batch| { + Ok(BooleanArray::from(vec![true; batch.num_rows()])) + })) + } + + /// Build an Arrow predicate that always returns false. + fn build_always_false(&self) -> Result> { + Ok(Box::new(|batch| { + Ok(BooleanArray::from(vec![false; batch.num_rows()])) + })) } } -/// Recursively get the leaf column from the record batch. Assume that the nested columns in -/// struct is projected to a single column. -fn get_leaf_column(column: &ArrayRef) -> std::result::Result { +/// Gets the leaf column from the record batch for the required column index. Only +/// supports top-level columns for now. +fn project_column( + batch: &RecordBatch, + column_idx: usize, +) -> std::result::Result { + let column = batch.column(column_idx); + match column.data_type() { - DataType::Struct(fields) => { - if fields.len() != 1 { - return Err(ArrowError::SchemaError( - "Struct column should have only one field after projection" - .parse() - .unwrap(), - )); - } - let struct_array = column.as_any().downcast_ref::().unwrap(); - get_leaf_column(struct_array.column(0)) - } + DataType::Struct(_) => Err(ArrowError::SchemaError( + "Does not support struct column yet.".to_string(), + )), _ => Ok(column.clone()), } } +type PredicateResult = + dyn FnMut(RecordBatch) -> std::result::Result + Send + 'static; + impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { - type T = Box; + type T = Box; - fn always_true(&mut self) -> Result> { + fn always_true(&mut self) -> Result> { self.build_always_true() } - fn always_false(&mut self) -> Result> { - Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), - |batch| Ok(BooleanArray::from(vec![false; batch.num_rows()])), - ))) + fn always_false(&mut self) -> Result> { + self.build_always_false() } fn and( &mut self, - mut lhs: Box, - mut rhs: Box, - ) -> Result> { - Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), - move |batch| { - let left = lhs.evaluate(batch.clone())?; - let right = rhs.evaluate(batch)?; - and(&left, &right) - }, - ))) + mut lhs: Box, + mut rhs: Box, + ) -> Result> { + Ok(Box::new(move |batch| { + let left = lhs(batch.clone())?; + let right = rhs(batch)?; + and(&left, &right) + })) } fn or( &mut self, - mut lhs: Box, - mut rhs: Box, - ) -> Result> { - Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), - move |batch| { - let left = lhs.evaluate(batch.clone())?; - let right = rhs.evaluate(batch)?; - or(&left, &right) - }, - ))) - } - - fn not(&mut self, mut inner: Box) -> Result> { - Ok(Box::new(ArrowPredicateFn::new( - self.projection_mask.clone(), - move |batch| { - let pred_ret = inner.evaluate(batch)?; - not(&pred_ret) - }, - ))) + mut lhs: Box, + mut rhs: Box, + ) -> Result> { + Ok(Box::new(move |batch| { + let left = lhs(batch.clone())?; + let right = rhs(batch)?; + or(&left, &right) + })) + } + + fn not(&mut self, mut inner: Box) -> Result> { + Ok(Box::new(move |batch| { + let pred_ret = inner(batch)?; + not(&pred_ret) + })) } fn is_null( &mut self, reference: &BoundReference, _predicate: &BoundPredicate, - ) -> Result> { - if let Some(projected_mask) = self.bound_reference(reference) { - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let column = get_leaf_column(batch.column(0))?; - is_null(&column) - }, - ))) + ) -> Result> { + if let Some(idx) = self.bound_reference(reference) { + Ok(Box::new(move |batch| { + let column = project_column(&batch, idx)?; + is_null(&column) + })) } else { - self.build_always_true() + self.build_always_false() } } @@ -563,46 +562,31 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { &mut self, reference: &BoundReference, _predicate: &BoundPredicate, - ) -> Result> { - if let Some(projected_mask) = self.bound_reference(reference) { - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let column = get_leaf_column(batch.column(0))?; - is_not_null(&column) - }, - ))) + ) -> Result> { + if let Some(idx) = self.bound_reference(reference) { + Ok(Box::new(move |batch| { + let column = project_column(&batch, idx)?; + is_not_null(&column) + })) } else { - self.build_always_true() + self.build_always_false() } } fn is_nan( &mut self, - reference: &BoundReference, + _reference: &BoundReference, _predicate: &BoundPredicate, - ) -> Result> { - if let Some(projected_mask) = self.bound_reference(reference) { - Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| { - Ok(BooleanArray::from(vec![true; batch.num_rows()])) - }))) - } else { - self.build_always_true() - } + ) -> Result> { + self.build_always_true() } fn not_nan( &mut self, - reference: &BoundReference, + _reference: &BoundReference, _predicate: &BoundPredicate, - ) -> Result> { - if let Some(projected_mask) = self.bound_reference(reference) { - Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| { - Ok(BooleanArray::from(vec![true; batch.num_rows()])) - }))) - } else { - self.build_always_true() - } + ) -> Result> { + self.build_always_true() } fn less_than( @@ -610,19 +594,17 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, literal: &Datum, _predicate: &BoundPredicate, - ) -> Result> { - if let Some(projected_mask) = self.bound_reference(reference) { + ) -> Result> { + if let Some(idx) = self.bound_reference(reference) { let literal = get_arrow_datum(literal)?; - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - lt(&left, literal.as_ref()) - }, - ))) + Ok(Box::new(move |batch| { + let left = project_column(&batch, idx)?; + lt(&left, literal.as_ref()) + })) } else { - self.build_always_true() + // A missing column, treating it as null. + self.build_always_false() } } @@ -631,19 +613,17 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, literal: &Datum, _predicate: &BoundPredicate, - ) -> Result> { - if let Some(projected_mask) = self.bound_reference(reference) { + ) -> Result> { + if let Some(idx) = self.bound_reference(reference) { let literal = get_arrow_datum(literal)?; - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - lt_eq(&left, literal.as_ref()) - }, - ))) + Ok(Box::new(move |batch| { + let left = project_column(&batch, idx)?; + lt_eq(&left, literal.as_ref()) + })) } else { - self.build_always_true() + // A missing column, treating it as null. + self.build_always_false() } } @@ -652,19 +632,17 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, literal: &Datum, _predicate: &BoundPredicate, - ) -> Result> { - if let Some(projected_mask) = self.bound_reference(reference) { + ) -> Result> { + if let Some(idx) = self.bound_reference(reference) { let literal = get_arrow_datum(literal)?; - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - gt(&left, literal.as_ref()) - }, - ))) + Ok(Box::new(move |batch| { + let left = project_column(&batch, idx)?; + gt(&left, literal.as_ref()) + })) } else { - self.build_always_true() + // A missing column, treating it as null. + self.build_always_false() } } @@ -673,19 +651,17 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, literal: &Datum, _predicate: &BoundPredicate, - ) -> Result> { - if let Some(projected_mask) = self.bound_reference(reference) { + ) -> Result> { + if let Some(idx) = self.bound_reference(reference) { let literal = get_arrow_datum(literal)?; - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - gt_eq(&left, literal.as_ref()) - }, - ))) + Ok(Box::new(move |batch| { + let left = project_column(&batch, idx)?; + gt_eq(&left, literal.as_ref()) + })) } else { - self.build_always_true() + // A missing column, treating it as null. + self.build_always_false() } } @@ -694,19 +670,17 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, literal: &Datum, _predicate: &BoundPredicate, - ) -> Result> { - if let Some(projected_mask) = self.bound_reference(reference) { + ) -> Result> { + if let Some(idx) = self.bound_reference(reference) { let literal = get_arrow_datum(literal)?; - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - eq(&left, literal.as_ref()) - }, - ))) + Ok(Box::new(move |batch| { + let left = project_column(&batch, idx)?; + eq(&left, literal.as_ref()) + })) } else { - self.build_always_true() + // A missing column, treating it as null. + self.build_always_false() } } @@ -715,19 +689,17 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, literal: &Datum, _predicate: &BoundPredicate, - ) -> Result> { - if let Some(projected_mask) = self.bound_reference(reference) { + ) -> Result> { + if let Some(idx) = self.bound_reference(reference) { let literal = get_arrow_datum(literal)?; - Ok(Box::new(ArrowPredicateFn::new( - projected_mask, - move |batch| { - let left = get_leaf_column(batch.column(0))?; - neq(&left, literal.as_ref()) - }, - ))) + Ok(Box::new(move |batch| { + let left = project_column(&batch, idx)?; + neq(&left, literal.as_ref()) + })) } else { - self.build_always_true() + // A missing column, treating it as null. + self.build_always_false() } } @@ -736,7 +708,8 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { _reference: &BoundReference, _literal: &Datum, _predicate: &BoundPredicate, - ) -> Result> { + ) -> Result> { + // TODO: Implement starts_with self.build_always_true() } @@ -745,7 +718,8 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { _reference: &BoundReference, _literal: &Datum, _predicate: &BoundPredicate, - ) -> Result> { + ) -> Result> { + // TODO: Implement not_starts_with self.build_always_true() } @@ -754,7 +728,8 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { _reference: &BoundReference, _literals: &FnvHashSet, _predicate: &BoundPredicate, - ) -> Result> { + ) -> Result> { + // TODO: Implement in self.build_always_true() } @@ -763,7 +738,8 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { _reference: &BoundReference, _literals: &FnvHashSet, _predicate: &BoundPredicate, - ) -> Result> { + ) -> Result> { + // TODO: Implement not_in self.build_always_true() } } diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 8405a249a..f8cd1d09f 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -549,6 +549,8 @@ mod tests { ]; Arc::new(arrow_schema::Schema::new(fields)) }; + // 3 columns: + // x: [1, 1, 1, 1, ...] let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; let mut values = vec![2; 512]; @@ -556,11 +558,13 @@ mod tests { values.append(vec![4; 300].as_mut()); values.append(vec![5; 12].as_mut()); + // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5] let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; let mut values = vec![3; 512]; values.append(vec![4; 512].as_mut()); + // z: [3, 3, 3, 3, ..., 4, 4, 4, 4] let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; let to_write = RecordBatch::try_new(schema.clone(), vec![col1, col2, col3]).unwrap(); @@ -806,4 +810,75 @@ mod tests { let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); assert_eq!(batches[0].num_rows(), 1024); } + + #[tokio::test] + async fn test_filter_on_arrow_lt_and_gt() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Filter: y < 5 AND z >= 4 + let mut builder = fixture.table.scan(); + let predicate = Reference::new("y") + .less_than(Datum::long(5)) + .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4))); + builder = builder.filter(predicate); + let table_scan = builder.build().unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches[0].num_rows(), 500); + + let col = batches[0].column_by_name("x").unwrap(); + let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef; + assert_eq!(col, &expected_x); + + let col = batches[0].column_by_name("y").unwrap(); + let mut values = vec![]; + values.append(vec![3; 200].as_mut()); + values.append(vec![4; 300].as_mut()); + let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; + assert_eq!(col, &expected_y); + + let col = batches[0].column_by_name("z").unwrap(); + let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef; + assert_eq!(col, &expected_z); + } + + #[tokio::test] + async fn test_filter_on_arrow_lt_or_gt() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Filter: y < 5 AND z >= 4 + let mut builder = fixture.table.scan(); + let predicate = Reference::new("y") + .less_than(Datum::long(5)) + .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4))); + builder = builder.filter(predicate); + let table_scan = builder.build().unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches[0].num_rows(), 1024); + + let col = batches[0].column_by_name("x").unwrap(); + let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; + assert_eq!(col, &expected_x); + + let col = batches[0].column_by_name("y").unwrap(); + let mut values = vec![2; 512]; + values.append(vec![3; 200].as_mut()); + values.append(vec![4; 300].as_mut()); + values.append(vec![5; 12].as_mut()); + let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; + assert_eq!(col, &expected_y); + + let col = batches[0].column_by_name("z").unwrap(); + let mut values = vec![3; 512]; + values.append(vec![4; 512].as_mut()); + let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; + assert_eq!(col, &expected_z); + } } From 9b9d4015f501a3405e7a5c209dcf37dbf8ae3414 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 5 May 2024 22:42:10 -0700 Subject: [PATCH 12/17] More --- crates/iceberg/src/arrow/reader.rs | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index c50c20136..d775a4c2b 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -554,7 +554,8 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { is_null(&column) })) } else { - self.build_always_false() + // A missing column, treating it as null. + self.build_always_true() } } @@ -569,24 +570,35 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { is_not_null(&column) })) } else { + // A missing column, treating it as null. self.build_always_false() } } fn is_nan( &mut self, - _reference: &BoundReference, + reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - self.build_always_true() + if let Some(_) = self.bound_reference(reference) { + self.build_always_true() + } else { + // A missing column, treating it as null. + self.build_always_false() + } } fn not_nan( &mut self, - _reference: &BoundReference, + reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - self.build_always_true() + if let Some(_) = self.bound_reference(reference) { + self.build_always_false() + } else { + // A missing column, treating it as null. + self.build_always_true() + } } fn less_than( From 559321abb35d3fe777d69612062575a7a8e0af8f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 5 May 2024 22:44:29 -0700 Subject: [PATCH 13/17] fix --- crates/iceberg/src/arrow/reader.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index d775a4c2b..b9e22a55f 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -616,7 +616,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { })) } else { // A missing column, treating it as null. - self.build_always_false() + self.build_always_true() } } @@ -635,7 +635,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { })) } else { // A missing column, treating it as null. - self.build_always_false() + self.build_always_true() } } From 5453c5752e8a4836d0bcd29018d829adf87d9849 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 5 May 2024 22:46:07 -0700 Subject: [PATCH 14/17] Fix clippy --- crates/iceberg/src/arrow/reader.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index b9e22a55f..a7ff949f5 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -233,8 +233,7 @@ impl ArrowReader { let mut column_indices = collector .field_ids .iter() - .map(|field_id| field_id_map.get(field_id).cloned()) - .flatten() + .filter_map(|field_id| field_id_map.get(field_id).cloned()) .collect::>(); column_indices.sort(); @@ -580,7 +579,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - if let Some(_) = self.bound_reference(reference) { + if self.bound_reference(reference).is_some() { self.build_always_true() } else { // A missing column, treating it as null. @@ -593,7 +592,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - if let Some(_) = self.bound_reference(reference) { + if self.bound_reference(reference).is_some() { self.build_always_false() } else { // A missing column, treating it as null. From abe7afc5ced306eef5a3d4898efabf210772b1bf Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 5 May 2024 23:02:16 -0700 Subject: [PATCH 15/17] More --- crates/iceberg/src/arrow/reader.rs | 54 ++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index a7ff949f5..facf8688f 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -240,6 +240,7 @@ impl ArrowReader { // The converter that converts `BoundPredicates` to `ArrowPredicates` let mut converter = PredicateConverter { + parquet_schema: &parquet_schema, column_map: &field_id_map, column_indices: &column_indices, }; @@ -442,6 +443,8 @@ impl BoundPredicateVisitor for CollectFieldIdVisitor { /// A visitor to convert Iceberg bound predicates to Arrow predicates. struct PredicateConverter<'a> { + /// The Parquet schema descriptor. + pub parquet_schema: &'a SchemaDescriptor, /// The map between field id and leaf column index in Parquet schema. pub column_map: &'a HashMap, /// The required column indices in Parquet schema for the predicates. @@ -453,17 +456,32 @@ impl PredicateConverter<'_> { /// required column indices which is used to project the column in the record batch. /// Return None if the field id is not found in the column map, which is possible /// due to schema evolution. - fn bound_reference(&mut self, reference: &BoundReference) -> Option { + fn bound_reference(&mut self, reference: &BoundReference) -> Result> { // The leaf column's index in Parquet schema. - let column_idx = self.column_map.get(&reference.field().id)?; + if let Some(column_idx) = self.column_map.get(&reference.field().id) { + if self.parquet_schema.get_column_root_idx(*column_idx) != *column_idx { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column `{}` in predicates isn't a root column in Parquet schema.", + reference.field().name + ), + )); + } - // The leaf column's index in the required column indices. - let index = self - .column_indices - .iter() - .position(|&idx| idx == *column_idx)?; + // The leaf column's index in the required column indices. + let index = self + .column_indices + .iter() + .position(|&idx| idx == *column_idx).ok_or(Error::new(ErrorKind::DataInvalid, format!( + "Leave column `{}` in predicates cannot be found in the required column indices.", + reference.field().name + )))?; - Some(index) + Ok(Some(index)) + } else { + Ok(None) + } } /// Build an Arrow predicate that always returns true. @@ -547,7 +565,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { Ok(Box::new(move |batch| { let column = project_column(&batch, idx)?; is_null(&column) @@ -563,7 +581,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { Ok(Box::new(move |batch| { let column = project_column(&batch, idx)?; is_not_null(&column) @@ -579,7 +597,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - if self.bound_reference(reference).is_some() { + if self.bound_reference(reference)?.is_some() { self.build_always_true() } else { // A missing column, treating it as null. @@ -592,7 +610,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { reference: &BoundReference, _predicate: &BoundPredicate, ) -> Result> { - if self.bound_reference(reference).is_some() { + if self.bound_reference(reference)?.is_some() { self.build_always_false() } else { // A missing column, treating it as null. @@ -606,7 +624,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { let literal = get_arrow_datum(literal)?; Ok(Box::new(move |batch| { @@ -625,7 +643,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { let literal = get_arrow_datum(literal)?; Ok(Box::new(move |batch| { @@ -644,7 +662,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { let literal = get_arrow_datum(literal)?; Ok(Box::new(move |batch| { @@ -663,7 +681,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { let literal = get_arrow_datum(literal)?; Ok(Box::new(move |batch| { @@ -682,7 +700,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { let literal = get_arrow_datum(literal)?; Ok(Box::new(move |batch| { @@ -701,7 +719,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { literal: &Datum, _predicate: &BoundPredicate, ) -> Result> { - if let Some(idx) = self.bound_reference(reference) { + if let Some(idx) = self.bound_reference(reference)? { let literal = get_arrow_datum(literal)?; Ok(Box::new(move |batch| { From c36560c4634225799893f38022962cf2a234c015 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 6 May 2024 22:46:31 -0700 Subject: [PATCH 16/17] Fix clippy --- crates/iceberg/src/arrow/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index facf8688f..24c40eb01 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -240,7 +240,7 @@ impl ArrowReader { // The converter that converts `BoundPredicates` to `ArrowPredicates` let mut converter = PredicateConverter { - parquet_schema: &parquet_schema, + parquet_schema, column_map: &field_id_map, column_indices: &column_indices, }; From 185e1853f002029ba52b3e50fdb60c9b506cc225 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 14 May 2024 12:29:23 -0700 Subject: [PATCH 17/17] fix clippy --- crates/iceberg/src/arrow/reader.rs | 80 +++++++++++++++--------------- 1 file changed, 40 insertions(+), 40 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 672138d34..391239cf6 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -780,6 +780,46 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { } } +/// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader. +/// +/// # TODO +/// +/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64) contains the following hints to speed up metadata loading, we can consider adding them to this struct: +/// +/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer. +/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`]. +/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`]. +struct ArrowFileReader { + meta: FileMetadata, + r: R, +} + +impl ArrowFileReader { + /// Create a new ArrowFileReader + fn new(meta: FileMetadata, r: R) -> Self { + Self { meta, r } + } +} + +impl AsyncFileReader for ArrowFileReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + Box::pin( + self.r + .read(range.start as _..range.end as _) + .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))), + ) + } + + fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { + Box::pin(async move { + let file_size = self.meta.size; + let mut loader = MetadataLoader::load(self, file_size as usize, None).await?; + loader.load_page_index(false, false).await?; + Ok(Arc::new(loader.finish())) + }) + } +} + #[cfg(test)] mod tests { use crate::arrow::reader::CollectFieldIdVisitor; @@ -862,43 +902,3 @@ mod tests { assert_eq!(visitor.field_ids, expected); } } - -/// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader. -/// -/// # TODO -/// -/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64) contains the following hints to speed up metadata loading, we can consider adding them to this struct: -/// -/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer. -/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`]. -/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`]. -struct ArrowFileReader { - meta: FileMetadata, - r: R, -} - -impl ArrowFileReader { - /// Create a new ArrowFileReader - fn new(meta: FileMetadata, r: R) -> Self { - Self { meta, r } - } -} - -impl AsyncFileReader for ArrowFileReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { - Box::pin( - self.r - .read(range.start as _..range.end as _) - .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))), - ) - } - - fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { - Box::pin(async move { - let file_size = self.meta.size; - let mut loader = MetadataLoader::load(self, file_size as usize, None).await?; - loader.load_page_index(false, false).await?; - Ok(Arc::new(loader.finish())) - }) - } -}