
Add ParquetAccessPlan that describes which part of the parquet files to read
alamb committed May 31, 2024
1 parent a0773cd commit c60711f
Showing 5 changed files with 176 additions and 78 deletions.
99 changes: 99 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use parquet::arrow::arrow_reader::RowSelection;

/// Specifies a selection of row groups / rows within a Parquet file to decode.
///
/// This structure can limit the row groups and data pages a `ParquetExec` will
/// read and decode.
///
/// Note that page-level pruning based on `ArrowPredicate` is applied after all
/// of these selections.
///
/// This looks like:
/// (TODO diagram)
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetAccessPlan {
    /// How to access the i-th row group
    row_groups: Vec<RowGroupAccess>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum RowGroupAccess {
    /// The row group should not be read at all
    Skip,
    /// The row group should be scanned fully
    Scan,
    /// Only the specified rows within the row group should be scanned
    Selection(RowSelection),
}

impl RowGroupAccess {
    /// Return true if this row group should be scanned
    pub fn should_scan(&self) -> bool {
        match self {
            RowGroupAccess::Skip => false,
            RowGroupAccess::Scan | RowGroupAccess::Selection(_) => true,
        }
    }
}

impl ParquetAccessPlan {
    /// Create a new `ParquetAccessPlan` to scan all row groups
    pub fn new_all(row_group_count: usize) -> Self {
        Self {
            row_groups: vec![RowGroupAccess::Scan; row_group_count],
        }
    }

    /// Mark the i-th row group as skipped (it should not be scanned)
    pub fn do_not_scan(&mut self, idx: usize) {
        self.row_groups[idx] = RowGroupAccess::Skip;
    }

    /// Return true if the i-th row group should be scanned
    pub fn should_scan(&self, idx: usize) -> bool {
        self.row_groups[idx].should_scan()
    }

    /// Return an iterator over the row group indexes that should be scanned
    pub fn row_group_index_iter(&self) -> impl Iterator<Item = usize> + '_ {
        self.row_groups.iter().enumerate().filter_map(|(idx, b)| {
            if b.should_scan() {
                Some(idx)
            } else {
                None
            }
        })
    }

    /// Return a vec of all row group indexes to scan
    pub fn row_group_indexes(&self) -> Vec<usize> {
        self.row_group_index_iter().collect()
    }

    /// Return the total number of row groups (not the total number to be scanned)
    pub fn len(&self) -> usize {
        self.row_groups.len()
    }

    /// Return true if there are no row groups
    pub fn is_empty(&self) -> bool {
        self.row_groups.is_empty()
    }
}
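
For illustration, here is a minimal sketch of driving this API, written as it might appear in a unit test inside this module (the row group count and the pruning decisions are made up):

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

// Plan to scan all 4 row groups of a hypothetical file.
let mut plan = ParquetAccessPlan::new_all(4);

// Suppose row group pruning proved row group 1 cannot match the predicate.
plan.do_not_scan(1);
assert!(plan.should_scan(0));
assert!(!plan.should_scan(1));

// Only row groups 0, 2 and 3 are handed to the parquet reader, while
// len() still reports the total number of row groups.
assert_eq!(plan.row_group_indexes(), vec![0, 2, 3]);
assert_eq!(plan.len(), 4);

// A RowGroupAccess::Selection limits rows *within* a row group: here,
// read the first 100 rows of a 1000-row group and skip the rest.
let selection = RowSelection::from(vec![
    RowSelector::select(100),
    RowSelector::skip(900),
]);
let access = RowGroupAccess::Selection(selection);
// A Selection still counts as scanned; only the selected rows are decoded.
assert!(access.should_scan());

Note that nothing in this file exposes a setter for `RowGroupAccess::Selection` yet; the enum is constructed directly above only to show its semantics.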
7 changes: 5 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -47,6 +47,7 @@ use log::debug;
use parquet::basic::{ConvertedType, LogicalType};
use parquet::schema::types::ColumnDescriptor;

+mod access_plan;
mod metrics;
mod opener;
mod page_filter;
@@ -59,6 +60,7 @@ mod writer;
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
+pub use access_plan::ParquetAccessPlan;
pub use metrics::ParquetFileMetrics;
use opener::ParquetOpener;
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
@@ -152,8 +154,9 @@ pub use writer::plan_to_parquet;
/// the file.
///
/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
-/// via [`ParquetFileReaderFactory`] and applies any predicates and projections
-/// to determine what pages must be read.
+/// via [`ParquetFileReaderFactory`], creating a [`ParquetAccessPlan`] by
+/// applying predicates. The plan and projections are used to determine what
+/// pages must be read.
///
/// * Step 4: The stream begins reading data, fetching the required pages
/// and incrementally decoding them.
12 changes: 7 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -18,7 +18,7 @@
//! [`ParquetOpener`] for opening Parquet files
use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
-use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
+use crate::datasource::physical_plan::parquet::row_groups::RowGroupPlanBuilder;
use crate::datasource::physical_plan::parquet::{row_filter, should_enable_page_index};
use crate::datasource::physical_plan::{
FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory,
@@ -137,7 +137,7 @@ impl FileOpener for ParquetOpener {
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
-let mut row_groups = RowGroupSet::new(rg_metadata.len());
+let mut row_groups = RowGroupPlanBuilder::new(rg_metadata.len());
// if there is a range restricting what parts of the file to read
if let Some(range) = file_range.as_ref() {
row_groups.prune_by_range(rg_metadata, range);
@@ -164,15 +164,17 @@
}
}

+let access_plan = row_groups.build();

// page index pruning: if all data on individual pages can
// be ruled out using page metadata, rows from other columns
// with that range can be skipped as well
-if enable_page_index && !row_groups.is_empty() {
+if enable_page_index && !access_plan.is_empty() {
if let Some(p) = page_pruning_predicate {
let pruned = p.prune(
&file_schema,
builder.parquet_schema(),
-&row_groups,
+&access_plan,
file_metadata.as_ref(),
&file_metrics,
)?;
@@ -189,7 +191,7 @@
let stream = builder
.with_projection(mask)
.with_batch_size(batch_size)
-.with_row_groups(row_groups.indexes())
+.with_row_groups(access_plan.row_group_indexes())
.build()?;

let adapted = stream
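
The final `with_row_groups(access_plan.row_group_indexes())` call above is the standard parquet crate mechanism for restricting which row groups a reader decodes. Below is a standalone sketch of the same handoff using the synchronous reader; the file name is hypothetical, and the `ParquetAccessPlan` import path assumes the `pub use` added in mod.rs above:

use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?; // hypothetical input file
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Plan to scan every row group, then skip row group 0 as if pruning
    // had ruled it out (assumes the file has at least one row group).
    let mut plan = ParquetAccessPlan::new_all(builder.metadata().num_row_groups());
    plan.do_not_scan(0);

    // Only the row groups still in the plan are fetched and decoded.
    let reader = builder
        .with_row_groups(plan.row_group_indexes())
        .build()?;
    for batch in reader {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}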
datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -42,10 +42,10 @@ use std::collections::HashSet;
use std::sync::Arc;

use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
-use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
use crate::datasource::physical_plan::parquet::statistics::{
from_bytes_to_i128, parquet_column,
};
+use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use super::metrics::ParquetFileMetrics;
@@ -134,7 +134,7 @@ impl PagePruningPredicate {
&self,
arrow_schema: &Schema,
parquet_schema: &SchemaDescriptor,
-row_groups: &RowGroupSet,
+access_plan: &ParquetAccessPlan,
file_metadata: &ParquetMetaData,
file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowSelection>> {
@@ -171,8 +171,8 @@
for predicate in page_index_predicates {
// find column index in the parquet schema
let col_idx = find_column_index(predicate, arrow_schema, parquet_schema);
-let mut selectors = Vec::with_capacity(row_groups.len());
-for r in row_groups.iter() {
+let mut selectors = Vec::with_capacity(access_plan.len());
+for r in access_plan.row_group_index_iter() {
let row_group_metadata = &groups[r];

let rg_offset_indexes = file_offset_indexes.get(r);