Data skipping correctly handles nested columns and column mapping (#512)
## What changes are proposed in this pull request?

The existing implementation of data skipping has two flaws:
1. Only top-level columns are considered for data skipping (due to how
the stats schema is derived)
2. Column mapping is completely ignored -- the data skipping expression,
with its logical column references, is evaluated against physical data.
At best this means data skipping is ineffective (because the logical
column name doesn't exist in the parquet file). At worst, we would
attempt to skip over the wrong column (e.g. if a table was upgraded to
use column mapping because a column was dropped and re-added, we'd
wrongly work with the old/dropped column, because its logical and
physical names were the same); see the sketch below.
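
A toy sketch of that hazard (the column names and the mapping representation here are made up for illustration; the kernel's real column-mapping metadata lives in the table schema):
```rust
use std::collections::HashMap;

fn main() {
    // Under column mapping, each logical column resolves to a stable,
    // generated physical name in the parquet file (names are made up).
    let mapping: HashMap<&str, &str> = HashMap::from([("my_col", "col_123abc")]);

    // The bug: looking up the *logical* name against physical data.
    let physical_columns = ["col_123abc"];
    assert!(!physical_columns.contains(&"my_col")); // stats lookup misses

    // The fix: translate logical -> physical first, so stats lookups (and
    // therefore skipping decisions) target the right column.
    assert!(physical_columns.contains(&mapping["my_col"]));
}
```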

It turns out the two issues are intertwined, because both column mapping
and nested column references need a schema traversal. So while we
_could_ solve them separately, it's actually easier to just do it all at
once.

Also -- the data skipping predicate we pass around needs an associated
"referenced" schema (in order to build a stats schema); if that schema
is empty, it means the data skipping predicate is "static" and should be
evaluated once to decide whether to even initiate a log scan. That adds
some complexity to the log replay path. But it also allows a predicate
like the following to be treated as static, in spite of appearing to
reference table columns:
```sql
WHERE my_col < 10 AND FALSE
```
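
A toy sketch of why that works (made-up expression types, not the kernel's): constant-folding `AND FALSE` leaves no column references, so the predicate's referenced schema is empty and it can be evaluated once up front:
```rust
// Made-up expression type for illustration only.
#[derive(Debug, Clone)]
enum Pred {
    Lit(bool),
    ColLt(&'static str, i64), // e.g. my_col < 10: references a column
    And(Box<Pred>, Box<Pred>),
}

// Collect the columns a predicate references.
fn references(p: &Pred, out: &mut Vec<&'static str>) {
    match p {
        Pred::Lit(_) => {}
        Pred::ColLt(c, _) => out.push(*c),
        Pred::And(a, b) => {
            references(a, out);
            references(b, out);
        }
    }
}

// `AND FALSE` annihilates the other operand regardless of column values.
fn simplify(p: &Pred) -> Pred {
    match p {
        Pred::And(a, b) => match (simplify(a), simplify(b)) {
            (Pred::Lit(false), _) | (_, Pred::Lit(false)) => Pred::Lit(false),
            (sa, sb) => Pred::And(Box::new(sa), Box::new(sb)),
        },
        other => other.clone(),
    }
}

fn main() {
    // WHERE my_col < 10 AND FALSE
    let p = Pred::And(
        Box::new(Pred::ColLt("my_col", 10)),
        Box::new(Pred::Lit(false)),
    );
    let simplified = simplify(&p);
    let mut refs = Vec::new();
    references(&simplified, &mut refs);
    // Nothing is referenced after simplification, so the predicate is
    // "static": evaluate it once and skip the log scan entirely.
    assert!(refs.is_empty());
    assert!(matches!(simplified, Pred::Lit(false)));
}
```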
 
### This PR affects the following public APIs

`scan::Scan::predicate` is renamed to `physical_predicate` to eliminate
ambiguity.

`scan::log_replay::scan_action_iter` now takes fewer (and different)
params.
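
A compile-only sketch of the call-shape change (every type below is a stand-in, not the kernel's; the real function also takes the engine and the action iterator, as the diff below shows):
```rust
use std::sync::Arc;

// Stand-ins for the kernel's types, just to show the parameter shape.
struct Expression;
struct StructType;
type ExpressionRef = Arc<Expression>;
type SchemaRef = Arc<StructType>;

// Old: `table_schema: &SchemaRef, predicate: Option<ExpressionRef>`.
// New: one optional pair -- the *physical* predicate together with the
// schema of just the columns it references; `None` disables data skipping.
fn scan_action_iter(_physical_predicate: Option<(ExpressionRef, SchemaRef)>) {}

fn main() {
    // No predicate: opt out of data skipping.
    scan_action_iter(None);
    // With a predicate: supply the pair.
    let expr: ExpressionRef = Arc::new(Expression);
    let referenced: SchemaRef = Arc::new(StructType);
    scan_action_iter(Some((expr, referenced)));
}
```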

## How was this change tested?

Existing unit tests, plus new unit tests that verify the new behavior.
scovich authored Dec 10, 2024
1 parent c1b202a commit af075a8
Showing 9 changed files with 465 additions and 119 deletions.
kernel/src/engine/parquet_stats_skipping.rs (3 additions & 3 deletions)
```diff
@@ -17,15 +17,15 @@ mod tests;
 pub(crate) trait ParquetStatsProvider {
     /// The min-value stat for this column, if the column exists in this file, has the expected
     /// type, and the parquet footer provides stats for it.
-    fn get_parquet_min_stat(&self, _col: &ColumnName, _data_type: &DataType) -> Option<Scalar>;
+    fn get_parquet_min_stat(&self, col: &ColumnName, data_type: &DataType) -> Option<Scalar>;
 
     /// The max-value stat for this column, if the column exists in this file, has the expected
     /// type, and the parquet footer provides stats for it.
-    fn get_parquet_max_stat(&self, _col: &ColumnName, _data_type: &DataType) -> Option<Scalar>;
+    fn get_parquet_max_stat(&self, col: &ColumnName, data_type: &DataType) -> Option<Scalar>;
 
     /// The nullcount stat for this column, if the column exists in this file, has the expected
     /// type, and the parquet footer provides stats for it.
-    fn get_parquet_nullcount_stat(&self, _col: &ColumnName) -> Option<i64>;
+    fn get_parquet_nullcount_stat(&self, col: &ColumnName) -> Option<i64>;
 
     /// The rowcount stat for this row group. It is always available in the parquet footer.
     fn get_parquet_rowcount_stat(&self) -> i64;
```
kernel/src/scan/data_skipping.rs (7 additions & 25 deletions)
```diff
@@ -1,6 +1,5 @@
 use std::borrow::Cow;
 use std::cmp::Ordering;
-use std::collections::HashSet;
 use std::sync::{Arc, LazyLock};
 
 use tracing::debug;
@@ -57,8 +56,7 @@ impl DataSkippingFilter {
     /// but using an Option lets the engine easily avoid the overhead of applying trivial filters.
     pub(crate) fn new(
         engine: &dyn Engine,
-        table_schema: &SchemaRef,
-        predicate: Option<ExpressionRef>,
+        physical_predicate: Option<(ExpressionRef, SchemaRef)>,
     ) -> Option<Self> {
         static PREDICATE_SCHEMA: LazyLock<DataType> = LazyLock::new(|| {
             DataType::struct_type([StructField::new("predicate", DataType::BOOLEAN, true)])
@@ -67,24 +65,8 @@ impl DataSkippingFilter {
         static FILTER_EXPR: LazyLock<Expr> =
             LazyLock::new(|| column_expr!("predicate").distinct(false));
 
-        let predicate = predicate.as_deref()?;
-        debug!("Creating a data skipping filter for {}", &predicate);
-        let field_names: HashSet<_> = predicate.references();
-
-        // Build the stats read schema by extracting the column names referenced by the predicate,
-        // extracting the corresponding field from the table schema, and inserting that field.
-        //
-        // TODO: Support nested column names!
-        let data_fields: Vec<_> = table_schema
-            .fields()
-            .filter(|field| field_names.contains([field.name.clone()].as_slice()))
-            .cloned()
-            .collect();
-        if data_fields.is_empty() {
-            // The predicate didn't reference any eligible stats columns, so skip it.
-            return None;
-        }
-        let minmax_schema = StructType::new(data_fields);
+        let (predicate, referenced_schema) = physical_predicate?;
+        debug!("Creating a data skipping filter for {:#?}", predicate);
 
         // Convert a min/max stats schema into a nullcount schema (all leaf fields are LONG)
         struct NullCountStatsTransform;
@@ -97,13 +79,13 @@
             }
         }
         let nullcount_schema = NullCountStatsTransform
-            .transform_struct(&minmax_schema)?
+            .transform_struct(&referenced_schema)?
             .into_owned();
         let stats_schema = Arc::new(StructType::new([
             StructField::new("numRecords", DataType::LONG, true),
             StructField::new("nullCount", nullcount_schema, true),
-            StructField::new("minValues", minmax_schema.clone(), true),
-            StructField::new("maxValues", minmax_schema, true),
+            StructField::new("minValues", referenced_schema.clone(), true),
+            StructField::new("maxValues", referenced_schema, true),
         ]));
 
         // Skipping happens in several steps:
@@ -126,7 +108,7 @@
 
         let skipping_evaluator = engine.get_expression_handler().get_evaluator(
             stats_schema.clone(),
-            Expr::struct_from([as_data_skipping_predicate(predicate, false)?]),
+            Expr::struct_from([as_data_skipping_predicate(&predicate, false)?]),
             PREDICATE_SCHEMA.clone(),
         );
 
```
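
To make the stats schema concrete: for a physical predicate that references a single nullable LONG column `my_col`, the construction above yields roughly the following (a sketch; the `delta_kernel::schema` import path is assumed, and for a lone LONG column the nullcount schema happens to coincide with the min/max schema):
```rust
use delta_kernel::schema::{DataType, StructField, StructType};

fn example_stats_schema() -> StructType {
    // The pruned, *physical* schema of just the columns the predicate
    // references -- here a single LONG column.
    let referenced_schema =
        StructType::new([StructField::new("my_col", DataType::LONG, true)]);
    // NullCountStatsTransform rewrites every leaf to LONG; for a lone LONG
    // column the result is identical to the referenced schema.
    let nullcount_schema = referenced_schema.clone();
    StructType::new([
        StructField::new("numRecords", DataType::LONG, true),
        StructField::new("nullCount", nullcount_schema, true),
        StructField::new("minValues", referenced_schema.clone(), true),
        StructField::new("maxValues", referenced_schema, true),
    ])
}

fn main() {
    println!("{:#?}", example_stats_schema());
}
```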
kernel/src/scan/log_replay.rs (4 additions & 9 deletions)
```diff
@@ -196,13 +196,9 @@ fn get_add_transform_expr() -> Expression {
 
 impl LogReplayScanner {
     /// Create a new [`LogReplayScanner`] instance
-    fn new(
-        engine: &dyn Engine,
-        table_schema: &SchemaRef,
-        predicate: Option<ExpressionRef>,
-    ) -> Self {
+    fn new(engine: &dyn Engine, physical_predicate: Option<(ExpressionRef, SchemaRef)>) -> Self {
         Self {
-            filter: DataSkippingFilter::new(engine, table_schema, predicate),
+            filter: DataSkippingFilter::new(engine, physical_predicate),
             seen: Default::default(),
         }
     }
@@ -242,10 +238,9 @@ impl LogReplayScanner {
 pub fn scan_action_iter(
     engine: &dyn Engine,
     action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>,
-    table_schema: &SchemaRef,
-    predicate: Option<ExpressionRef>,
+    physical_predicate: Option<(ExpressionRef, SchemaRef)>,
 ) -> impl Iterator<Item = DeltaResult<ScanData>> {
-    let mut log_scanner = LogReplayScanner::new(engine, table_schema, predicate);
+    let mut log_scanner = LogReplayScanner::new(engine, physical_predicate);
     let add_transform = engine.get_expression_handler().get_evaluator(
         get_log_add_schema().clone(),
         get_add_transform_expr(),
```
