Skip to content

Commit a8925f3

Browse files
rkrishn7alamb
andauthored
chore: refactor usage of reassign_predicate_columns (#17703)
* chore: refactor usage of `reassign_predicate_columns` * chore: Address PR comments --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent d05dcc3 commit a8925f3

File tree

6 files changed

+60
-65
lines changed

6 files changed

+60
-65
lines changed

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}
7676
use datafusion_common::Result;
7777
use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper};
7878
use datafusion_physical_expr::expressions::Column;
79-
use datafusion_physical_expr::utils::reassign_predicate_columns;
79+
use datafusion_physical_expr::utils::reassign_expr_columns;
8080
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
8181

8282
use datafusion_physical_plan::metrics;
@@ -119,9 +119,8 @@ impl DatafusionArrowPredicate {
119119
rows_matched: metrics::Count,
120120
time: metrics::Time,
121121
) -> Result<Self> {
122-
let projected_schema = Arc::clone(&candidate.filter_schema);
123122
let physical_expr =
124-
reassign_predicate_columns(candidate.expr, &projected_schema, true)?;
123+
reassign_expr_columns(candidate.expr, &candidate.filter_schema)?;
125124

126125
Ok(Self {
127126
physical_expr,

datafusion/datasource/src/file_scan_config.rs

Lines changed: 24 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,21 @@ use datafusion_common::{
4848
use datafusion_execution::{
4949
object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
5050
};
51-
use datafusion_physical_expr::{expressions::Column, utils::reassign_predicate_columns};
52-
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
51+
use datafusion_expr::Operator;
52+
use datafusion_physical_expr::expressions::BinaryExpr;
53+
use datafusion_physical_expr::{expressions::Column, utils::reassign_expr_columns};
54+
use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, Partitioning};
5355
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
5456
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
5557
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
58+
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
5659
use datafusion_physical_plan::projection::ProjectionExpr;
5760
use datafusion_physical_plan::{
5861
display::{display_orderings, ProjectSchemaDisplay},
5962
metrics::ExecutionPlanMetricsSet,
6063
projection::{all_alias_free_columns, new_projections_for_columns},
6164
DisplayAs, DisplayFormatType,
6265
};
63-
use datafusion_physical_plan::{
64-
filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation,
65-
};
6666

6767
use datafusion_physical_plan::coop::cooperative;
6868
use datafusion_physical_plan::execution_plan::SchedulingType;
@@ -588,27 +588,14 @@ impl DataSource for FileScanConfig {
588588
if let Some(filter) = self.file_source.filter() {
589589
// We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
590590
// Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
591-
match reassign_predicate_columns(filter, &schema, true) {
592-
Ok(filter) => {
593-
match Self::add_filter_equivalence_info(
594-
filter,
595-
&mut eq_properties,
596-
&schema,
597-
) {
598-
Ok(()) => {}
599-
Err(e) => {
600-
warn!("Failed to add filter equivalence info: {e}");
601-
#[cfg(debug_assertions)]
602-
panic!("Failed to add filter equivalence info: {e}");
603-
}
604-
}
605-
}
591+
match Self::add_filter_equivalence_info(filter, &mut eq_properties, &schema) {
592+
Ok(()) => {}
606593
Err(e) => {
607-
warn!("Failed to reassign predicate columns: {e}");
594+
warn!("Failed to add filter equivalence info: {e}");
608595
#[cfg(debug_assertions)]
609-
panic!("Failed to reassign predicate columns: {e}");
596+
panic!("Failed to add filter equivalence info: {e}");
610597
}
611-
};
598+
}
612599
}
613600
eq_properties
614601
}
@@ -764,24 +751,24 @@ impl FileScanConfig {
764751
eq_properties: &mut EquivalenceProperties,
765752
schema: &Schema,
766753
) -> Result<()> {
767-
macro_rules! ignore_dangling_col {
768-
($col:expr) => {
769-
if let Some(col) = $col.as_any().downcast_ref::<Column>() {
770-
if schema.index_of(col.name()).is_err() {
771-
continue;
754+
// Gather valid equality pairs from the filter expression
755+
let equal_pairs = split_conjunction(&filter).into_iter().filter_map(|expr| {
756+
// Ignore any binary expressions that reference non-existent columns in the current schema
757+
// (e.g. due to unnecessary projections being removed)
758+
reassign_expr_columns(Arc::clone(expr), schema)
759+
.ok()
760+
.and_then(|expr| match expr.as_any().downcast_ref::<BinaryExpr>() {
761+
Some(expr) if expr.op() == &Operator::Eq => {
762+
Some((Arc::clone(expr.left()), Arc::clone(expr.right())))
772763
}
773-
}
774-
};
775-
}
764+
_ => None,
765+
})
766+
});
776767

777-
let (equal_pairs, _) = collect_columns_from_predicate(&filter);
778768
for (lhs, rhs) in equal_pairs {
779-
// Ignore any binary expressions that reference non-existent columns in the current schema
780-
// (e.g. due to unnecessary projections being removed)
781-
ignore_dangling_col!(lhs);
782-
ignore_dangling_col!(rhs);
783-
eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
769+
eq_properties.add_equal_conditions(lhs, rhs)?
784770
}
771+
785772
Ok(())
786773
}
787774

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
337337
mod test {
338338
use crate::{
339339
expressions::{col, lit, BinaryExpr},
340-
utils::reassign_predicate_columns,
340+
utils::reassign_expr_columns,
341341
};
342342
use arrow::{
343343
array::RecordBatch,
@@ -375,18 +375,16 @@ mod test {
375375
]));
376376
// Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
377377
// and remaps the children to the file schema.
378-
let dynamic_filter_1 = reassign_predicate_columns(
378+
let dynamic_filter_1 = reassign_expr_columns(
379379
Arc::clone(&dynamic_filter) as Arc<dyn PhysicalExpr>,
380380
&filter_schema_1,
381-
false,
382381
)
383382
.unwrap();
384383
let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
385384
insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }"#);
386-
let dynamic_filter_2 = reassign_predicate_columns(
385+
let dynamic_filter_2 = reassign_expr_columns(
387386
Arc::clone(&dynamic_filter) as Arc<dyn PhysicalExpr>,
388387
&filter_schema_2,
389-
false,
390388
)
391389
.unwrap();
392390
let snap = dynamic_filter_2.snapshot().unwrap().unwrap();

datafusion/physical-expr/src/utils/mod.rs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -238,22 +238,23 @@ pub fn collect_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<Column> {
238238
columns
239239
}
240240

241-
/// Re-assign column indices referenced in predicate according to given schema.
242-
/// This may be helpful when dealing with projections.
243-
pub fn reassign_predicate_columns(
244-
pred: Arc<dyn PhysicalExpr>,
241+
/// Re-assign indices of [`Column`]s within the given [`PhysicalExpr`] according to
242+
/// the provided [`Schema`].
243+
///
244+
/// This can be useful when attempting to map an expression onto a different schema.
245+
///
246+
/// # Errors
247+
///
248+
/// This function will return an error if any column in the expression cannot be found
249+
/// in the provided schema.
250+
pub fn reassign_expr_columns(
251+
expr: Arc<dyn PhysicalExpr>,
245252
schema: &Schema,
246-
ignore_not_found: bool,
247253
) -> Result<Arc<dyn PhysicalExpr>> {
248-
pred.transform_down(|expr| {
249-
let expr_any = expr.as_any();
250-
251-
if let Some(column) = expr_any.downcast_ref::<Column>() {
252-
let index = match schema.index_of(column.name()) {
253-
Ok(idx) => idx,
254-
Err(_) if ignore_not_found => usize::MAX,
255-
Err(e) => return Err(e.into()),
256-
};
254+
expr.transform_down(|expr| {
255+
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
256+
let index = schema.index_of(column.name())?;
257+
257258
return Ok(Transformed::yes(Arc::new(Column::new(
258259
column.name(),
259260
index,
@@ -506,7 +507,7 @@ pub(crate) mod tests {
506507
}
507508

508509
#[test]
509-
fn test_reassign_predicate_columns_in_list() {
510+
fn test_reassign_expr_columns_in_list() {
510511
let int_field = Field::new("should_not_matter", DataType::Int64, true);
511512
let dict_field = Field::new(
512513
"id",
@@ -526,7 +527,7 @@ pub(crate) mod tests {
526527
)
527528
.unwrap();
528529

529-
let actual = reassign_predicate_columns(pred, &schema_small, false).unwrap();
530+
let actual = reassign_expr_columns(pred, &schema_small).unwrap();
530531

531532
let expected = in_list(
532533
Arc::new(Column::new_with_schema("id", &schema_small).unwrap()),

datafusion/physical-plan/src/filter.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ impl FilterExec {
265265
default_selectivity,
266266
)?;
267267
let mut eq_properties = input.equivalence_properties().clone();
268-
let (equal_pairs, _) = collect_columns_from_predicate(predicate);
268+
let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate);
269269
for (lhs, rhs) in equal_pairs {
270270
eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
271271
}
@@ -716,8 +716,18 @@ impl RecordBatchStream for FilterExecStream {
716716
}
717717

718718
/// Return the equals Column-Pairs and Non-equals Column-Pairs
719+
#[deprecated(
720+
since = "51.0.0",
721+
note = "This function will be internal in the future"
722+
)]
719723
pub fn collect_columns_from_predicate(
720724
predicate: &'_ Arc<dyn PhysicalExpr>,
725+
) -> EqualAndNonEqual<'_> {
726+
collect_columns_from_predicate_inner(predicate)
727+
}
728+
729+
fn collect_columns_from_predicate_inner(
730+
predicate: &'_ Arc<dyn PhysicalExpr>,
721731
) -> EqualAndNonEqual<'_> {
722732
let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
723733
let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
@@ -787,7 +797,7 @@ mod tests {
787797
&schema,
788798
)?;
789799

790-
let (equal_pairs, ne_pairs) = collect_columns_from_predicate(&predicate);
800+
let (equal_pairs, ne_pairs) = collect_columns_from_predicate_inner(&predicate);
791801
assert_eq!(2, equal_pairs.len());
792802
assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
793803
assert!(equal_pairs[0].1.eq(&lit(4u32)));

datafusion/physical-plan/src/filter_pushdown.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use std::collections::HashSet;
3838
use std::sync::Arc;
3939

4040
use datafusion_common::Result;
41-
use datafusion_physical_expr::utils::{collect_columns, reassign_predicate_columns};
41+
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
4242
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
4343
use itertools::Itertools;
4444

@@ -343,7 +343,7 @@ impl ChildFilterDescription {
343343
// All columns exist in child - we can push down
344344
// Need to reassign column indices to match child schema
345345
let reassigned_filter =
346-
reassign_predicate_columns(Arc::clone(filter), &child_schema, false)?;
346+
reassign_expr_columns(Arc::clone(filter), &child_schema)?;
347347
child_parent_filters
348348
.push(PushedDownPredicate::supported(reassigned_filter));
349349
} else {

0 commit comments

Comments
 (0)