Update parquet page pruning code to use the StatisticsExtractor (apache#11483)

* Update the parquet code `prune_pages_in_one_row_group` to use the `StatisticsExtractor` (see the sketch after this list)

* fix doc

* Increase evaluation error counter if error determining data page row counts

* Optimize `single_column`
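
Below is a minimal sketch of the data-page pruning flow described above: per-page statistics (min, max, row counts) come from the parquet column and offset indexes, the predicate is evaluated against each page's statistics, and a failure to determine page row counts increments the evaluation-error counter instead of failing the scan. The types and the simplified signature here (`PageStats`, `Metrics`, a hard-coded `col > bound` predicate on `i64`) are illustrative stand-ins, not DataFusion's actual `StatisticsExtractor` or `prune_pages_in_one_row_group` API.

// Sketch only: stand-in types; the real code works on Arrow arrays produced by
// DataFusion's statistics extraction and evaluates a full PruningPredicate.

/// Per-page statistics for one column in one row group (hypothetical shape).
struct PageStats {
    mins: Vec<i64>,                 // min value of each data page
    maxes: Vec<i64>,                // max value of each data page
    row_counts: Option<Vec<usize>>, // rows per page, if the offset index is readable
}

/// Stand-in for the pruning metrics tracked per file.
#[derive(Default)]
struct Metrics {
    predicate_evaluation_errors: usize,
    pages_pruned: usize,
}

/// Decide which data pages to keep for a `col > lower_bound` predicate.
/// Returns one "keep" flag per data page.
fn prune_pages_in_one_row_group(
    stats: &PageStats,
    lower_bound: i64,
    metrics: &mut Metrics,
) -> Vec<bool> {
    // If the per-page row counts cannot be determined, a row selection cannot be
    // built safely: bump the evaluation-error counter and keep every page.
    let Some(row_counts) = &stats.row_counts else {
        metrics.predicate_evaluation_errors += 1;
        return vec![true; stats.mins.len()];
    };

    stats
        .maxes
        .iter()
        .zip(row_counts.iter())
        .map(|(page_max, _rows)| {
            // A page can only contain matches for `col > lower_bound`
            // if its maximum value exceeds the bound.
            let keep = *page_max > lower_bound;
            if !keep {
                metrics.pages_pruned += 1;
            }
            keep
        })
        .collect()
}

fn main() {
    let stats = PageStats {
        mins: vec![0, 100, 200],
        maxes: vec![99, 199, 299],
        row_counts: Some(vec![1000, 1000, 1000]),
    };
    let mut metrics = Metrics::default();
    // Predicate `col > 150`: the first page (values 0..=99) can be skipped.
    let keep = prune_pages_in_one_row_group(&stats, 150, &mut metrics);
    println!("keep = {keep:?}, pages pruned = {}", metrics.pages_pruned);
}

With `col > 150` the first page is pruned and the other two are kept; had `row_counts` been `None`, all pages would have been kept and `predicate_evaluation_errors` incremented instead, which is the behavior the third bullet describes.
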
alamb authored Jul 18, 2024
1 parent dff2f3c commit b197449
Showing 5 changed files with 279 additions and 369 deletions.
51 changes: 11 additions & 40 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::file_stream::FileStream;
use crate::datasource::physical_plan::{
- parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner,
+ parquet::page_filter::PagePruningAccessPlanFilter, DisplayAs, FileGroupPartitioner,
FileScanConfig,
};
use crate::{
@@ -39,13 +39,11 @@ use crate::{
},
};

- use arrow::datatypes::{DataType, SchemaRef};
+ use arrow::datatypes::SchemaRef;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};

use itertools::Itertools;
use log::debug;
- use parquet::basic::{ConvertedType, LogicalType};
- use parquet::schema::types::ColumnDescriptor;

mod access_plan;
mod metrics;
@@ -225,7 +223,7 @@ pub struct ParquetExec {
/// Optional predicate for pruning row groups (derived from `predicate`)
pruning_predicate: Option<Arc<PruningPredicate>>,
/// Optional predicate for pruning pages (derived from `predicate`)
- page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
+ page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
/// Optional hint for the size of the parquet metadata
metadata_size_hint: Option<usize>,
/// Optional user defined parquet file reader factory
@@ -381,19 +379,12 @@ impl ParquetExecBuilder {
})
.filter(|p| !p.always_true());

- let page_pruning_predicate = predicate.as_ref().and_then(|predicate_expr| {
- match PagePruningPredicate::try_new(predicate_expr, file_schema.clone()) {
- Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
- Err(e) => {
- debug!(
- "Could not create page pruning predicate for '{:?}': {}",
- pruning_predicate, e
- );
- predicate_creation_errors.add(1);
- None
- }
- }
- });
+ let page_pruning_predicate = predicate
+ .as_ref()
+ .map(|predicate_expr| {
+ PagePruningAccessPlanFilter::new(predicate_expr, file_schema.clone())
+ })
+ .map(Arc::new);

let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();
@@ -739,7 +730,7 @@ impl ExecutionPlan for ParquetExec {

fn should_enable_page_index(
enable_page_index: bool,
- page_pruning_predicate: &Option<Arc<PagePruningPredicate>>,
+ page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
) -> bool {
enable_page_index
&& page_pruning_predicate.is_some()
@@ -749,26 +740,6 @@ fn should_enable_page_index(
.unwrap_or(false)
}

- // Convert parquet column schema to arrow data type, and just consider the
- // decimal data type.
- pub(crate) fn parquet_to_arrow_decimal_type(
- parquet_column: &ColumnDescriptor,
- ) -> Option<DataType> {
- let type_ptr = parquet_column.self_type_ptr();
- match type_ptr.get_basic_info().logical_type() {
- Some(LogicalType::Decimal { scale, precision }) => {
- Some(DataType::Decimal128(precision as u8, scale as i8))
- }
- _ => match type_ptr.get_basic_info().converted_type() {
- ConvertedType::DECIMAL => Some(DataType::Decimal128(
- type_ptr.get_precision() as u8,
- type_ptr.get_scale() as i8,
- )),
- _ => None,
- },
- }
- }

#[cfg(test)]
mod tests {
// See also `parquet_exec` integration test
@@ -798,7 +769,7 @@ mod tests {
};
use arrow::datatypes::{Field, Schema, SchemaBuilder};
use arrow::record_batch::RecordBatch;
- use arrow_schema::Fields;
+ use arrow_schema::{DataType, Fields};
use datafusion_common::{assert_contains, ScalarValue};
use datafusion_expr::{col, lit, when, Expr};
use datafusion_physical_expr::planner::logical2physical;
@@ -17,7 +17,7 @@

//! [`ParquetOpener`] for opening Parquet files
- use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
+ use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
use crate::datasource::physical_plan::parquet::{
row_filter, should_enable_page_index, ParquetAccessPlan,
@@ -46,7 +46,7 @@ pub(super) struct ParquetOpener {
pub limit: Option<usize>,
pub predicate: Option<Arc<dyn PhysicalExpr>>,
pub pruning_predicate: Option<Arc<PruningPredicate>>,
- pub page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
+ pub page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
pub table_schema: SchemaRef,
pub metadata_size_hint: Option<usize>,
pub metrics: ExecutionPlanMetricsSet,
