From c60711fc16ac9ea73f558f21e9185bd59d624e08 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 31 May 2024 07:57:06 -0400 Subject: [PATCH] Add `ParquetAccessPlan` that describes which part of the parquet files to read --- .../physical_plan/parquet/access_plan.rs | 99 ++++++++++++++ .../datasource/physical_plan/parquet/mod.rs | 7 +- .../physical_plan/parquet/opener.rs | 12 +- .../physical_plan/parquet/page_filter.rs | 8 +- .../physical_plan/parquet/row_groups.rs | 128 +++++++++--------- 5 files changed, 176 insertions(+), 78 deletions(-) create mode 100644 datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs diff --git a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs new file mode 100644 index 000000000000..de0d71a76674 --- /dev/null +++ b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use parquet::arrow::arrow_reader::RowSelection; + +/// Specifies a selection of row groups / rows within a ParquetFile to decode. +/// +/// This structure can limit the row groups and data pages a `ParquetExec` will +/// read and decode. 
+///
+/// Note that page level pruning based on `ArrowPredicate` is applied after all of
+/// these selections.
+///
+/// This looks like:
+/// (TODO diagram)
+#[derive(Debug, Clone, PartialEq)]
+pub struct ParquetAccessPlan {
+    /// How to access the i-th row group
+    row_groups: Vec<RowGroupAccess>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum RowGroupAccess {
+    /// The row group should not be read at all
+    Skip,
+    /// The row group should be scanned fully
+    Scan,
+    /// Only the specified rows within the row group should be scanned
+    Selection(RowSelection),
+}
+
+impl RowGroupAccess {
+    /// Return true if this row group should be scanned
+    pub fn should_scan(&self) -> bool {
+        match self {
+            RowGroupAccess::Skip => false,
+            RowGroupAccess::Scan | RowGroupAccess::Selection(_) => true,
+        }
+    }
+}
+
+impl ParquetAccessPlan {
+    /// Create a new `ParquetAccessPlan` to scan all row groups
+    pub fn new_all(row_group_count: usize) -> Self {
+        Self {
+            row_groups: vec![RowGroupAccess::Scan; row_group_count],
+        }
+    }
+
+    /// Mark the i-th row group as [`RowGroupAccess::Skip`] (it should not be scanned)
+    pub fn do_not_scan(&mut self, idx: usize) {
+        self.row_groups[idx] = RowGroupAccess::Skip;
+    }
+
+    /// Return true if the i-th row group should be scanned
+    pub fn should_scan(&self, idx: usize) -> bool {
+        self.row_groups[idx].should_scan()
+    }
+
+    /// Return an iterator over the row group indexes that should be scanned
+    pub fn row_group_index_iter(&self) -> impl Iterator<Item = usize> + '_ {
+        self.row_groups.iter().enumerate().filter_map(|(idx, b)| {
+            if b.should_scan() {
+                Some(idx)
+            } else {
+                None
+            }
+        })
+    }
+
+    /// Return a `Vec` of all row group indexes to scan
+    pub fn row_group_indexes(&self) -> Vec<usize> {
+        self.row_group_index_iter().collect()
+    }
+
+    /// Return the total number of row groups (not the total number to be scanned)
+    pub fn len(&self) -> usize {
+        self.row_groups.len()
+    }
+
+    /// Return true if there are no row groups
+    pub fn is_empty(&self) -> bool {
+        self.row_groups.is_empty()
+    }
+}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 702103aa254e..1f30f39ce4ed 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -47,6 +47,7 @@ use log::debug;
 use parquet::basic::{ConvertedType, LogicalType};
 use parquet::schema::types::ColumnDescriptor;
 
+mod access_plan;
 mod metrics;
 mod opener;
 mod page_filter;
@@ -59,6 +60,7 @@ mod writer;
 use crate::datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };
+pub use access_plan::ParquetAccessPlan;
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
 pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
@@ -152,8 +154,9 @@ pub use writer::plan_to_parquet;
 /// the file.
 ///
 /// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
-/// via [`ParquetFileReaderFactory`] and applies any predicates and projections
-/// to determine what pages must be read.
+/// via [`ParquetFileReaderFactory`], creating a [`ParquetAccessPlan`] by
+/// applying predicates. The plan and projections are used to determine what
+/// pages must be read.
 ///
 /// * Step 4: The stream begins reading data, fetching the required pages
 /// and incrementally decoding them.
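For orientation, here is a minimal sketch (not part of the patch) of how the `ParquetAccessPlan` API added above is intended to be driven. The `main` function, the literal row group count, and the import path (an assumption inferred from the new `pub use access_plan::ParquetAccessPlan` in mod.rs) are illustrative only:

```rust
// Minimal sketch, not part of this patch. The import path is an assumption
// based on the `pub use access_plan::ParquetAccessPlan` re-export added in mod.rs.
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;

fn main() {
    // A file with 4 row groups: the plan starts out scanning everything
    let mut plan = ParquetAccessPlan::new_all(4);

    // Row group pruning (byte range, statistics, bloom filters) marks groups to skip
    plan.do_not_scan(1);
    plan.do_not_scan(3);

    assert!(plan.should_scan(0));
    assert!(!plan.should_scan(1));

    // `len()` is the total number of row groups, not the number that will be scanned
    assert_eq!(plan.len(), 4);

    // Only the surviving row group indexes are handed to the parquet reader
    assert_eq!(plan.row_group_indexes(), vec![0, 2]);
}
```

Pruning only ever narrows the plan: every row group starts as `RowGroupAccess::Scan`, `do_not_scan` flips individual groups to `Skip`, and the `Selection` variant carries a `RowSelection` for scanning only part of a row group.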
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 6df225d80a0c..bdfa6031267d 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -18,7 +18,7 @@
 //! [`ParquetOpener`] for opening Parquet files
 
 use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
-use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
+use crate::datasource::physical_plan::parquet::row_groups::RowGroupPlanBuilder;
 use crate::datasource::physical_plan::parquet::{row_filter, should_enable_page_index};
 use crate::datasource::physical_plan::{
     FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory,
@@ -137,7 +137,7 @@ impl FileOpener for ParquetOpener {
             let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
             let rg_metadata = file_metadata.row_groups();
             // track which row groups to actually read
-            let mut row_groups = RowGroupSet::new(rg_metadata.len());
+            let mut row_groups = RowGroupPlanBuilder::new(rg_metadata.len());
             // if there is a range restricting what parts of the file to read
             if let Some(range) = file_range.as_ref() {
                 row_groups.prune_by_range(rg_metadata, range);
@@ -164,15 +164,17 @@ impl FileOpener for ParquetOpener {
                 }
             }
 
+            let access_plan = row_groups.build();
+
             // page index pruning: if all data on individual pages can
             // be ruled using page metadata, rows from other columns
             // with that range can be skipped as well
-            if enable_page_index && !row_groups.is_empty() {
+            if enable_page_index && !access_plan.is_empty() {
                 if let Some(p) = page_pruning_predicate {
                     let pruned = p.prune(
                         &file_schema,
                         builder.parquet_schema(),
-                        &row_groups,
+                        &access_plan,
                         file_metadata.as_ref(),
                         &file_metrics,
                     )?;
@@ -189,7 +191,7 @@ impl FileOpener for ParquetOpener {
             let stream = builder
                 .with_projection(mask)
                 .with_batch_size(batch_size)
-                .with_row_groups(row_groups.indexes())
+                .with_row_groups(access_plan.row_group_indexes())
                 .build()?;
 
             let adapted = stream
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index d47d5c56bdf9..4cece1981b7f 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -42,10 +42,10 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
-use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
 use crate::datasource::physical_plan::parquet::statistics::{
     from_bytes_to_i128, parquet_column,
 };
+use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 
 use super::metrics::ParquetFileMetrics;
@@ -134,7 +134,7 @@ impl PagePruningPredicate {
         &self,
         arrow_schema: &Schema,
         parquet_schema: &SchemaDescriptor,
-        row_groups: &RowGroupSet,
+        access_plan: &ParquetAccessPlan,
         file_metadata: &ParquetMetaData,
         file_metrics: &ParquetFileMetrics,
     ) -> Result<Option<RowSelection>> {
@@ -171,8 +171,8 @@ impl PagePruningPredicate {
         for predicate in page_index_predicates {
             // find column index in the parquet schema
             let col_idx = find_column_index(predicate, arrow_schema, parquet_schema);
-            let mut selectors = Vec::with_capacity(row_groups.len());
-            for r in row_groups.iter() {
+            let mut selectors = Vec::with_capacity(access_plan.len());
+            for r in access_plan.row_group_index_iter() {
                 let row_group_metadata = &groups[r];
                 let rg_offset_indexes = file_offset_indexes.get(r);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 7dd91d3d4e4b..52cd01cf1b9f 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -36,58 +36,51 @@ use crate::datasource::physical_plan::parquet::statistics::{
 };
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 
-use super::ParquetFileMetrics;
+use super::{ParquetAccessPlan, ParquetFileMetrics};
 
-/// Tracks which RowGroups within a parquet file should be scanned.
+/// Calculates which RowGroups within a parquet file should be scanned.
 ///
-/// This struct encapsulates the various types of pruning that can be applied to
+/// This struct implements the various types of pruning that are applied to
 /// a set of row groups within a parquet file, progressively narrowing down the
 /// set of row groups that should be scanned.
-#[derive(Debug, PartialEq)]
-pub struct RowGroupSet {
-    /// `row_groups[i]` is true if the i-th row group should be scanned
-    row_groups: Vec<bool>,
+#[derive(Debug, Clone, PartialEq)]
+pub struct RowGroupPlanBuilder {
+    /// which row groups should be accessed
+    access_plan: ParquetAccessPlan,
 }
 
-impl RowGroupSet {
-    /// Create a new `RowGroupSet` with all row groups set to true (will be scanned)
-    pub fn new(num_row_groups: usize) -> Self {
+impl RowGroupPlanBuilder {
+    /// Create a new `RowGroupSet` with all row groups set to be scanned
+    pub fn new(row_group_count: usize) -> Self {
         Self {
-            row_groups: vec![true; num_row_groups],
+            access_plan: ParquetAccessPlan::new_all(row_group_count),
         }
     }
+    /*
+    /// Set the i-th row group to false (should not be scanned)
+    pub fn do_not_scan(&mut self, idx: usize) {
+        self.row_groups[idx] = false;
+    }
 
-    /// Set the i-th row group to false (should not be scanned)
-    pub fn do_not_scan(&mut self, idx: usize) {
-        self.row_groups[idx] = false;
-    }
-
-    /// Return true if the i-th row group should be scanned
-    fn should_scan(&self, idx: usize) -> bool {
-        self.row_groups[idx]
-    }
+    /// Return true if the i-th row group should be scanned
+    fn should_scan(&self, idx: usize) -> bool {
+        self.row_groups[idx]
+    }
 
-    /// Return the total number of row groups (not the total number to be scanned)
-    pub fn len(&self) -> usize {
-        self.row_groups.len()
-    }
+    /// Return the total number of row groups (not the total number to be scanned)
+    pub fn len(&self) -> usize {
+        self.row_groups.len()
+    }
+    */
 
-    /// Return true if there are no row groups
+    /// Return true if there are no row groups to scan
     pub fn is_empty(&self) -> bool {
-        self.row_groups.is_empty()
-    }
-
-    /// Return an iterator over the row group indexes that should be scanned
-    pub fn iter(&self) -> impl Iterator<Item = usize> + '_ {
-        self.row_groups
-            .iter()
-            .enumerate()
-            .filter_map(|(idx, &b)| if b { Some(idx) } else { None })
+        self.access_plan.is_empty()
     }
 
-    /// Return a `Vec` of row group indices that should be scanned
-    pub fn indexes(&self) -> Vec<usize> {
-        self.iter().collect()
+    /// Returns the inner access plan
+    pub fn build(self) -> ParquetAccessPlan {
+        self.access_plan
     }
 
     /// Prune remaining row groups to only those within the specified range.
@@ -97,9 +90,9 @@ impl RowGroupSet { /// # Panics /// if `groups.len() != self.len()` pub fn prune_by_range(&mut self, groups: &[RowGroupMetaData], range: &FileRange) { - assert_eq!(groups.len(), self.len()); + assert_eq!(groups.len(), self.access_plan.len()); for (idx, metadata) in groups.iter().enumerate() { - if !self.should_scan(idx) { + if !self.access_plan.should_scan(idx) { continue; } @@ -113,7 +106,7 @@ impl RowGroupSet { .dictionary_page_offset() .unwrap_or_else(|| col.data_page_offset()); if !range.contains(offset) { - self.do_not_scan(idx); + self.access_plan.do_not_scan(idx); } } } @@ -135,9 +128,9 @@ impl RowGroupSet { predicate: &PruningPredicate, metrics: &ParquetFileMetrics, ) { - assert_eq!(groups.len(), self.len()); + assert_eq!(groups.len(), self.access_plan.len()); for (idx, metadata) in groups.iter().enumerate() { - if !self.should_scan(idx) { + if !self.access_plan.should_scan(idx) { continue; } let pruning_stats = RowGroupPruningStatistics { @@ -150,7 +143,7 @@ impl RowGroupSet { // NB: false means don't scan row group if !values[0] { metrics.row_groups_pruned_statistics.add(1); - self.do_not_scan(idx); + self.access_plan.do_not_scan(idx); continue; } } @@ -179,9 +172,9 @@ impl RowGroupSet { predicate: &PruningPredicate, metrics: &ParquetFileMetrics, ) { - assert_eq!(builder.metadata().num_row_groups(), self.len()); - for idx in 0..self.len() { - if !self.should_scan(idx) { + assert_eq!(builder.metadata().num_row_groups(), self.access_plan.len()); + for idx in 0..self.access_plan.len() { + if !self.access_plan.should_scan(idx) { continue; } @@ -230,7 +223,7 @@ impl RowGroupSet { if prune_group { metrics.row_groups_pruned_bloom_filter.add(1); - self.do_not_scan(idx) + self.access_plan.do_not_scan(idx) } else if !stats.column_sbbf.is_empty() { metrics.row_groups_matched_bloom_filter.add(1); } @@ -500,7 +493,7 @@ mod tests { ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupPlanBuilder::new(2); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -534,7 +527,7 @@ mod tests { let metrics = parquet_file_metrics(); // missing statistics for first row group mean that the result from the predicate expression // is null / undefined so the first row group can't be filtered out - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupPlanBuilder::new(2); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -581,7 +574,7 @@ mod tests { let groups = &[rgm1, rgm2]; // the first row group is still filtered out because the predicate expression can be partially evaluated // when conditions are joined using AND - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupPlanBuilder::new(2); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -599,7 +592,7 @@ mod tests { // if conditions in predicate are joined with OR and an unsupported expression is used // this bypasses the entire predicate expression and no row groups are filtered out - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupPlanBuilder::new(2); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -655,7 +648,7 @@ mod tests { let groups = &[rgm1, rgm2]; // the first row group should be left because c1 is greater than zero // the second should be filtered out because c1 is less than zero - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupPlanBuilder::new(2); row_groups.prune_by_statistics( &file_schema, &schema_descr, @@ -704,7 +697,7 @@ mod tests { let 
metrics = parquet_file_metrics(); // First row group was filtered out because it contains no null value on "c2". - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupPlanBuilder::new(2); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -736,7 +729,7 @@ mod tests { let metrics = parquet_file_metrics(); // bool = NULL always evaluates to NULL (and thus will not // pass predicates. Ideally these should both be false - let mut row_groups = RowGroupSet::new(groups.len()); + let mut row_groups = RowGroupPlanBuilder::new(groups.len()); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -796,7 +789,7 @@ mod tests { vec![ParquetStatistics::int32(Some(100), None, None, 0, false)], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(3); + let mut row_groups = RowGroupPlanBuilder::new(3); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -864,7 +857,7 @@ mod tests { vec![ParquetStatistics::int32(None, Some(2), None, 0, false)], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(4); + let mut row_groups = RowGroupPlanBuilder::new(4); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -915,7 +908,7 @@ mod tests { vec![ParquetStatistics::int64(None, None, None, 0, false)], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(3); + let mut row_groups = RowGroupPlanBuilder::new(3); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -989,7 +982,7 @@ mod tests { )], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(3); + let mut row_groups = RowGroupPlanBuilder::new(3); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -1052,7 +1045,7 @@ mod tests { vec![ParquetStatistics::byte_array(None, None, None, 0, false)], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(3); + let mut row_groups = RowGroupPlanBuilder::new(3); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -1179,7 +1172,7 @@ mod tests { ) .await .unwrap(); - assert!(pruned_row_groups.indexes().is_empty()); + assert!(pruned_row_groups.access_plan.row_group_indexes().is_empty()); } #[tokio::test] @@ -1251,12 +1244,12 @@ mod tests { impl ExpectedPruning { /// asserts that the pruned row group match this expectation - fn assert(&self, row_groups: &RowGroupSet) { - let num_row_groups = row_groups.len(); + fn assert(&self, row_groups: &RowGroupPlanBuilder) { + let num_row_groups = row_groups.access_plan.len(); assert!(num_row_groups > 0); let num_pruned = (0..num_row_groups) .filter_map(|i| { - if row_groups.should_scan(i) { + if row_groups.access_plan.should_scan(i) { None } else { Some(1) @@ -1278,14 +1271,14 @@ mod tests { ); } ExpectedPruning::Some(expected) => { - let actual = row_groups.indexes(); + let actual = row_groups.access_plan.row_group_indexes(); assert_eq!(expected, &actual, "Unexpected row groups pruned. 
Expected {expected:?}, got {actual:?}");
             }
         }
     }
 }
 
-    fn assert_pruned(row_groups: RowGroupSet, expected: ExpectedPruning) {
+    fn assert_pruned(row_groups: RowGroupPlanBuilder, expected: ExpectedPruning) {
         expected.assert(&row_groups);
     }
 
@@ -1386,7 +1379,7 @@ mod tests {
         file_name: &str,
         data: bytes::Bytes,
         pruning_predicate: &PruningPredicate,
-    ) -> Result<RowGroupSet> {
+    ) -> Result<RowGroupPlanBuilder> {
         use object_store::{ObjectMeta, ObjectStore};
 
         let object_meta = ObjectMeta {
@@ -1411,7 +1404,8 @@
         };
 
         let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
-        let mut pruned_row_groups = RowGroupSet::new(builder.metadata().num_row_groups());
+        let mut pruned_row_groups =
+            RowGroupPlanBuilder::new(builder.metadata().num_row_groups());
         pruned_row_groups
             .prune_by_bloom_filters(
                 pruning_predicate.schema(),
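To make the new control flow concrete, the following self-contained sketch (not DataFusion code; every type here is a simplified stand-in that only reuses the names) mirrors how `RowGroupPlanBuilder` is used in `ParquetOpener::open` above: start from a plan that scans everything, let each pruning pass mark row groups as `Skip`, then `build()` the final `ParquetAccessPlan` and hand its `row_group_indexes()` to the reader. The `prune_by_row_count` rule and the literal row counts are placeholders for the real `prune_by_range` / `prune_by_statistics` / `prune_by_bloom_filters` logic:

```rust
// Simplified stand-ins for the types added in this patch; not the real implementations.
#[derive(Debug, Clone, PartialEq)]
enum RowGroupAccess {
    Skip,
    Scan,
}

#[derive(Debug)]
struct ParquetAccessPlan {
    row_groups: Vec<RowGroupAccess>,
}

impl ParquetAccessPlan {
    fn new_all(row_group_count: usize) -> Self {
        Self {
            row_groups: vec![RowGroupAccess::Scan; row_group_count],
        }
    }

    fn do_not_scan(&mut self, idx: usize) {
        self.row_groups[idx] = RowGroupAccess::Skip;
    }

    fn row_group_indexes(&self) -> Vec<usize> {
        self.row_groups
            .iter()
            .enumerate()
            .filter_map(|(idx, a)| (*a == RowGroupAccess::Scan).then_some(idx))
            .collect()
    }
}

struct RowGroupPlanBuilder {
    access_plan: ParquetAccessPlan,
}

impl RowGroupPlanBuilder {
    fn new(row_group_count: usize) -> Self {
        Self {
            access_plan: ParquetAccessPlan::new_all(row_group_count),
        }
    }

    // Stand-in for prune_by_range / prune_by_statistics / prune_by_bloom_filters:
    // each pass can only narrow the plan by marking more row groups as Skip.
    fn prune_by_row_count(&mut self, row_counts: &[usize], min_rows: usize) {
        for (idx, count) in row_counts.iter().enumerate() {
            if *count < min_rows {
                self.access_plan.do_not_scan(idx);
            }
        }
    }

    fn build(self) -> ParquetAccessPlan {
        self.access_plan
    }
}

fn main() {
    // Hypothetical file with 4 row groups whose row counts are known from metadata
    let row_counts = [10, 0, 500, 3];

    let mut builder = RowGroupPlanBuilder::new(row_counts.len());
    builder.prune_by_row_count(&row_counts, 5);

    // The finished plan is what would be handed to the parquet reader
    let plan = builder.build();
    assert_eq!(plan.row_group_indexes(), vec![0, 2]);
    println!("row groups to scan: {:?}", plan.row_group_indexes());
}
```

The design choice the patch makes is visible here: the builder owns a `ParquetAccessPlan` rather than a `Vec<bool>`, so once `build()` is called the plan can later describe partial row-group scans via `RowGroupAccess::Selection` without changing the pruning code.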