From af075a8faa0c346b0e9b16100b3279e31fc7eac5 Mon Sep 17 00:00:00 2001
From: Ryan Johnson
Date: Tue, 10 Dec 2024 14:56:37 -0700
Subject: [PATCH] Data skipping correctly handles nested columns and column mapping (#512)

## What changes are proposed in this pull request?

The existing implementation of data skipping has two flaws:

1. Only top-level columns are considered for data skipping (due to how the stats schema is derived).
2. Column mapping is completely ignored -- the data skipping expression with its logical column references is evaluated against physical data. At best this means data skipping is ineffective (because the column doesn't exist in the parquet). At worst, we would attempt to skip over the wrong column (e.g. if a table was upgraded to use column mapping because a column was dropped and re-added, we'd wrongly work with the old/dropped column because its logical and physical names were the same).

It turns out the two issues are intertwined, because both column mapping and nested column references need a schema traversal. So while we _could_ solve them separately, it's actually easier to just do it all at once.

Also -- the data skipping predicate we pass around needs an associated "referenced" schema (in order to build a stats schema); if that schema is empty, it means the data skipping predicate is "static" and should be evaluated once to decide whether to even initiate a log scan. That adds some complexity to the log replay path. But it also allows a predicate like the following to be treated as static, in spite of appearing to reference table columns:

```sql
WHERE my_col < 10 AND FALSE
```

### This PR affects the following public APIs

- `scan::Scan::predicate` is renamed to `physical_predicate` to eliminate ambiguity.
- `scan::log_replay::scan_action_iter` now takes fewer (and different) params.

## How was this change tested?

Existing unit tests, plus new unit tests that verify the new behavior.

---
 kernel/src/engine/parquet_stats_skipping.rs  |   6 +-
 kernel/src/scan/data_skipping.rs             |  32 +-
 kernel/src/scan/log_replay.rs                |  13 +-
 kernel/src/scan/mod.rs                       | 436 ++++++++++++++++---
 kernel/src/schema.rs                         |  10 +-
 kernel/src/table_changes/log_replay.rs       |   4 +-
 kernel/src/table_changes/log_replay/tests.rs |  21 +-
 kernel/src/table_changes/scan.rs             |  32 +-
 kernel/tests/read.rs                         |  30 +-
 9 files changed, 465 insertions(+), 119 deletions(-)

diff --git a/kernel/src/engine/parquet_stats_skipping.rs b/kernel/src/engine/parquet_stats_skipping.rs
index d7877195a..2aace74e4 100644
--- a/kernel/src/engine/parquet_stats_skipping.rs
+++ b/kernel/src/engine/parquet_stats_skipping.rs
@@ -17,15 +17,15 @@ mod tests;
 pub(crate) trait ParquetStatsProvider {
     /// The min-value stat for this column, if the column exists in this file, has the expected
     /// type, and the parquet footer provides stats for it.
-    fn get_parquet_min_stat(&self, _col: &ColumnName, _data_type: &DataType) -> Option<Scalar>;
+    fn get_parquet_min_stat(&self, col: &ColumnName, data_type: &DataType) -> Option<Scalar>;

     /// The max-value stat for this column, if the column exists in this file, has the expected
     /// type, and the parquet footer provides stats for it.
-    fn get_parquet_max_stat(&self, _col: &ColumnName, _data_type: &DataType) -> Option<Scalar>;
+    fn get_parquet_max_stat(&self, col: &ColumnName, data_type: &DataType) -> Option<Scalar>;

     /// The nullcount stat for this column, if the column exists in this file, has the expected
     /// type, and the parquet footer provides stats for it.
-    fn get_parquet_nullcount_stat(&self, _col: &ColumnName) -> Option<i64>;
+    fn get_parquet_nullcount_stat(&self, col: &ColumnName) -> Option<i64>;

     /// The rowcount stat for this row group. It is always available in the parquet footer.
     fn get_parquet_rowcount_stat(&self) -> i64;
diff --git a/kernel/src/scan/data_skipping.rs b/kernel/src/scan/data_skipping.rs
index ad7816f74..54eb5344c 100644
--- a/kernel/src/scan/data_skipping.rs
+++ b/kernel/src/scan/data_skipping.rs
@@ -1,6 +1,5 @@
 use std::borrow::Cow;
 use std::cmp::Ordering;
-use std::collections::HashSet;
 use std::sync::{Arc, LazyLock};

 use tracing::debug;
@@ -57,8 +56,7 @@ impl DataSkippingFilter {
     /// but using an Option lets the engine easily avoid the overhead of applying trivial filters.
     pub(crate) fn new(
         engine: &dyn Engine,
-        table_schema: &SchemaRef,
-        predicate: Option<ExpressionRef>,
+        physical_predicate: Option<(ExpressionRef, SchemaRef)>,
     ) -> Option<Self> {
         static PREDICATE_SCHEMA: LazyLock<DataType> = LazyLock::new(|| {
             DataType::struct_type([StructField::new("predicate", DataType::BOOLEAN, true)])
@@ -67,24 +65,8 @@ impl DataSkippingFilter {
         static FILTER_EXPR: LazyLock<Expr> =
             LazyLock::new(|| column_expr!("predicate").distinct(false));

-        let predicate = predicate.as_deref()?;
-        debug!("Creating a data skipping filter for {}", &predicate);
-        let field_names: HashSet<_> = predicate.references();
-
-        // Build the stats read schema by extracting the column names referenced by the predicate,
-        // extracting the corresponding field from the table schema, and inserting that field.
-        //
-        // TODO: Support nested column names!
-        let data_fields: Vec<_> = table_schema
-            .fields()
-            .filter(|field| field_names.contains([field.name.clone()].as_slice()))
-            .cloned()
-            .collect();
-        if data_fields.is_empty() {
-            // The predicate didn't reference any eligible stats columns, so skip it.
-            return None;
-        }
-        let minmax_schema = StructType::new(data_fields);
+        let (predicate, referenced_schema) = physical_predicate?;
+        debug!("Creating a data skipping filter for {:#?}", predicate);

         // Convert a min/max stats schema into a nullcount schema (all leaf fields are LONG)
         struct NullCountStatsTransform;
@@ -97,13 +79,13 @@ impl DataSkippingFilter {
             }
         }
         let nullcount_schema = NullCountStatsTransform
-            .transform_struct(&minmax_schema)?
+            .transform_struct(&referenced_schema)?
             .into_owned();
         let stats_schema = Arc::new(StructType::new([
             StructField::new("numRecords", DataType::LONG, true),
             StructField::new("nullCount", nullcount_schema, true),
-            StructField::new("minValues", minmax_schema.clone(), true),
-            StructField::new("maxValues", minmax_schema, true),
+            StructField::new("minValues", referenced_schema.clone(), true),
+            StructField::new("maxValues", referenced_schema, true),
         ]));

         // Skipping happens in several steps:
@@ -126,7 +108,7 @@ impl DataSkippingFilter {
         let skipping_evaluator = engine.get_expression_handler().get_evaluator(
             stats_schema.clone(),
-            Expr::struct_from([as_data_skipping_predicate(predicate, false)?]),
+            Expr::struct_from([as_data_skipping_predicate(&predicate, false)?]),
             PREDICATE_SCHEMA.clone(),
         );
diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs
index bf713ae7a..fb5c2b0fa 100644
--- a/kernel/src/scan/log_replay.rs
+++ b/kernel/src/scan/log_replay.rs
@@ -196,13 +196,9 @@ fn get_add_transform_expr() -> Expression {

 impl LogReplayScanner {
     /// Create a new [`LogReplayScanner`] instance
-    fn new(
-        engine: &dyn Engine,
-        table_schema: &SchemaRef,
-        predicate: Option<ExpressionRef>,
-    ) -> Self {
+    fn new(engine: &dyn Engine, physical_predicate: Option<(ExpressionRef, SchemaRef)>) -> Self {
         Self {
-            filter: DataSkippingFilter::new(engine, table_schema, predicate),
+            filter: DataSkippingFilter::new(engine, physical_predicate),
             seen: Default::default(),
         }
     }
@@ -242,10 +238,9 @@ impl LogReplayScanner {
 pub fn scan_action_iter(
     engine: &dyn Engine,
     action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>,
-    table_schema: &SchemaRef,
-    predicate: Option<ExpressionRef>,
+    physical_predicate: Option<(ExpressionRef, SchemaRef)>,
 ) -> impl Iterator<Item = DeltaResult<ScanData>> {
-    let mut log_scanner = LogReplayScanner::new(engine, table_schema, predicate);
+    let mut log_scanner = LogReplayScanner::new(engine, physical_predicate);
     let add_transform = engine.get_expression_handler().get_evaluator(
         get_log_add_schema().clone(),
         get_add_transform_expr(),
diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs
index f03d62cc9..7cf9a01ae 100644
--- a/kernel/src/scan/mod.rs
+++ b/kernel/src/scan/mod.rs
@@ -1,6 +1,7 @@
 //! Functionality to create and execute scans (reads) over data stored in a delta table

-use std::collections::HashMap;
+use std::borrow::Cow;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;

 use itertools::Itertools;
@@ -11,9 +12,12 @@ use crate::actions::deletion_vector::{
     deletion_treemap_to_bools, split_vector, DeletionVectorDescriptor,
 };
 use crate::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME};
-use crate::expressions::{ColumnName, Expression, ExpressionRef, Scalar};
+use crate::expressions::{ColumnName, Expression, ExpressionRef, ExpressionTransform, Scalar};
 use crate::scan::state::{DvInfo, Stats};
-use crate::schema::{DataType, Schema, SchemaRef, StructField, StructType};
+use crate::schema::{
+    ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, SchemaTransform, StructField,
+    StructType,
+};
 use crate::snapshot::Snapshot;
 use crate::table_features::ColumnMappingMode;
 use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};
@@ -94,23 +98,171 @@ impl ScanBuilder {
         let logical_schema = self
             .schema
             .unwrap_or_else(|| self.snapshot.schema().clone().into());
-        let (all_fields, read_fields, have_partition_cols) = get_state_info(
+        let state_info = get_state_info(
             logical_schema.as_ref(),
             &self.snapshot.metadata().partition_columns,
         )?;
-        let physical_schema = Arc::new(StructType::new(read_fields));
+
+        let physical_predicate = match self.predicate {
+            Some(predicate) => PhysicalPredicate::try_new(&predicate, &logical_schema)?,
+            None => PhysicalPredicate::None,
+        };

         Ok(Scan {
             snapshot: self.snapshot,
             logical_schema,
-            physical_schema,
-            predicate: self.predicate,
-            all_fields,
-            have_partition_cols,
+            physical_schema: Arc::new(StructType::new(state_info.read_fields)),
+            physical_predicate,
+            all_fields: state_info.all_fields,
+            have_partition_cols: state_info.have_partition_cols,
         })
     }
 }

+#[derive(Clone, Debug, PartialEq)]
+pub(crate) enum PhysicalPredicate {
+    Some(ExpressionRef, SchemaRef),
+    StaticSkipAll,
+    None,
+}
+
+impl PhysicalPredicate {
+    /// If we have a predicate, verify the columns it references and apply column mapping. First,
+    /// get the set of references; use that to filter the schema to only the columns of interest
+    /// (and verify that all referenced columns exist); then use the resulting logical/physical
+    /// mappings to rewrite the expression with physical column names.
+    ///
+    /// NOTE: It is possible the predicate resolves to FALSE even ignoring column references,
+    /// e.g. `col > 10 AND FALSE`. Such predicates can statically skip the whole query.
+    pub(crate) fn try_new(
+        predicate: &Expression,
+        logical_schema: &Schema,
+    ) -> DeltaResult<PhysicalPredicate> {
+        if can_statically_skip_all_files(predicate) {
+            return Ok(PhysicalPredicate::StaticSkipAll);
+        }
+        let mut get_referenced_fields = GetReferencedFields {
+            unresolved_references: predicate.references(),
+            column_mappings: HashMap::new(),
+            logical_path: vec![],
+            physical_path: vec![],
+        };
+        let schema_opt = get_referenced_fields.transform_struct(logical_schema);
+        let mut unresolved = get_referenced_fields.unresolved_references.into_iter();
+        if let Some(unresolved) = unresolved.next() {
+            // Schema traversal failed to resolve at least one column referenced by the predicate.
+            //
+            // NOTE: It's a pretty serious engine bug if we got this far with a query whose WHERE
+            // clause has invalid column references. Data skipping is best-effort and the predicate
+            // still needs to be evaluated against every row of data -- which is impossible if the
+            // columns are missing/invalid. Just blow up instead of trying to handle it gracefully.
+            return Err(Error::missing_column(format!(
+                "Predicate references unknown column: {unresolved}"
+            )));
+        }
+        let Some(schema) = schema_opt else {
+            // The predicate doesn't statically skip all files, and it doesn't reference any columns
+            // that could dynamically change its behavior, so it's useless for data skipping.
+            return Ok(PhysicalPredicate::None);
+        };
+        let mut apply_mappings = ApplyColumnMappings {
+            column_mappings: get_referenced_fields.column_mappings,
+        };
+        if let Some(predicate) = apply_mappings.transform(predicate) {
+            Ok(PhysicalPredicate::Some(
+                Arc::new(predicate.into_owned()),
+                Arc::new(schema.into_owned()),
+            ))
+        } else {
+            Ok(PhysicalPredicate::None)
+        }
+    }
+}
+
+// Evaluates a static data skipping predicate, ignoring any column references, and returns true if
+// the predicate lets us statically skip all files. Since this is direct evaluation (not an
+// expression rewrite), we use a dummy `ParquetStatsProvider` that provides no stats.
+fn can_statically_skip_all_files(predicate: &Expression) -> bool {
+    use crate::engine::parquet_stats_skipping::{
+        ParquetStatsProvider, ParquetStatsSkippingFilter as _,
+    };
+    struct NoStats;
+    impl ParquetStatsProvider for NoStats {
+        fn get_parquet_min_stat(&self, _: &ColumnName, _: &DataType) -> Option<Scalar> {
+            None
+        }
+
+        fn get_parquet_max_stat(&self, _: &ColumnName, _: &DataType) -> Option<Scalar> {
+            None
+        }
+
+        fn get_parquet_nullcount_stat(&self, _: &ColumnName) -> Option<i64> {
+            None
+        }
+
+        fn get_parquet_rowcount_stat(&self) -> i64 {
+            0
+        }
+    }
+    NoStats.eval_sql_where(predicate) == Some(false)
+}
+
+// Build the stats read schema by filtering the table schema to keep only skipping-eligible
+// leaf fields that the skipping expression actually references. Also extract physical name
+// mappings so we can access the correct physical stats column for each logical column.
+struct GetReferencedFields<'a> {
+    unresolved_references: HashSet<&'a ColumnName>,
+    column_mappings: HashMap<ColumnName, ColumnName>,
+    logical_path: Vec<String>,
+    physical_path: Vec<String>,
+}
+impl<'a> SchemaTransform<'a> for GetReferencedFields<'a> {
+    // Capture the path mapping for this leaf field
+    fn transform_primitive(&mut self, ptype: &'a PrimitiveType) -> Option<Cow<'a, PrimitiveType>> {
+        // Record the physical name mappings for all referenced leaf columns
+        self.unresolved_references
+            .remove(self.logical_path.as_slice())
+            .then(|| {
+                self.column_mappings.insert(
+                    ColumnName::new(&self.logical_path),
+                    ColumnName::new(&self.physical_path),
+                );
+                Cow::Borrowed(ptype)
+            })
+    }
+
+    // array and map fields are not eligible for data skipping, so filter them out.
+    fn transform_array(&mut self, _: &'a ArrayType) -> Option<Cow<'a, ArrayType>> {
+        None
+    }
+    fn transform_map(&mut self, _: &'a MapType) -> Option<Cow<'a, MapType>> {
+        None
+    }
+
+    fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
+        let physical_name = field.physical_name();
+        self.logical_path.push(field.name.clone());
+        self.physical_path.push(physical_name.to_string());
+        let field = self.recurse_into_struct_field(field);
+        self.logical_path.pop();
+        self.physical_path.pop();
+        Some(Cow::Owned(field?.with_name(physical_name)))
+    }
+}
+
+struct ApplyColumnMappings {
+    column_mappings: HashMap<ColumnName, ColumnName>,
+}
+impl<'a> ExpressionTransform<'a> for ApplyColumnMappings {
+    // NOTE: We already verified all column references.
But if the map probe ever did fail, the + // transform would just delete any expression(s) that reference the invalid column. + fn transform_column(&mut self, name: &'a ColumnName) -> Option> { + self.column_mappings + .get(name) + .map(|physical_name| Cow::Owned(physical_name.clone())) + } +} + /// A vector of this type is returned from calling [`Scan::execute`]. Each [`ScanResult`] contains /// the raw [`EngineData`] as read by the engines [`crate::ParquetHandler`], and a boolean /// mask. Rows can be dropped from a scan due to deletion vectors, so we communicate back both @@ -176,7 +328,7 @@ pub struct Scan { snapshot: Arc, logical_schema: SchemaRef, physical_schema: SchemaRef, - predicate: Option, + physical_predicate: PhysicalPredicate, all_fields: Vec, have_partition_cols: bool, } @@ -185,7 +337,7 @@ impl std::fmt::Debug for Scan { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { f.debug_struct("Scan") .field("schema", &self.logical_schema) - .field("predicate", &self.predicate) + .field("predicate", &self.physical_predicate) .finish() } } @@ -199,8 +351,12 @@ impl Scan { } /// Get the predicate [`Expression`] of the scan. - pub fn predicate(&self) -> Option { - self.predicate.clone() + pub fn physical_predicate(&self) -> Option { + if let PhysicalPredicate::Some(ref predicate, _) = self.physical_predicate { + Some(predicate.clone()) + } else { + None + } } /// Get an iterator of [`EngineData`]s that should be included in scan for a query. This handles @@ -219,12 +375,18 @@ impl Scan { &self, engine: &dyn Engine, ) -> DeltaResult>> { - Ok(scan_action_iter( + // NOTE: This is a cheap arc clone + let physical_predicate = match self.physical_predicate.clone() { + PhysicalPredicate::StaticSkipAll => return Ok(None.into_iter().flatten()), + PhysicalPredicate::Some(predicate, schema) => Some((predicate, schema)), + PhysicalPredicate::None => None, + }; + let it = scan_action_iter( engine, self.replay_for_scan_data(engine)?, - &self.logical_schema, - self.predicate(), - )) + physical_predicate, + ); + Ok(Some(it).into_iter().flatten()) } // Factored out to facilitate testing @@ -316,10 +478,15 @@ impl Scan { size: scan_file.size as usize, location: file_path, }; + + // WARNING: We validated the physical predicate against a schema that includes + // partition columns, but the read schema we use here does _NOT_ include partition + // columns. So we cannot safely assume that all column references are valid. See + // https://github.com/delta-io/delta-kernel-rs/issues/434 for more details. let read_result_iter = engine.get_parquet_handler().read_parquet_files( &[meta], global_state.read_schema.clone(), - self.predicate(), + self.physical_predicate(), )?; // Arc clones @@ -397,22 +564,24 @@ pub(crate) fn parse_partition_value( } } -/// Get the state needed to process a scan. In particular this returns a triple of -/// (all_fields_in_query, fields_to_read_from_parquet, have_partition_cols) where: -/// - all_fields_in_query - all fields in the query as [`ColumnType`] enums -/// - fields_to_read_from_parquet - Which fields should be read from the raw parquet files. This takes -/// into account column mapping -/// - have_partition_cols - boolean indicating if we have partition columns in this query -fn get_state_info( - logical_schema: &Schema, - partition_columns: &[String], -) -> DeltaResult<(Vec, Vec, bool)> { +/// All the state needed to process a scan. +struct StateInfo { + /// All fields referenced by the query. 
+ all_fields: Vec, + /// The physical (parquet) read schema to use. + read_fields: Vec, + /// True if this query references any partition columns. + have_partition_cols: bool, +} + +/// Get the state needed to process a scan, see [`StateInfo`] for details. +fn get_state_info(logical_schema: &Schema, partition_columns: &[String]) -> DeltaResult { let mut have_partition_cols = false; let mut read_fields = Vec::with_capacity(logical_schema.fields.len()); // Loop over all selected fields and note if they are columns that will be read from the // parquet file ([`ColumnType::Selected`]) or if they are partition columns and will need to // be filled in by evaluating an expression ([`ColumnType::Partition`]) - let column_types = logical_schema + let all_fields = logical_schema .fields() .enumerate() .map(|(index, logical_field)| -> DeltaResult<_> { @@ -433,7 +602,11 @@ fn get_state_info( } }) .try_collect()?; - Ok((column_types, read_fields, have_partition_cols)) + Ok(StateInfo { + all_fields, + read_fields, + have_partition_cols, + }) } pub fn selection_vector( @@ -454,7 +627,7 @@ pub fn transform_to_logical( global_state: &GlobalScanState, partition_values: &HashMap, ) -> DeltaResult> { - let (all_fields, _read_fields, have_partition_cols) = get_state_info( + let state_info = get_state_info( &global_state.logical_schema, &global_state.partition_columns, )?; @@ -463,8 +636,8 @@ pub fn transform_to_logical( data, global_state, partition_values, - &all_fields, - have_partition_cols, + &state_info.all_fields, + state_info.have_partition_cols, ) } @@ -528,7 +701,6 @@ pub(crate) mod test_utils { sync::{json::SyncJsonHandler, SyncEngine}, }, scan::log_replay::scan_action_iter, - schema::{StructField, StructType}, EngineData, JsonHandler, }; @@ -582,17 +754,9 @@ pub(crate) mod test_utils { context: T, validate_callback: ScanCallback, ) { - let engine = SyncEngine::new(); - // doesn't matter here - let table_schema = Arc::new(StructType::new([StructField::new( - "foo", - crate::schema::DataType::STRING, - false, - )])); let iter = scan_action_iter( - &engine, + &SyncEngine::new(), batch.into_iter().map(|batch| Ok((batch as _, true))), - &table_schema, None, ); let mut batch_count = 0; @@ -618,11 +782,185 @@ mod tests { use crate::engine::sync::SyncEngine; use crate::expressions::column_expr; - use crate::schema::PrimitiveType; + use crate::schema::{ColumnMetadataKey, PrimitiveType}; use crate::Table; use super::*; + #[test] + fn test_static_skipping() { + let test_cases = [ + (false, column_expr!("a")), + (true, Expression::literal(false)), + (false, Expression::literal(true)), + (false, Expression::null_literal(DataType::LONG)), + (true, Expression::and(column_expr!("a"), false)), + (false, Expression::or(column_expr!("a"), true)), + (false, Expression::or(column_expr!("a"), false)), + (false, Expression::lt(column_expr!("a"), 10)), + (false, Expression::lt(Expression::literal(10), 100)), + (true, Expression::gt(Expression::literal(10), 100)), + ]; + for (should_skip, predicate) in test_cases { + assert_eq!( + can_statically_skip_all_files(&predicate), + should_skip, + "Failed for predicate: {:#?}", + predicate + ); + } + } + + #[test] + fn test_physical_predicate() { + let logical_schema = StructType::new(vec![ + StructField::new("a", DataType::LONG, true), + StructField::new("b", DataType::LONG, true).with_metadata([( + ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), + "phys_b", + )]), + StructField::new("phys_b", DataType::LONG, true).with_metadata([( + 
ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), + "phys_c", + )]), + StructField::new( + "nested", + StructType::new(vec![ + StructField::new("x", DataType::LONG, true), + StructField::new("y", DataType::LONG, true).with_metadata([( + ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), + "phys_y", + )]), + ]), + true, + ), + StructField::new( + "mapped", + StructType::new(vec![StructField::new("n", DataType::LONG, true) + .with_metadata([( + ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), + "phys_n", + )])]), + true, + ) + .with_metadata([( + ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), + "phys_mapped", + )]), + ]); + + // NOTE: We break several column mapping rules here because they don't matter for this + // test. For example, we do not provide field ids, and not all columns have physical names. + let test_cases = [ + (Expression::literal(true), Some(PhysicalPredicate::None)), + ( + Expression::literal(false), + Some(PhysicalPredicate::StaticSkipAll), + ), + (column_expr!("x"), None), // no such column + ( + column_expr!("a"), + Some(PhysicalPredicate::Some( + column_expr!("a").into(), + StructType::new(vec![StructField::new("a", DataType::LONG, true)]).into(), + )), + ), + ( + column_expr!("b"), + Some(PhysicalPredicate::Some( + column_expr!("phys_b").into(), + StructType::new(vec![StructField::new("phys_b", DataType::LONG, true) + .with_metadata([( + ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), + "phys_b", + )])]) + .into(), + )), + ), + ( + column_expr!("nested.x"), + Some(PhysicalPredicate::Some( + column_expr!("nested.x").into(), + StructType::new(vec![StructField::new( + "nested", + StructType::new(vec![StructField::new("x", DataType::LONG, true)]), + true, + )]) + .into(), + )), + ), + ( + column_expr!("nested.y"), + Some(PhysicalPredicate::Some( + column_expr!("nested.phys_y").into(), + StructType::new(vec![StructField::new( + "nested", + StructType::new(vec![StructField::new("phys_y", DataType::LONG, true) + .with_metadata([( + ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), + "phys_y", + )])]), + true, + )]) + .into(), + )), + ), + ( + column_expr!("mapped.n"), + Some(PhysicalPredicate::Some( + column_expr!("phys_mapped.phys_n").into(), + StructType::new(vec![StructField::new( + "phys_mapped", + StructType::new(vec![StructField::new("phys_n", DataType::LONG, true) + .with_metadata([( + ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), + "phys_n", + )])]), + true, + ) + .with_metadata([( + ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), + "phys_mapped", + )])]) + .into(), + )), + ), + ( + Expression::and(column_expr!("mapped.n"), true), + Some(PhysicalPredicate::Some( + Expression::and(column_expr!("phys_mapped.phys_n"), true).into(), + StructType::new(vec![StructField::new( + "phys_mapped", + StructType::new(vec![StructField::new("phys_n", DataType::LONG, true) + .with_metadata([( + ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), + "phys_n", + )])]), + true, + ) + .with_metadata([( + ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), + "phys_mapped", + )])]) + .into(), + )), + ), + ( + Expression::and(column_expr!("mapped.n"), false), + Some(PhysicalPredicate::StaticSkipAll), + ), + ]; + + for (predicate, expected) in test_cases { + let result = PhysicalPredicate::try_new(&predicate, &logical_schema).ok(); + assert_eq!( + result, expected, + "Failed for predicate: {:#?}, expected {:#?}, got {:#?}", + predicate, expected, result + ); + } + } + fn get_files_for_scan(scan: Scan, engine: &dyn Engine) -> DeltaResult> 
{ let scan_data = scan.scan_data(engine)?; fn scan_data_callback( @@ -808,17 +1146,13 @@ mod tests { let data: Vec<_> = scan.execute(engine.clone()).unwrap().try_collect().unwrap(); assert_eq!(data.len(), 1); - // Predicate over a logically missing column, so the one data file should be returned. - // - // TODO: This should ideally trigger an error instead? + // Predicate over a logically missing column fails the scan let predicate = Arc::new(column_expr!("numeric.ints.invalid").lt(1000)); - let scan = snapshot + snapshot .scan_builder() .with_predicate(predicate) .build() - .unwrap(); - let data: Vec<_> = scan.execute(engine).unwrap().try_collect().unwrap(); - assert_eq!(data.len(), 1); + .expect_err("unknown column"); } #[test_log::test] diff --git a/kernel/src/schema.rs b/kernel/src/schema.rs index 93fcf8d65..42901751f 100644 --- a/kernel/src/schema.rs +++ b/kernel/src/schema.rs @@ -53,6 +53,12 @@ impl From<&String> for MetadataValue { } } +impl From<&str> for MetadataValue { + fn from(value: &str) -> Self { + Self::String(value.to_string()) + } +} + impl From for MetadataValue { fn from(value: i32) -> Self { Self::Number(value) @@ -134,8 +140,8 @@ impl StructField { /// Get the physical name for this field as it should be read from parquet. /// /// NOTE: Caller affirms that the schema was already validated by - /// [`crate::table_features::validate_schema_column_mapping`], to ensure that - /// annotations are always and only present when column mapping mode is enabled. + /// [`crate::table_features::validate_schema_column_mapping`], to ensure that annotations are + /// always and only present when column mapping mode is enabled. pub fn physical_name(&self) -> &str { match self .metadata diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index 9c6cfe872..0deb24682 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -52,9 +52,9 @@ pub(crate) fn table_changes_action_iter( engine: Arc, commit_files: impl IntoIterator, table_schema: SchemaRef, - predicate: Option, + physical_predicate: Option<(ExpressionRef, SchemaRef)>, ) -> DeltaResult>> { - let filter = DataSkippingFilter::new(engine.as_ref(), &table_schema, predicate).map(Arc::new); + let filter = DataSkippingFilter::new(engine.as_ref(), physical_predicate).map(Arc::new); let result = commit_files .into_iter() .map(move |commit_file| -> DeltaResult<_> { diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index 9953dd464..f2dbdd956 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -8,6 +8,7 @@ use crate::expressions::{column_expr, BinaryOperator}; use crate::log_segment::LogSegment; use crate::path::ParsedLogPath; use crate::scan::state::DvInfo; +use crate::scan::PhysicalPredicate; use crate::schema::{DataType, StructField, StructType}; use crate::table_changes::log_replay::LogReplayScanner; use crate::table_features::ReaderFeatures; @@ -523,18 +524,22 @@ async fn data_skipping_filter() { column_expr!("id"), Scalar::from(4), ); + let logical_schema = get_schema(); + let predicate = match PhysicalPredicate::try_new(&predicate, &logical_schema) { + Ok(PhysicalPredicate::Some(p, s)) => Some((p, s)), + other => panic!("Unexpected result: {:?}", other), + }; let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) .unwrap() .into_iter(); - let sv = - table_changes_action_iter(engine, commits, get_schema().into(), 
Some(predicate.into())) - .unwrap() - .flat_map(|scan_data| { - let scan_data = scan_data.unwrap(); - scan_data.selection_vector - }) - .collect_vec(); + let sv = table_changes_action_iter(engine, commits, logical_schema.into(), predicate) + .unwrap() + .flat_map(|scan_data| { + let scan_data = scan_data.unwrap(); + scan_data.selection_vector + }) + .collect_vec(); // Note: since the first pair is a dv operation, remove action will always be filtered assert_eq!(sv, &[false, true, false, false, true]); diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 980374177..92d20ff3d 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -4,7 +4,7 @@ use itertools::Itertools; use tracing::debug; use crate::scan::state::GlobalScanState; -use crate::scan::ColumnType; +use crate::scan::{ColumnType, PhysicalPredicate}; use crate::schema::{SchemaRef, StructType}; use crate::{DeltaResult, Engine, ExpressionRef}; @@ -26,7 +26,7 @@ pub struct TableChangesScan { // Data Feed physical_schema: SchemaRef, // The predicate to filter the data - predicate: Option, + physical_predicate: PhysicalPredicate, // The [`ColumnType`] of all the fields in the `logical_schema` all_fields: Vec, // `true` if any column in the `logical_schema` is a partition column @@ -159,10 +159,15 @@ impl TableChangesScanBuilder { } }) .try_collect()?; + let physical_predicate = match self.predicate { + Some(predicate) => PhysicalPredicate::try_new(&predicate, &logical_schema)?, + None => PhysicalPredicate::None, + }; + Ok(TableChangesScan { table_changes: self.table_changes, logical_schema, - predicate: self.predicate, + physical_predicate, all_fields, have_partition_cols, physical_schema: StructType::new(read_fields).into(), @@ -186,8 +191,15 @@ impl TableChangesScan { .log_segment .ascending_commit_files .clone(); + // NOTE: This is a cheap arc clone + let physical_predicate = match self.physical_predicate.clone() { + PhysicalPredicate::StaticSkipAll => return Ok(None.into_iter().flatten()), + PhysicalPredicate::Some(predicate, schema) => Some((predicate, schema)), + PhysicalPredicate::None => None, + }; let schema = self.table_changes.end_snapshot.schema().clone().into(); - table_changes_action_iter(engine, commits, schema, self.predicate.clone()) + let it = table_changes_action_iter(engine, commits, schema, physical_predicate)?; + Ok(Some(it).into_iter().flatten()) } /// Get global state that is valid for the entire scan. 
This is somewhat expensive so should @@ -211,7 +223,7 @@ mod tests { use crate::engine::sync::SyncEngine; use crate::expressions::{column_expr, Scalar}; - use crate::scan::ColumnType; + use crate::scan::{ColumnType, PhysicalPredicate}; use crate::schema::{DataType, StructField, StructType}; use crate::table_changes::COMMIT_VERSION_COL_NAME; use crate::{Expression, Table}; @@ -237,7 +249,7 @@ mod tests { ColumnType::Selected("_commit_timestamp".to_string()), ] ); - assert_eq!(scan.predicate, None); + assert_eq!(scan.physical_predicate, PhysicalPredicate::None); assert!(!scan.have_partition_cols); } @@ -277,6 +289,12 @@ mod tests { .into() ); assert!(!scan.have_partition_cols); - assert_eq!(scan.predicate, Some(predicate)); + assert_eq!( + scan.physical_predicate, + PhysicalPredicate::Some( + predicate, + StructType::new([StructField::new("id", DataType::INTEGER, true),]).into() + ) + ); } } diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 857ed3279..a0a8160c1 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -417,7 +417,7 @@ fn read_with_scan_data( .read_parquet_files( &[meta], global_state.read_schema.clone(), - scan.predicate().clone(), + scan.physical_predicate().clone(), ) .unwrap(); @@ -484,6 +484,7 @@ fn read_table_data( .map(|col| table_schema.field(col).cloned().unwrap()); Arc::new(Schema::new(selected_fields)) }); + println!("Read {url:?} with schema {read_schema:#?} and predicate {predicate:#?}"); let scan = snapshot .into_scan_builder() .with_schema_opt(read_schema) @@ -851,6 +852,10 @@ fn invalid_skips_none_predicates() -> Result<(), Box> { let empty_struct = Expression::struct_from(vec![]); let cases = vec![ (Expression::literal(false), table_for_numbers(vec![])), + ( + Expression::and(column_expr!("number"), false), + table_for_numbers(vec![]), + ), ( Expression::literal(true), table_for_numbers(vec![1, 2, 3, 4, 5, 6]), @@ -1041,17 +1046,17 @@ fn predicate_references_invalid_missing_column() -> Result<(), Box Result<(), Box