Merge partition columns into scan statistics for data skipping
timsaucer committed Dec 23, 2024
1 parent c3a868f commit 6e56d24
Showing 1 changed file with 169 additions and 2 deletions.
kernel/src/scan/data_skipping.rs
@@ -1,11 +1,14 @@
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use arrow_array::{Array, ArrayRef, MapArray, RecordBatch, StringArray, StructArray};
use tracing::debug;

use crate::actions::get_log_add_schema;
use crate::actions::visitors::SelectionVectorVisitor;
use crate::engine::arrow_data::ArrowEngineData;
use crate::error::DeltaResult;
use crate::expressions::{
column_expr, joined_column_expr, BinaryOperator, ColumnName, Expression as Expr, ExpressionRef,
@@ -14,8 +17,10 @@ use crate::expressions::{
use crate::predicates::{
DataSkippingPredicateEvaluator, PredicateEvaluator, PredicateEvaluatorDefaults,
};
use crate::schema::{DataType, PrimitiveType, SchemaRef, SchemaTransform, StructField, StructType};
use crate::{Engine, EngineData, ExpressionEvaluator, JsonHandler, RowVisitor as _};
use crate::schema::{
DataType, MapType, PrimitiveType, SchemaRef, SchemaTransform, StructField, StructType,
};
use crate::{Engine, EngineData, Error, ExpressionEvaluator, JsonHandler, RowVisitor as _};

#[cfg(test)]
mod tests;
@@ -43,6 +48,7 @@ fn as_data_skipping_predicate(expr: &Expr, inverted: bool) -> Option<Expr> {
pub(crate) struct DataSkippingFilter {
stats_schema: SchemaRef,
select_stats_evaluator: Arc<dyn ExpressionEvaluator>,
partitions_evaluator: Arc<dyn ExpressionEvaluator>,
skipping_evaluator: Arc<dyn ExpressionEvaluator>,
filter_evaluator: Arc<dyn ExpressionEvaluator>,
json_handler: Arc<dyn JsonHandler>,
@@ -61,6 +67,8 @@ impl DataSkippingFilter {
static PREDICATE_SCHEMA: LazyLock<DataType> = LazyLock::new(|| {
DataType::struct_type([StructField::new("predicate", DataType::BOOLEAN, true)])
});
static PARTITIONS_EXPR: LazyLock<Expr> =
LazyLock::new(|| column_expr!("add.partitionValues"));
static STATS_EXPR: LazyLock<Expr> = LazyLock::new(|| column_expr!("add.stats"));
static FILTER_EXPR: LazyLock<Expr> =
LazyLock::new(|| column_expr!("predicate").distinct(false));
@@ -88,6 +96,8 @@ impl DataSkippingFilter {
StructField::new("maxValues", referenced_schema, true),
]));

let partitions_map_type = MapType::new(DataType::STRING, DataType::STRING, true);
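// (In the Delta log, partitionValues is a map of string keys to string values.)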

// Skipping happens in several steps:
//
// 1. The stats selector fetches add.stats from the metadata
@@ -106,6 +116,12 @@ impl DataSkippingFilter {
DataType::STRING,
);

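// The partitions evaluator extracts add.partitionValues as a map column so the
// values can be merged into the parsed stats below.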
let partitions_evaluator = engine.get_expression_handler().get_evaluator(
get_log_add_schema().clone(),
PARTITIONS_EXPR.clone(),
partitions_map_type.into(),
);

let skipping_evaluator = engine.get_expression_handler().get_evaluator(
stats_schema.clone(),
Expr::struct_from([as_data_skipping_predicate(&predicate, false)?]),
@@ -121,6 +137,7 @@ impl DataSkippingFilter {
Some(Self {
stats_schema,
select_stats_evaluator,
partitions_evaluator,
skipping_evaluator,
filter_evaluator,
json_handler: engine.get_json_handler(),
@@ -138,6 +155,11 @@ impl DataSkippingFilter {
.parse_json(stats, self.stats_schema.clone())?;
assert_eq!(parsed_stats.len(), actions.len());

let parsed_partitions = self.partitions_evaluator.evaluate(actions)?;
assert_eq!(parsed_partitions.len(), actions.len());

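// Copy the partition values into the minValues/maxValues stats so that the
// skipping predicate treats partition columns like any other statistics.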
let parsed_stats = merge_partitions_into_stats(parsed_partitions, parsed_stats)?;

// evaluate the predicate on the parsed stats, then convert to selection vector
let skipping_predicate = self.skipping_evaluator.evaluate(&*parsed_stats)?;
assert_eq!(skipping_predicate.len(), actions.len());
@@ -257,3 +279,148 @@ impl DataSkippingPredicateEvaluator for DataSkippingPredicateCreator {
Some(Expr::variadic(op, exprs))
}
}

/// This function computes the values for the partition arrays that are merged into the
/// stats fields below. Since the partition columns arrive as a MapArray, we need to find,
/// for each key, the value assigned in each log entry.
fn compute_partition_arrays(
partitions_column: &ArrayRef,
output_schema: &Arc<arrow_schema::Schema>,
) -> DeltaResult<HashMap<String, ArrayRef>> {
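// Map each minValues field name to its Arrow type, so that the string-typed
// partition values can be cast to the types the skipping predicate expects.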
let output_types: HashMap<String, arrow_schema::DataType> =
match output_schema.field_with_name("minValues")?.data_type() {
arrow_schema::DataType::Struct(fields) => fields
.iter()
.map(|field| (field.name().to_owned(), field.data_type().to_owned())),
_ => return Err(Error::engine_data_type("minValues")),
}
.collect();

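// add.partitionValues arrives as a MapArray of string keys and string values.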
let partitions_array = partitions_column
.as_any()
.downcast_ref::<MapArray>()
.ok_or_else(|| Error::engine_data_type("Partitions"))?;

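// Collect the distinct partition keys that appear across all files in the batch.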
let keys: HashSet<String> = partitions_array
.keys()
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| Error::engine_data_type("Partition keys"))?
.iter()
.filter_map(|s| s.map(|t| t.to_string()))
.collect();

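// For each key that also appears in the stats schema, build one array holding the
// per-file partition value, cast from its string form to the stats column type.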
let partition_values: HashMap<String, _> = keys
.iter()
.filter_map(|key| {
let cast_type = output_types.get(key)?;

let values = partitions_array
.iter()
.map(|maybe_partition| {
maybe_partition.and_then(|partition_data| {
let keys = partition_data
.column_by_name("key")?
.as_any()
.downcast_ref::<StringArray>()?;
let values = partition_data
.column_by_name("value")?
.as_any()
.downcast_ref::<StringArray>()?;

let mut kv =
keys.iter()
.zip(values.iter())
.filter_map(|(k, v)| match (k, v) {
(Some(k), Some(v)) => Some((k, v)),
_ => None,
});

kv.find(|(k, _)| *k == key.as_str())
.map(|(_, v)| v.to_string())
})
})
.collect::<Vec<Option<String>>>();

let string_array = StringArray::from(values);
let value_array = arrow_cast::cast(&string_array, &cast_type).ok()?;
Some((key.to_owned(), value_array))
})
.collect();

Ok(partition_values)
}
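
// A rough sketch of the transformation above, with hypothetical inputs: two add
// entries partitioned on a `year` column, where the stats schema types
// `minValues.year` as an integer:
//
//     partitionValues per file:  {"year": "2024"}, {"year": "2023"}
//     output_types["year"]:      Int32
//     result["year"]:            Int32Array [2024, 2023]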

/// This function builds up the stats fields for the min and max values. It assumes
/// that the arrays for the partition fields already exist, and it only replaces the
/// fields for which a partition array was computed (i.e. those referenced by the
/// predicate filters).
fn merge_partition_fields_into_stats(
stats_batch: &RecordBatch,
idx: usize,
partition_values: &HashMap<String, Arc<dyn Array>>,
) -> DeltaResult<Arc<dyn arrow_array::Array + 'static>> {
let (fields, mut arrays, nulls) = stats_batch
.column(idx)
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| Error::engine_data_type("minValues"))?
.to_owned()
.into_parts();
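// Swap in the partition-derived array wherever a stats field matches a partition
// column; all other child arrays are kept as-is.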
for (idx, field) in itertools::enumerate(fields.iter()) {
if let Some(arr) = partition_values.get(field.name()) {
arrays[idx] = Arc::clone(arr);
}
}
Ok(Arc::new(StructArray::new(fields, arrays, nulls))
as Arc<(dyn arrow_array::Array + 'static)>)
}

/// This function adds partition data to the stats fields. For each partition field for a log
/// it adds the partition value to both the `minValues` and `maxValues` fields, so that when
/// we match against it with the data skipping filters we can effectively skip files.
fn merge_partitions_into_stats(
partitions: Box<dyn EngineData>,
stats: Box<dyn EngineData>,
) -> DeltaResult<Box<dyn EngineData>> {
let partitions = ArrowEngineData::try_from_engine_data(partitions)?;
let partitions_batch = partitions.record_batch();

// If the partitions data is empty, return the original stats
let partitions_column = match partitions_batch.column_by_name("output") {
Some(c) => c,
None => return Ok(stats),
};

let stats = ArrowEngineData::try_from_engine_data(stats)?;
let stats_batch = stats.record_batch();
let output_schema = stats_batch.schema();

// For each unique partition key, generate the associated array
// to add to the stats fields
let partition_values = compute_partition_arrays(partitions_column, &output_schema)?;
if partition_values.is_empty() {
return Ok(stats);
}

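// Rebuild the stats batch column by column, substituting the partition values
// into both minValues and maxValues so equality and range predicates both work.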
let mut columns = Vec::default();
for (idx, field) in itertools::enumerate(output_schema.fields()) {
match field.name().as_str() {
"minValues" => columns.push(merge_partition_fields_into_stats(
stats_batch,
idx,
&partition_values,
)?),
"maxValues" => columns.push(merge_partition_fields_into_stats(
stats_batch,
idx,
&partition_values,
)?),
_ => {
columns.push(Arc::clone(stats_batch.column(idx)));
}
}
}

let record_batch = RecordBatch::try_new(output_schema, columns)?;
Ok(Box::new(ArrowEngineData::new(record_batch)))
}
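
// A minimal sketch of the end-to-end effect, with hypothetical values: a file
// partitioned on `year = "2024"` whose parsed stats row is
//
//     { minValues: { id: 1 }, maxValues: { id: 9 } }
//
// becomes (assuming `year` appears in the stats schema as an integer column)
//
//     { minValues: { id: 1, year: 2024 }, maxValues: { id: 9, year: 2024 } }
//
// so a predicate such as `year = 2024` can skip files in other partitions.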
