From 301f27d7ab522d4b3a2168ce66e01ed2c064e1d2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 31 May 2024 07:57:06 -0400 Subject: [PATCH] Add `ParquetAccessPlan` that describes which part of the parquet files to read --- .../physical_plan/parquet/access_plan.rs | 153 +++++++++++ .../datasource/physical_plan/parquet/mod.rs | 7 +- .../physical_plan/parquet/opener.rs | 23 +- .../physical_plan/parquet/page_filter.rs | 253 ++++++++---------- .../physical_plan/parquet/row_groups.rs | 128 +++++---- 5 files changed, 351 insertions(+), 213 deletions(-) create mode 100644 datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs diff --git a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs new file mode 100644 index 000000000000..9a84f2fa3086 --- /dev/null +++ b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + +/// Specifies a selection of rows and row groups within a ParquetFile to decode. +/// +/// A `ParquetAccessPlan` is used to limits the row groups and data pages a `ParquetExec` +/// will read and decode and this improve performance. +/// +/// Note that page level pruning based on ArrowPredicate is applied after all of +/// these selections +/// +/// This looks like: +/// (TODO diagram) +#[derive(Debug, Clone, PartialEq)] +pub struct ParquetAccessPlan { + /// How to access the i-th row group + row_groups: Vec, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum RowGroupAccess { + /// The row group should not be read at all + Skip, + /// The row group should be scanned fully + Scan, + /// Only the specified rows within the row group should be scanned + Selection(RowSelection), +} + +impl RowGroupAccess { + /// return true if this row group should be scanned + pub fn should_scan(&self) -> bool { + match self { + RowGroupAccess::Skip => false, + RowGroupAccess::Scan | RowGroupAccess::Selection(_) => true, + } + } +} + +impl ParquetAccessPlan { + /// Create a new `ParquetAccessPlan` to scan all row groups + pub fn new_all(row_group_count: usize) -> Self { + Self { + row_groups: vec![RowGroupAccess::Scan; row_group_count], + } + } + + /// Set the i-th row group to false (should not be scanned) + pub fn do_not_scan(&mut self, idx: usize) { + self.row_groups[idx] = RowGroupAccess::Skip; + } + + /// Return true if the i-th row group should be scanned + pub fn should_scan(&self, idx: usize) -> bool { + self.row_groups[idx].should_scan() + } + + /// Set to scan only the [`RowSelection`] in the specified row group. + /// + /// Based on the existing row groups plan: + /// * Skip: does nothing + /// * Scan: Updates to scan only the rows in the `RowSelection` + /// * Selection: Updates to scan only the specified in the exising selection and the new selection + pub fn scan_selection(&mut self, idx: usize, selection: RowSelection) { + self.row_groups[idx] = match &self.row_groups[idx] { + // already skipping the entire row group + RowGroupAccess::Skip => RowGroupAccess::Skip, + RowGroupAccess::Scan => RowGroupAccess::Selection(selection), + RowGroupAccess::Selection(existing_selection) => { + RowGroupAccess::Selection(existing_selection.intersection(&selection)) + } + } + } + + /// Return the overall RowSelection for all scanned row groups, if + /// there are any RowGroupAccess::Selection; + /// + /// + /// TODO better doc / explanation + pub fn overall_row_selection(&self) -> Option { + if !self + .row_groups + .iter() + .any(|rg| matches!(rg, RowGroupAccess::Selection(_))) + { + return None; + } + + let total_selection: RowSelection = self + .row_groups + .iter() + .flat_map(|rg| { + match rg { + RowGroupAccess::Skip => vec![], + RowGroupAccess::Scan => { + // need a row group access to scan the entire row group (need row group counts) + // This is clearly not tested TODO + todo!(); + } + RowGroupAccess::Selection(selection) => { + // todo avoid these clones + let selection: Vec = selection.clone().into(); + selection + } + } + }) + .collect(); + + Some(total_selection) + } + + /// Return an iterator over the row group indexes that should be scanned + pub fn row_group_index_iter(&self) -> impl Iterator + '_ { + self.row_groups.iter().enumerate().filter_map(|(idx, b)| { + if b.should_scan() { + Some(idx) + } else { + None + } + }) + } + + /// Return a vec of all row group indexes to scan + pub fn row_group_indexes(&self) -> Vec { + self.row_group_index_iter().collect() + } + + /// Return the total number of row groups (not the total number to be scanned) + pub fn len(&self) -> usize { + self.row_groups.len() + } + + /// Return true if there are no row groups + pub fn is_empty(&self) -> bool { + self.row_groups.is_empty() + } +} diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 702103aa254e..1f30f39ce4ed 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -47,6 +47,7 @@ use log::debug; use parquet::basic::{ConvertedType, LogicalType}; use parquet::schema::types::ColumnDescriptor; +mod access_plan; mod metrics; mod opener; mod page_filter; @@ -59,6 +60,7 @@ mod writer; use crate::datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapterFactory, }; +pub use access_plan::ParquetAccessPlan; pub use metrics::ParquetFileMetrics; use opener::ParquetOpener; pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory}; @@ -152,8 +154,9 @@ pub use writer::plan_to_parquet; /// the file. /// /// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata) -/// via [`ParquetFileReaderFactory`] and applies any predicates and projections -/// to determine what pages must be read. +/// via [`ParquetFileReaderFactory`], creating a [`ParquetAccessPlan`] by +/// applying predicates. The plan and projections are used to determine what +/// pages must be read. /// /// * Step 4: The stream begins reading data, fetching the required pages /// and incrementally decoding them. diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index 6df225d80a0c..1f86c53a2c67 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -18,7 +18,7 @@ //! [`ParquetOpener`] for opening Parquet files use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate; -use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet; +use crate::datasource::physical_plan::parquet::row_groups::RowGroupPlanBuilder; use crate::datasource::physical_plan::parquet::{row_filter, should_enable_page_index}; use crate::datasource::physical_plan::{ FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory, @@ -137,7 +137,7 @@ impl FileOpener for ParquetOpener { let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); let rg_metadata = file_metadata.row_groups(); // track which row groups to actually read - let mut row_groups = RowGroupSet::new(rg_metadata.len()); + let mut row_groups = RowGroupPlanBuilder::new(rg_metadata.len()); // if there is a range restricting what parts of the file to read if let Some(range) = file_range.as_ref() { row_groups.prune_by_range(rg_metadata, range); @@ -164,24 +164,27 @@ impl FileOpener for ParquetOpener { } } + let mut access_plan = row_groups.build(); + // page index pruning: if all data on individual pages can // be ruled using page metadata, rows from other columns // with that range can be skipped as well - if enable_page_index && !row_groups.is_empty() { + if enable_page_index && !access_plan.is_empty() { if let Some(p) = page_pruning_predicate { - let pruned = p.prune( + access_plan = p.prune( &file_schema, builder.parquet_schema(), - &row_groups, + access_plan, file_metadata.as_ref(), &file_metrics, - )?; - if let Some(row_selection) = pruned { - builder = builder.with_row_selection(row_selection); - } + ); } } + if let Some(row_selection) = access_plan.overall_row_selection() { + builder = builder.with_row_selection(row_selection); + } + if let Some(limit) = limit { builder = builder.with_limit(limit) } @@ -189,7 +192,7 @@ impl FileOpener for ParquetOpener { let stream = builder .with_projection(mask) .with_batch_size(batch_size) - .with_row_groups(row_groups.indexes()) + .with_row_groups(access_plan.row_group_indexes()) .build()?; let adapted = stream diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index d47d5c56bdf9..08b4e76bae16 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -22,16 +22,15 @@ use arrow::array::{ StringArray, }; use arrow::datatypes::DataType; -use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError}; +use arrow::{array::ArrayRef, datatypes::SchemaRef}; use arrow_schema::Schema; -use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_common::{Result, ScalarValue}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; use log::{debug, trace}; use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; use parquet::{ arrow::arrow_reader::{RowSelection, RowSelector}, - errors::ParquetError, file::{ metadata::{ParquetMetaData, RowGroupMetaData}, page_index::index::Index, @@ -42,10 +41,10 @@ use std::collections::HashSet; use std::sync::Arc; use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type; -use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet; use crate::datasource::physical_plan::parquet::statistics::{ from_bytes_to_i128, parquet_column, }; +use crate::datasource::physical_plan::parquet::ParquetAccessPlan; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use super::metrics::ParquetFileMetrics; @@ -129,105 +128,90 @@ impl PagePruningPredicate { Ok(Self { predicates }) } - /// Returns a [`RowSelection`] for the given file + /// Returns an updated [`ParquetAccessPlan`] for the given file pub fn prune( &self, arrow_schema: &Schema, parquet_schema: &SchemaDescriptor, - row_groups: &RowGroupSet, + mut access_plan: ParquetAccessPlan, file_metadata: &ParquetMetaData, file_metrics: &ParquetFileMetrics, - ) -> Result> { + ) -> ParquetAccessPlan { // scoped timer updates on drop let _timer_guard = file_metrics.page_index_eval_time.timer(); if self.predicates.is_empty() { - return Ok(None); + return access_plan; } let page_index_predicates = &self.predicates; let groups = file_metadata.row_groups(); if groups.is_empty() { - return Ok(None); + return access_plan; } - let file_offset_indexes = file_metadata.offset_index(); - let file_page_indexes = file_metadata.column_index(); - let (file_offset_indexes, file_page_indexes) = match ( - file_offset_indexes, - file_page_indexes, - ) { - (Some(o), Some(i)) => (o, i), - _ => { - trace!( + let (Some(file_offset_indexes), Some(file_page_indexes)) = + (file_metadata.offset_index(), file_metadata.column_index()) + else { + trace!( "skip page pruning due to lack of indexes. Have offset: {}, column index: {}", - file_offset_indexes.is_some(), file_page_indexes.is_some() + file_metadata.offset_index().is_some(), file_metadata.column_index().is_some() ); - return Ok(None); - } + return access_plan; }; - let mut row_selections = Vec::with_capacity(page_index_predicates.len()); - for predicate in page_index_predicates { - // find column index in the parquet schema - let col_idx = find_column_index(predicate, arrow_schema, parquet_schema); - let mut selectors = Vec::with_capacity(row_groups.len()); - for r in row_groups.iter() { + // track the total number of rows that should be skipped + let mut total_skip = 0; + + let row_group_indexes = access_plan.row_group_indexes(); + for r in row_group_indexes { + // The selection for this particular row group + let mut overall_selection = None; + for predicate in page_index_predicates { + // find column index in the parquet schema + let col_idx = find_column_index(predicate, arrow_schema, parquet_schema); let row_group_metadata = &groups[r]; - let rg_offset_indexes = file_offset_indexes.get(r); - let rg_page_indexes = file_page_indexes.get(r); - if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) = - (rg_page_indexes, rg_offset_indexes, col_idx) - { - selectors.extend( - prune_pages_in_one_row_group( - row_group_metadata, - predicate, - rg_offset_indexes.get(col_idx), - rg_page_indexes.get(col_idx), - groups[r].column(col_idx).column_descr(), - file_metrics, - ) - .map_err(|e| { - ArrowError::ParquetError(format!( - "Fail in prune_pages_in_one_row_group: {e}" - )) - }), + if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) = ( + file_page_indexes.get(r), + file_offset_indexes.get(r), + col_idx, + ) { + let selection = prune_pages_in_one_row_group( + row_group_metadata, + predicate, + rg_offset_indexes.get(col_idx), + rg_page_indexes.get(col_idx), + groups[r].column(col_idx).column_descr(), + file_metrics, ); + + if let Some(selection) = selection { + debug!("Use filter and page index to create RowSelection {:?} from predicate: {:?}", + &selection, + predicate.predicate_expr(), + ); + overall_selection = update_selection(overall_selection, selection) + } else { + trace!("No pages pruned in prune_pages_in_one_row_group") + }; } else { trace!( "Did not have enough metadata to prune with page indexes, \ falling back to all rows", ); - // fallback select all rows - let all_selected = - vec![RowSelector::select(groups[r].num_rows() as usize)]; - selectors.push(all_selected); } } - debug!( - "Use filter and page index create RowSelection {:?} from predicate: {:?}", - &selectors, - predicate.predicate_expr(), - ); - row_selections.push(selectors.into_iter().flatten().collect::>()); + if let Some(overall_selection) = overall_selection { + let rows_skipped = rows_skipped(&overall_selection); + trace!("Overall selection from predicate skipped {rows_skipped}: {overall_selection:?}"); + total_skip += rows_skipped; + access_plan.scan_selection(r, overall_selection) + } } - let final_selection = combine_multi_col_selection(row_selections); - let total_skip = - final_selection.iter().fold( - 0, - |acc, x| { - if x.skip { - acc + x.row_count - } else { - acc - } - }, - ); file_metrics.page_index_rows_filtered.add(total_skip); - Ok(Some(final_selection)) + access_plan } /// Returns the number of filters in the [`PagePruningPredicate`] @@ -236,6 +220,24 @@ impl PagePruningPredicate { } } +/// returns the number of rows skipped in the selection +/// TODO should this be upstreamed to RowSelection? +fn rows_skipped(selection: &RowSelection) -> usize { + selection + .iter() + .fold(0, |acc, x| if x.skip { acc + x.row_count } else { acc }) +} + +fn update_selection( + current_selection: Option, + row_selection: RowSelection, +) -> Option { + match current_selection { + None => Some(row_selection), + Some(current_selection) => Some(current_selection.intersection(&row_selection)), + } +} + /// Returns the column index in the row parquet schema for the single /// column of a single column pruning predicate. /// @@ -282,22 +284,7 @@ fn find_column_index( parquet_column(parquet_schema, arrow_schema, column.name()).map(|x| x.0) } -/// Intersects the [`RowSelector`]s -/// -/// For exampe, given: -/// * `RowSelector1: [ Skip(0~199), Read(200~299)]` -/// * `RowSelector2: [ Skip(0~99), Read(100~249), Skip(250~299)]` -/// -/// The final selection is the intersection of these `RowSelector`s: -/// * `final_selection:[ Skip(0~199), Read(200~249), Skip(250~299)]` -fn combine_multi_col_selection(row_selections: Vec>) -> RowSelection { - row_selections - .into_iter() - .map(RowSelection::from) - .reduce(|s1, s2| s1.intersection(&s2)) - .unwrap() -} - +/// Returns a `RowSelection` for the pages in this RowGroup if it could be successfully pruned. fn prune_pages_in_one_row_group( group: &RowGroupMetaData, predicate: &PruningPredicate, @@ -305,63 +292,61 @@ fn prune_pages_in_one_row_group( col_page_indexes: Option<&Index>, col_desc: &ColumnDescriptor, metrics: &ParquetFileMetrics, -) -> Result> { +) -> Option { let num_rows = group.num_rows() as usize; - if let (Some(col_offset_indexes), Some(col_page_indexes)) = + let (Some(col_offset_indexes), Some(col_page_indexes)) = (col_offset_indexes, col_page_indexes) - { - let target_type = parquet_to_arrow_decimal_type(col_desc); - let pruning_stats = PagesPruningStatistics { - col_page_indexes, - col_offset_indexes, - target_type: &target_type, - num_rows_in_row_group: group.num_rows(), - }; + else { + return None; + }; - match predicate.prune(&pruning_stats) { - Ok(values) => { - let mut vec = Vec::with_capacity(values.len()); - let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows); - assert_eq!(row_vec.len(), values.len()); - let mut sum_row = *row_vec.first().unwrap(); - let mut selected = *values.first().unwrap(); - trace!("Pruned to {:?} using {:?}", values, pruning_stats); - for (i, &f) in values.iter().enumerate().skip(1) { - if f == selected { - sum_row += *row_vec.get(i).unwrap(); - } else { - let selector = if selected { - RowSelector::select(sum_row) - } else { - RowSelector::skip(sum_row) - }; - vec.push(selector); - sum_row = *row_vec.get(i).unwrap(); - selected = f; - } - } + let target_type = parquet_to_arrow_decimal_type(col_desc); + let pruning_stats = PagesPruningStatistics { + col_page_indexes, + col_offset_indexes, + target_type: &target_type, + num_rows_in_row_group: group.num_rows(), + }; - let selector = if selected { - RowSelector::select(sum_row) - } else { - RowSelector::skip(sum_row) - }; - vec.push(selector); - return Ok(vec); - } + let values = match predicate.prune(&pruning_stats) { + Ok(values) => values, + Err(e) => { // stats filter array could not be built // return a result which will not filter out any pages - Err(e) => { - debug!("Error evaluating page index predicate values {e}"); - metrics.predicate_evaluation_errors.add(1); - return Ok(vec![RowSelector::select(group.num_rows() as usize)]); - } + debug!("Error evaluating page index predicate values {e}"); + metrics.predicate_evaluation_errors.add(1); + return None; + } + }; + + let mut vec = Vec::with_capacity(values.len()); + let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows); + assert_eq!(row_vec.len(), values.len()); + let mut sum_row = *row_vec.first().unwrap(); + let mut selected = *values.first().unwrap(); + trace!("Pruned to {:?} using {:?}", values, pruning_stats); + for (i, &f) in values.iter().enumerate().skip(1) { + if f == selected { + sum_row += *row_vec.get(i).unwrap(); + } else { + let selector = if selected { + RowSelector::select(sum_row) + } else { + RowSelector::skip(sum_row) + }; + vec.push(selector); + sum_row = *row_vec.get(i).unwrap(); + selected = f; } } - Err(DataFusionError::ParquetError(ParquetError::General( - "Got some error in prune_pages_in_one_row_group, plz try open the debuglog mode" - .to_string(), - ))) + + let selector = if selected { + RowSelector::select(sum_row) + } else { + RowSelector::skip(sum_row) + }; + vec.push(selector); + Some(RowSelection::from(vec)) } fn create_row_count_in_each_page( diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 7dd91d3d4e4b..52cd01cf1b9f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -36,58 +36,51 @@ use crate::datasource::physical_plan::parquet::statistics::{ }; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; -use super::ParquetFileMetrics; +use super::{ParquetAccessPlan, ParquetFileMetrics}; -/// Tracks which RowGroups within a parquet file should be scanned. +/// Calculates which RowGroups within a parquet file should be scanned. /// -/// This struct encapsulates the various types of pruning that can be applied to +/// This struct implements the various types of pruning that are applied to /// a set of row groups within a parquet file, progressively narrowing down the /// set of row groups that should be scanned. -#[derive(Debug, PartialEq)] -pub struct RowGroupSet { - /// `row_groups[i]` is true if the i-th row group should be scanned - row_groups: Vec, +#[derive(Debug, Clone, PartialEq)] +pub struct RowGroupPlanBuilder { + /// which row groups should be accessed + access_plan: ParquetAccessPlan, } -impl RowGroupSet { - /// Create a new `RowGroupSet` with all row groups set to true (will be scanned) - pub fn new(num_row_groups: usize) -> Self { +impl RowGroupPlanBuilder { + /// Create a new `RowGroupSet` with all row groups set to be scanned + pub fn new(row_group_count: usize) -> Self { Self { - row_groups: vec![true; num_row_groups], + access_plan: ParquetAccessPlan::new_all(row_group_count), } } + /* + /// Set the i-th row group to false (should not be scanned) + pub fn do_not_scan(&mut self, idx: usize) { + self.row_groups[idx] = false; + } - /// Set the i-th row group to false (should not be scanned) - pub fn do_not_scan(&mut self, idx: usize) { - self.row_groups[idx] = false; - } - - /// Return true if the i-th row group should be scanned - fn should_scan(&self, idx: usize) -> bool { - self.row_groups[idx] - } + /// Return true if the i-th row group should be scanned + fn should_scan(&self, idx: usize) -> bool { + self.row_groups[idx] + } - /// Return the total number of row groups (not the total number to be scanned) - pub fn len(&self) -> usize { - self.row_groups.len() - } + /// Return the total number of row groups (not the total number to be scanned) + pub fn len(&self) -> usize { + self.row_groups.len() + } + */ - /// Return true if there are no row groups + /// Return true if there are no row groups to scan pub fn is_empty(&self) -> bool { - self.row_groups.is_empty() - } - - /// Return an iterator over the row group indexes that should be scanned - pub fn iter(&self) -> impl Iterator + '_ { - self.row_groups - .iter() - .enumerate() - .filter_map(|(idx, &b)| if b { Some(idx) } else { None }) + self.access_plan.is_empty() } - /// Return a `Vec` of row group indices that should be scanned - pub fn indexes(&self) -> Vec { - self.iter().collect() + /// Returns the inner access plan + pub fn build(self) -> ParquetAccessPlan { + self.access_plan } /// Prune remaining row groups to only those within the specified range. @@ -97,9 +90,9 @@ impl RowGroupSet { /// # Panics /// if `groups.len() != self.len()` pub fn prune_by_range(&mut self, groups: &[RowGroupMetaData], range: &FileRange) { - assert_eq!(groups.len(), self.len()); + assert_eq!(groups.len(), self.access_plan.len()); for (idx, metadata) in groups.iter().enumerate() { - if !self.should_scan(idx) { + if !self.access_plan.should_scan(idx) { continue; } @@ -113,7 +106,7 @@ impl RowGroupSet { .dictionary_page_offset() .unwrap_or_else(|| col.data_page_offset()); if !range.contains(offset) { - self.do_not_scan(idx); + self.access_plan.do_not_scan(idx); } } } @@ -135,9 +128,9 @@ impl RowGroupSet { predicate: &PruningPredicate, metrics: &ParquetFileMetrics, ) { - assert_eq!(groups.len(), self.len()); + assert_eq!(groups.len(), self.access_plan.len()); for (idx, metadata) in groups.iter().enumerate() { - if !self.should_scan(idx) { + if !self.access_plan.should_scan(idx) { continue; } let pruning_stats = RowGroupPruningStatistics { @@ -150,7 +143,7 @@ impl RowGroupSet { // NB: false means don't scan row group if !values[0] { metrics.row_groups_pruned_statistics.add(1); - self.do_not_scan(idx); + self.access_plan.do_not_scan(idx); continue; } } @@ -179,9 +172,9 @@ impl RowGroupSet { predicate: &PruningPredicate, metrics: &ParquetFileMetrics, ) { - assert_eq!(builder.metadata().num_row_groups(), self.len()); - for idx in 0..self.len() { - if !self.should_scan(idx) { + assert_eq!(builder.metadata().num_row_groups(), self.access_plan.len()); + for idx in 0..self.access_plan.len() { + if !self.access_plan.should_scan(idx) { continue; } @@ -230,7 +223,7 @@ impl RowGroupSet { if prune_group { metrics.row_groups_pruned_bloom_filter.add(1); - self.do_not_scan(idx) + self.access_plan.do_not_scan(idx) } else if !stats.column_sbbf.is_empty() { metrics.row_groups_matched_bloom_filter.add(1); } @@ -500,7 +493,7 @@ mod tests { ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupPlanBuilder::new(2); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -534,7 +527,7 @@ mod tests { let metrics = parquet_file_metrics(); // missing statistics for first row group mean that the result from the predicate expression // is null / undefined so the first row group can't be filtered out - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupPlanBuilder::new(2); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -581,7 +574,7 @@ mod tests { let groups = &[rgm1, rgm2]; // the first row group is still filtered out because the predicate expression can be partially evaluated // when conditions are joined using AND - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupPlanBuilder::new(2); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -599,7 +592,7 @@ mod tests { // if conditions in predicate are joined with OR and an unsupported expression is used // this bypasses the entire predicate expression and no row groups are filtered out - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupPlanBuilder::new(2); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -655,7 +648,7 @@ mod tests { let groups = &[rgm1, rgm2]; // the first row group should be left because c1 is greater than zero // the second should be filtered out because c1 is less than zero - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupPlanBuilder::new(2); row_groups.prune_by_statistics( &file_schema, &schema_descr, @@ -704,7 +697,7 @@ mod tests { let metrics = parquet_file_metrics(); // First row group was filtered out because it contains no null value on "c2". - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupPlanBuilder::new(2); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -736,7 +729,7 @@ mod tests { let metrics = parquet_file_metrics(); // bool = NULL always evaluates to NULL (and thus will not // pass predicates. Ideally these should both be false - let mut row_groups = RowGroupSet::new(groups.len()); + let mut row_groups = RowGroupPlanBuilder::new(groups.len()); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -796,7 +789,7 @@ mod tests { vec![ParquetStatistics::int32(Some(100), None, None, 0, false)], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(3); + let mut row_groups = RowGroupPlanBuilder::new(3); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -864,7 +857,7 @@ mod tests { vec![ParquetStatistics::int32(None, Some(2), None, 0, false)], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(4); + let mut row_groups = RowGroupPlanBuilder::new(4); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -915,7 +908,7 @@ mod tests { vec![ParquetStatistics::int64(None, None, None, 0, false)], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(3); + let mut row_groups = RowGroupPlanBuilder::new(3); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -989,7 +982,7 @@ mod tests { )], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(3); + let mut row_groups = RowGroupPlanBuilder::new(3); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -1052,7 +1045,7 @@ mod tests { vec![ParquetStatistics::byte_array(None, None, None, 0, false)], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(3); + let mut row_groups = RowGroupPlanBuilder::new(3); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -1179,7 +1172,7 @@ mod tests { ) .await .unwrap(); - assert!(pruned_row_groups.indexes().is_empty()); + assert!(pruned_row_groups.access_plan.row_group_indexes().is_empty()); } #[tokio::test] @@ -1251,12 +1244,12 @@ mod tests { impl ExpectedPruning { /// asserts that the pruned row group match this expectation - fn assert(&self, row_groups: &RowGroupSet) { - let num_row_groups = row_groups.len(); + fn assert(&self, row_groups: &RowGroupPlanBuilder) { + let num_row_groups = row_groups.access_plan.len(); assert!(num_row_groups > 0); let num_pruned = (0..num_row_groups) .filter_map(|i| { - if row_groups.should_scan(i) { + if row_groups.access_plan.should_scan(i) { None } else { Some(1) @@ -1278,14 +1271,14 @@ mod tests { ); } ExpectedPruning::Some(expected) => { - let actual = row_groups.indexes(); + let actual = row_groups.access_plan.row_group_indexes(); assert_eq!(expected, &actual, "Unexpected row groups pruned. Expected {expected:?}, got {actual:?}"); } } } } - fn assert_pruned(row_groups: RowGroupSet, expected: ExpectedPruning) { + fn assert_pruned(row_groups: RowGroupPlanBuilder, expected: ExpectedPruning) { expected.assert(&row_groups); } @@ -1386,7 +1379,7 @@ mod tests { file_name: &str, data: bytes::Bytes, pruning_predicate: &PruningPredicate, - ) -> Result { + ) -> Result { use object_store::{ObjectMeta, ObjectStore}; let object_meta = ObjectMeta { @@ -1411,7 +1404,8 @@ mod tests { }; let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); - let mut pruned_row_groups = RowGroupSet::new(builder.metadata().num_row_groups()); + let mut pruned_row_groups = + RowGroupPlanBuilder::new(builder.metadata().num_row_groups()); pruned_row_groups .prune_by_bloom_filters( pruning_predicate.schema(),