diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 10f971539..ed422be99 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -190,7 +190,7 @@ impl ArrowReader { try_join!(parquet_file.metadata(), parquet_file.reader())?; let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader); - let should_load_page_index = row_selection_enabled && task.predicate().is_some(); + let should_load_page_index = row_selection_enabled && task.predicate.is_some(); // Start creating the record batch stream, which wraps the parquet file reader let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options( @@ -245,7 +245,7 @@ impl ArrowReader { record_batch_stream_builder.metadata(), &selected_row_groups, &field_id_map, - task.schema(), + &task.schema, )?; record_batch_stream_builder = diff --git a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs index e8c1849a9..af20be0a4 100644 --- a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs @@ -24,14 +24,14 @@ use ordered_float::OrderedFloat; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::file::metadata::RowGroupMetaData; use parquet::file::page_index::index::Index; -use parquet::format::PageLocation; +use parquet::file::page_index::offset_index::OffsetIndexMetaData; use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::{BoundPredicate, BoundReference}; use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType, Schema}; use crate::{Error, ErrorKind, Result}; -type OffsetIndex = Vec>; +type OffsetIndex = Vec; const IN_PREDICATE_LIMIT: usize = 200; @@ -206,13 +206,14 @@ impl<'a> PageIndexEvaluator<'a> { } /// returns a list of row counts per page - fn calc_row_counts(&self, offset_index: &[PageLocation]) -> Vec { + fn calc_row_counts(&self, offset_index: &OffsetIndexMetaData) -> Vec { let mut remaining_rows = self.row_group_metadata.num_rows() as usize; let mut row_counts = Vec::with_capacity(self.offset_index.len()); - for (idx, page_location) in offset_index.iter().enumerate() { - let row_count = if idx < offset_index.len() - 1 { - let row_count = (offset_index[idx + 1].first_row_index + let page_locations = offset_index.page_locations(); + for (idx, page_location) in page_locations.iter().enumerate() { + let row_count = if idx < page_locations.len() - 1 { + let row_count = (page_locations[idx + 1].first_row_index - page_location.first_row_index) as usize; remaining_rows -= row_count; row_count @@ -868,6 +869,7 @@ mod tests { use parquet::data_type::ByteArray; use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData}; use parquet::file::page_index::index::{Index, NativeIndex, PageIndex}; + use parquet::file::page_index::offset_index::OffsetIndexMetaData; use parquet::file::statistics::Statistics; use parquet::format::{BoundaryOrder, PageLocation}; use parquet::schema::types::{ @@ -1417,28 +1419,36 @@ mod tests { Ok(row_group_metadata?) } - fn create_page_index() -> Result<(Vec, Vec>)> { + fn create_page_index() -> Result<(Vec, Vec)> { let idx_float = Index::FLOAT(NativeIndex:: { indexes: vec![ PageIndex { min: None, max: None, null_count: Some(1024), + repetition_level_histogram: None, + definition_level_histogram: None, }, PageIndex { min: Some(0.0), max: Some(10.0), null_count: Some(0), + repetition_level_histogram: None, + definition_level_histogram: None, }, PageIndex { min: Some(10.0), max: Some(20.0), null_count: Some(1), + repetition_level_histogram: None, + definition_level_histogram: None, }, PageIndex { min: None, max: None, null_count: None, + repetition_level_histogram: None, + definition_level_histogram: None, }, ], boundary_order: BoundaryOrder(0), // UNORDERED @@ -1450,26 +1460,36 @@ mod tests { min: Some("AA".into()), max: Some("DD".into()), null_count: Some(0), + repetition_level_histogram: None, + definition_level_histogram: None, }, PageIndex { min: Some("DE".into()), max: Some("DE".into()), null_count: Some(0), + repetition_level_histogram: None, + definition_level_histogram: None, }, PageIndex { min: Some("DF".into()), max: Some("UJ".into()), null_count: Some(1), + repetition_level_histogram: None, + definition_level_histogram: None, }, PageIndex { min: None, max: None, null_count: Some(48), + repetition_level_histogram: None, + definition_level_histogram: None, }, PageIndex { min: None, max: None, null_count: None, + repetition_level_histogram: None, + definition_level_histogram: None, }, ], boundary_order: BoundaryOrder(0), // UNORDERED @@ -1491,8 +1511,14 @@ mod tests { ]; Ok((vec![idx_float, idx_string], vec![ - page_locs_float, - page_locs_string, + OffsetIndexMetaData { + page_locations: page_locs_float, + unencoded_byte_array_data_bytes: None, + }, + OffsetIndexMetaData { + page_locations: page_locs_string, + unencoded_byte_array_data_bytes: None, + }, ])) } }