diff --git a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
index a826c147e4253..9a84f2fa30865 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

-use parquet::arrow::arrow_reader::RowSelection;
+use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

 /// Specifies a selection of rows and row groups within a ParquetFile to decode.
 ///
@@ -71,6 +71,60 @@ impl ParquetAccessPlan {
         self.row_groups[idx].should_scan()
     }

+    /// Set to scan only the [`RowSelection`] in the specified row group.
+    ///
+    /// Based on the existing plan for this row group:
+    /// * Skip: does nothing
+    /// * Scan: Updates to scan only the rows in the `RowSelection`
+    /// * Selection: Updates to scan only the rows selected by both the existing selection and the new selection
+    pub fn scan_selection(&mut self, idx: usize, selection: RowSelection) {
+        self.row_groups[idx] = match &self.row_groups[idx] {
+            // already skipping the entire row group
+            RowGroupAccess::Skip => RowGroupAccess::Skip,
+            RowGroupAccess::Scan => RowGroupAccess::Selection(selection),
+            RowGroupAccess::Selection(existing_selection) => {
+                RowGroupAccess::Selection(existing_selection.intersection(&selection))
+            }
+        }
+    }
+
+    /// Return the overall [`RowSelection`] for all scanned row groups, if
+    /// any of the row groups use a [`RowGroupAccess::Selection`].
+    ///
+    /// Returns `None` if no row group uses a `RowGroupAccess::Selection`,
+    /// in which case no row-level filter needs to be applied.
+    pub fn overall_row_selection(&self) -> Option<RowSelection> {
+        if !self
+            .row_groups
+            .iter()
+            .any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
+        {
+            return None;
+        }
+
+        let total_selection: RowSelection = self
+            .row_groups
+            .iter()
+            .flat_map(|rg| {
+                match rg {
+                    RowGroupAccess::Skip => vec![],
+                    RowGroupAccess::Scan => {
+                        // Scanning the entire row group requires its row count,
+                        // which is not available here (TODO: implement and test)
+                        todo!();
+                    }
+                    RowGroupAccess::Selection(selection) => {
+                        // todo avoid these clones
+                        let selection: Vec<RowSelector> = selection.clone().into();
+                        selection
+                    }
+                }
+            })
+            .collect();
+
+        Some(total_selection)
+    }
+
     /// Return an iterator over the row group indexes that should be scanned
     pub fn row_group_index_iter(&self) -> impl Iterator<Item = usize> + '_ {
         self.row_groups.iter().enumerate().filter_map(|(idx, b)| {
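For reference (not part of the patch): the intersection semantics that `scan_selection` and `overall_row_selection` rely on come from the parquet crate's `RowSelection`. A minimal standalone sketch, with made-up row counts, of how an existing selection narrows under a new one, and of the select-everything run that the `todo!()` branch above would need to emit for a fully scanned row group:

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Existing plan for a (hypothetical) 300-row row group: skip the first
    // 200 rows, read the last 100.
    let existing =
        RowSelection::from(vec![RowSelector::skip(200), RowSelector::select(100)]);
    // New page-index selection for the same row group: read the first 250 rows.
    let new =
        RowSelection::from(vec![RowSelector::select(250), RowSelector::skip(50)]);
    // `scan_selection` keeps only rows selected by *both*: rows 200..250 here.
    println!("{:?}", existing.intersection(&new));

    // A RowGroupAccess::Scan row group would contribute a single
    // select-everything run; the row count (300 here is made up) must come
    // from the row group metadata, which is what the todo!() still needs.
    println!("{:?}", RowSelector::select(300));
}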
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index bdfa6031267d0..1f86c53a2c678 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -164,26 +164,27 @@ impl FileOpener for ParquetOpener {
                 }
             }

-            let access_plan = row_groups.build();
+            let mut access_plan = row_groups.build();

             // page index pruning: if all data on individual pages can
             // be ruled using page metadata, rows from other columns
             // with that range can be skipped as well
             if enable_page_index && !access_plan.is_empty() {
                 if let Some(p) = page_pruning_predicate {
-                    let pruned = p.prune(
+                    access_plan = p.prune(
                         &file_schema,
                         builder.parquet_schema(),
-                        &access_plan,
+                        access_plan,
                         file_metadata.as_ref(),
                         &file_metrics,
-                    )?;
-                    if let Some(row_selection) = pruned {
-                        builder = builder.with_row_selection(row_selection);
-                    }
+                    );
                 }
             }

+            if let Some(row_selection) = access_plan.overall_row_selection() {
+                builder = builder.with_row_selection(row_selection);
+            }
+
             if let Some(limit) = limit {
                 builder = builder.with_limit(limit)
             }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index d2ec4b0d0c698..08b4e76bae162 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -22,16 +22,15 @@ use arrow::array::{
     StringArray,
 };
 use arrow::datatypes::DataType;
-use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
+use arrow::{array::ArrayRef, datatypes::SchemaRef};
 use arrow_schema::Schema;
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::{Result, ScalarValue};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
 use log::{debug, trace};
 use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
 use parquet::{
     arrow::arrow_reader::{RowSelection, RowSelector},
-    errors::ParquetError,
     file::{
         metadata::{ParquetMetaData, RowGroupMetaData},
         page_index::index::Index,
@@ -129,105 +128,90 @@ impl PagePruningPredicate {
         Ok(Self { predicates })
     }

-    /// Returns a [`RowSelection`] for the given file
+    /// Returns an updated [`ParquetAccessPlan`] for the given file
     pub fn prune(
         &self,
         arrow_schema: &Schema,
         parquet_schema: &SchemaDescriptor,
-        access_plan: &ParquetAccessPlan,
+        mut access_plan: ParquetAccessPlan,
         file_metadata: &ParquetMetaData,
         file_metrics: &ParquetFileMetrics,
-    ) -> Result<Option<RowSelection>> {
+    ) -> ParquetAccessPlan {
         // scoped timer updates on drop
         let _timer_guard = file_metrics.page_index_eval_time.timer();
         if self.predicates.is_empty() {
-            return Ok(None);
+            return access_plan;
         }

         let page_index_predicates = &self.predicates;
         let groups = file_metadata.row_groups();
         if groups.is_empty() {
-            return Ok(None);
+            return access_plan;
         }

-        let file_offset_indexes = file_metadata.offset_index();
-        let file_page_indexes = file_metadata.column_index();
-        let (file_offset_indexes, file_page_indexes) = match (
-            file_offset_indexes,
-            file_page_indexes,
-        ) {
-            (Some(o), Some(i)) => (o, i),
-            _ => {
-                trace!(
+        let (Some(file_offset_indexes), Some(file_page_indexes)) =
+            (file_metadata.offset_index(), file_metadata.column_index())
+        else {
+            trace!(
                "skip page pruning due to lack of indexes. Have offset: {}, column index: {}",
-                    file_offset_indexes.is_some(), file_page_indexes.is_some()
+                file_metadata.offset_index().is_some(), file_metadata.column_index().is_some()
            );
-                return Ok(None);
-            }
+            return access_plan;
         };

-        let mut row_selections = Vec::with_capacity(page_index_predicates.len());
-        for predicate in page_index_predicates {
-            // find column index in the parquet schema
-            let col_idx = find_column_index(predicate, arrow_schema, parquet_schema);
-            let mut selectors = Vec::with_capacity(access_plan.len());
-            for r in access_plan.row_group_index_iter() {
+        // track the total number of rows that should be skipped
+        let mut total_skip = 0;
+
+        let row_group_indexes = access_plan.row_group_indexes();
+        for r in row_group_indexes {
+            // The selection for this particular row group
+            let mut overall_selection = None;
+            for predicate in page_index_predicates {
+                // find column index in the parquet schema
+                let col_idx = find_column_index(predicate, arrow_schema, parquet_schema);
                 let row_group_metadata = &groups[r];
-                let rg_offset_indexes = file_offset_indexes.get(r);
-                let rg_page_indexes = file_page_indexes.get(r);
-                if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) =
-                    (rg_page_indexes, rg_offset_indexes, col_idx)
-                {
-                    selectors.extend(
-                        prune_pages_in_one_row_group(
-                            row_group_metadata,
-                            predicate,
-                            rg_offset_indexes.get(col_idx),
-                            rg_page_indexes.get(col_idx),
-                            groups[r].column(col_idx).column_descr(),
-                            file_metrics,
-                        )
-                        .map_err(|e| {
-                            ArrowError::ParquetError(format!(
-                                "Fail in prune_pages_in_one_row_group: {e}"
-                            ))
-                        }),
+                if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) = (
+                    file_page_indexes.get(r),
+                    file_offset_indexes.get(r),
+                    col_idx,
+                ) {
+                    let selection = prune_pages_in_one_row_group(
+                        row_group_metadata,
+                        predicate,
+                        rg_offset_indexes.get(col_idx),
+                        rg_page_indexes.get(col_idx),
+                        groups[r].column(col_idx).column_descr(),
+                        file_metrics,
                     );
+
+                    if let Some(selection) = selection {
+                        debug!("Use filter and page index to create RowSelection {:?} from predicate: {:?}",
+                            &selection,
+                            predicate.predicate_expr(),
+                        );
+                        overall_selection = update_selection(overall_selection, selection)
+                    } else {
+                        trace!("No pages pruned in prune_pages_in_one_row_group")
+                    };
                 } else {
                     trace!(
                         "Did not have enough metadata to prune with page indexes, \
                          falling back to all rows",
                     );
-                    // fallback select all rows
-                    let all_selected =
-                        vec![RowSelector::select(groups[r].num_rows() as usize)];
-                    selectors.push(all_selected);
                 }
             }
-            debug!(
-                "Use filter and page index create RowSelection {:?} from predicate: {:?}",
-                &selectors,
-                predicate.predicate_expr(),
-            );
-            row_selections.push(selectors.into_iter().flatten().collect::<Vec<RowSelector>>());
+            if let Some(overall_selection) = overall_selection {
+                let rows_skipped = rows_skipped(&overall_selection);
+                trace!("Overall selection from predicate skipped {rows_skipped}: {overall_selection:?}");
+                total_skip += rows_skipped;
+                access_plan.scan_selection(r, overall_selection)
+            }
         }

-        let final_selection = combine_multi_col_selection(row_selections);
-        let total_skip =
-            final_selection.iter().fold(
-                0,
-                |acc, x| {
-                    if x.skip {
-                        acc + x.row_count
-                    } else {
-                        acc
-                    }
-                },
-            );
         file_metrics.page_index_rows_filtered.add(total_skip);
-        Ok(Some(final_selection))
+        access_plan
     }

     /// Returns the number of filters in the [`PagePruningPredicate`]
@@ -236,6 +220,24 @@ impl PagePruningPredicate {
     }
 }

+/// Returns the number of rows skipped in the selection
+/// TODO: should this be upstreamed to `RowSelection`?
+fn rows_skipped(selection: &RowSelection) -> usize {
+    selection
+        .iter()
+        .fold(0, |acc, x| if x.skip { acc + x.row_count } else { acc })
+}
+
+fn update_selection(
+    current_selection: Option<RowSelection>,
+    row_selection: RowSelection,
+) -> Option<RowSelection> {
+    match current_selection {
+        None => Some(row_selection),
+        Some(current_selection) => Some(current_selection.intersection(&row_selection)),
+    }
+}
+
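For reference (not part of the patch): `prune_pages_in_one_row_group`, rewritten below, turns the per-page keep/prune results of the pruning predicate into run-length `RowSelector`s, using the per-page row counts from `create_row_count_in_each_page`. A standalone sketch of that grouping step, with hypothetical page sizes:

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

/// Illustrative only: collapse per-page keep flags and per-page row counts
/// into run-length RowSelectors (the shape of output the rewrite below produces).
fn flags_to_selection(keep: &[bool], rows_per_page: &[usize]) -> RowSelection {
    assert_eq!(keep.len(), rows_per_page.len());
    let mut selectors: Vec<RowSelector> = Vec::new();
    for (&keep_page, &rows) in keep.iter().zip(rows_per_page) {
        let next = if keep_page {
            RowSelector::select(rows)
        } else {
            RowSelector::skip(rows)
        };
        // Merge adjacent runs of the same kind, mirroring the sum_row/selected loop.
        match selectors.last_mut() {
            Some(last) if last.skip == next.skip => last.row_count += next.row_count,
            _ => selectors.push(next),
        }
    }
    RowSelection::from(selectors)
}

fn main() {
    // Hypothetical row group: four pages of 100 rows; only the first two survive pruning.
    let selection = flags_to_selection(&[true, true, false, false], &[100, 100, 100, 100]);
    // Expected shape: one run selecting 200 rows, then one run skipping 200 rows.
    println!("{selection:?}");
}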
 /// Returns the column index in the row parquet schema for the single
 /// column of a single column pruning predicate.
 ///
@@ -282,22 +284,7 @@ fn find_column_index(
     parquet_column(parquet_schema, arrow_schema, column.name()).map(|x| x.0)
 }

-/// Intersects the [`RowSelector`]s
-///
-/// For example, given:
-/// * `RowSelector1: [ Skip(0~199), Read(200~299)]`
-/// * `RowSelector2: [ Skip(0~99), Read(100~249), Skip(250~299)]`
-///
-/// The final selection is the intersection of these `RowSelector`s:
-/// * `final_selection:[ Skip(0~199), Read(200~249), Skip(250~299)]`
-fn combine_multi_col_selection(row_selections: Vec<Vec<RowSelector>>) -> RowSelection {
-    row_selections
-        .into_iter()
-        .map(RowSelection::from)
-        .reduce(|s1, s2| s1.intersection(&s2))
-        .unwrap()
-}
-
+/// Returns a `RowSelection` for the pages in this RowGroup if it could be successfully pruned.
 fn prune_pages_in_one_row_group(
     group: &RowGroupMetaData,
     predicate: &PruningPredicate,
@@ -305,63 +292,61 @@
     col_page_indexes: Option<&Index>,
     col_desc: &ColumnDescriptor,
     metrics: &ParquetFileMetrics,
-) -> Result<Vec<RowSelector>> {
+) -> Option<RowSelection> {
     let num_rows = group.num_rows() as usize;
-    if let (Some(col_offset_indexes), Some(col_page_indexes)) =
+    let (Some(col_offset_indexes), Some(col_page_indexes)) =
         (col_offset_indexes, col_page_indexes)
-    {
-        let target_type = parquet_to_arrow_decimal_type(col_desc);
-        let pruning_stats = PagesPruningStatistics {
-            col_page_indexes,
-            col_offset_indexes,
-            target_type: &target_type,
-            num_rows_in_row_group: group.num_rows(),
-        };
+    else {
+        return None;
+    };

-        match predicate.prune(&pruning_stats) {
-            Ok(values) => {
-                let mut vec = Vec::with_capacity(values.len());
-                let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows);
-                assert_eq!(row_vec.len(), values.len());
-                let mut sum_row = *row_vec.first().unwrap();
-                let mut selected = *values.first().unwrap();
-                trace!("Pruned to {:?} using {:?}", values, pruning_stats);
-                for (i, &f) in values.iter().enumerate().skip(1) {
-                    if f == selected {
-                        sum_row += *row_vec.get(i).unwrap();
-                    } else {
-                        let selector = if selected {
-                            RowSelector::select(sum_row)
-                        } else {
-                            RowSelector::skip(sum_row)
-                        };
-                        vec.push(selector);
-                        sum_row = *row_vec.get(i).unwrap();
-                        selected = f;
-                    }
-                }
+    let target_type = parquet_to_arrow_decimal_type(col_desc);
+    let pruning_stats = PagesPruningStatistics {
+        col_page_indexes,
+        col_offset_indexes,
+        target_type: &target_type,
+        num_rows_in_row_group: group.num_rows(),
+    };

-                let selector = if selected {
-                    RowSelector::select(sum_row)
-                } else {
-                    RowSelector::skip(sum_row)
-                };
-                vec.push(selector);
-                return Ok(vec);
-            }
+    let values = match predicate.prune(&pruning_stats) {
+        Ok(values) => values,
+        Err(e) => {
             // stats filter array could not be built
             // return a result which will not filter out any pages
-            Err(e) => {
-                debug!("Error evaluating page index predicate values {e}");
-                metrics.predicate_evaluation_errors.add(1);
-                return Ok(vec![RowSelector::select(group.num_rows() as usize)]);
-            }
+            debug!("Error evaluating page index predicate values {e}");
+            metrics.predicate_evaluation_errors.add(1);
+            return None;
+        }
+    };
+
+    let mut vec = Vec::with_capacity(values.len());
+    let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows);
+    assert_eq!(row_vec.len(), values.len());
+    let mut sum_row = *row_vec.first().unwrap();
+    let mut selected = *values.first().unwrap();
+    trace!("Pruned to {:?} using {:?}", values, pruning_stats);
+    for (i, &f) in values.iter().enumerate().skip(1) {
+        if f == selected {
+            sum_row += *row_vec.get(i).unwrap();
+        } else {
+            let selector = if selected {
+                RowSelector::select(sum_row)
+            } else {
+                RowSelector::skip(sum_row)
+            };
+            vec.push(selector);
+            sum_row = *row_vec.get(i).unwrap();
+            selected = f;
         }
     }
-    Err(DataFusionError::ParquetError(ParquetError::General(
-        "Got some error in prune_pages_in_one_row_group, plz try open the debuglog mode"
-            .to_string(),
-    )))
+
+    let selector = if selected {
+        RowSelector::select(sum_row)
+    } else {
+        RowSelector::skip(sum_row)
+    };
+    vec.push(selector);
+    Some(RowSelection::from(vec))
 }

 fn create_row_count_in_each_page(