Skip to content

Commit

Permalink
perf: Collapse expanded filters in eager (#20493)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Dec 29, 2024
1 parent 5671b0f commit f5f4cb5
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 26 deletions.
35 changes: 35 additions & 0 deletions crates/polars-plan/src/frame/opt_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,41 @@ impl OptFlags {
/// Returns the flag set made up of `TYPE_COERCION` and `TYPE_CHECK`.
pub fn schema_only() -> Self {
    let coercion = Self::TYPE_COERCION;
    let check = Self::TYPE_CHECK;
    coercion | check
}

pub fn eager(&self) -> bool {
self.contains(OptFlags::EAGER)
}

pub fn cluster_with_columns(&self) -> bool {
self.contains(OptFlags::CLUSTER_WITH_COLUMNS)
}

pub fn collapse_joins(&self) -> bool {
self.contains(OptFlags::COLLAPSE_JOINS)
}

pub fn predicate_pushdown(&self) -> bool {
self.contains(OptFlags::PREDICATE_PUSHDOWN)
}

pub fn projection_pushdown(&self) -> bool {
self.contains(OptFlags::PROJECTION_PUSHDOWN)
}
pub fn simplify_expr(&self) -> bool {
self.contains(OptFlags::SIMPLIFY_EXPR)
}
pub fn slice_pushdown(&self) -> bool {
self.contains(OptFlags::SLICE_PUSHDOWN)
}
pub fn streaming(&self) -> bool {
self.contains(OptFlags::STREAMING)
}
pub fn new_streaming(&self) -> bool {
self.contains(OptFlags::NEW_STREAMING)
}
pub fn fast_projection(&self) -> bool {
self.contains(OptFlags::FAST_PROJECTION)
}
}

impl Default for OptFlags {
Expand Down
7 changes: 7 additions & 0 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
predicates.push(n)
}
}
let multiple_filters = predicates.len() > 1;

for predicate in predicates {
let predicate = ExprIR::from_node(predicate, ctxt.expr_arena);
Expand All @@ -449,6 +450,12 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
let lp = IR::Filter { input, predicate };
input = run_conversion(lp, ctxt, "filter")?;
}

// Ensure that the expanded predicates are combined by the optimizer.
// When running eagerly with more than one filter, explicitly enable
// predicate pushdown so the optimizer pass that handles predicates runs.
// (Inserting `EAGER` here would be a no-op: this branch is only taken
// when that flag is already set.)
if ctxt.opt_flags.eager() && multiple_filters {
    ctxt.opt_flags.insert(OptFlags::PREDICATE_PUSHDOWN);
}

Ok(input)
} else {
ctxt.conversion_optimizer
Expand Down
44 changes: 18 additions & 26 deletions crates/polars-plan/src/plans/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,8 @@ pub fn optimize(
}
let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena, &mut opt_state)?;

// get toggle values
let cluster_with_columns = opt_state.contains(OptFlags::CLUSTER_WITH_COLUMNS);
let collapse_joins = opt_state.contains(OptFlags::COLLAPSE_JOINS);
let predicate_pushdown = opt_state.contains(OptFlags::PREDICATE_PUSHDOWN);
let projection_pushdown = opt_state.contains(OptFlags::PROJECTION_PUSHDOWN);
let simplify_expr = opt_state.contains(OptFlags::SIMPLIFY_EXPR);
let slice_pushdown = opt_state.contains(OptFlags::SLICE_PUSHDOWN);
let streaming = opt_state.contains(OptFlags::STREAMING);
let new_streaming = opt_state.contains(OptFlags::NEW_STREAMING);
let fast_projection = opt_state.contains(OptFlags::FAST_PROJECTION);

// Don't run optimizations that don't make sense on a single node.
// This keeps eager execution more snappy.
let eager = opt_state.contains(OptFlags::EAGER);
#[cfg(feature = "cse")]
let comm_subplan_elim = opt_state.contains(OptFlags::COMM_SUBPLAN_ELIM);

Expand All @@ -106,15 +94,16 @@ pub fn optimize(
let comm_subexpr_elim = false;

#[allow(unused_variables)]
let agg_scan_projection = opt_state.contains(OptFlags::FILE_CACHING) && !streaming && !eager;
let agg_scan_projection =
opt_state.contains(OptFlags::FILE_CACHING) && !opt_state.streaming() && !opt_state.eager();

// During debug we check if the optimizations have not modified the final schema.
#[cfg(debug_assertions)]
let prev_schema = lp_arena.get(lp_top).schema(lp_arena).into_owned();

// Collect members for optimizations that need it.
let mut members = MemberCollector::new();
if !eager && (comm_subexpr_elim || projection_pushdown) {
if !opt_state.eager() && (comm_subexpr_elim || opt_state.projection_pushdown()) {
members.collect(lp_top, lp_arena, expr_arena)
}

Expand All @@ -125,7 +114,7 @@ pub fn optimize(
set_order_flags(lp_top, lp_arena, expr_arena, scratch);
}

if simplify_expr {
if opt_state.simplify_expr() {
#[cfg(feature = "fused")]
rules.push(Box::new(fused::FusedArithmetic {}));
}
Expand Down Expand Up @@ -153,7 +142,7 @@ pub fn optimize(
let _cse_plan_changed = false;

// Should be run before predicate pushdown.
if projection_pushdown {
if opt_state.projection_pushdown() {
let mut projection_pushdown_opt = ProjectionPushDown::new();
let alp = lp_arena.take(lp_top);
let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
Expand All @@ -165,33 +154,36 @@ pub fn optimize(
}
}

if predicate_pushdown {
if opt_state.predicate_pushdown() {
let mut predicate_pushdown_opt = PredicatePushDown::new(expr_eval);
let alp = lp_arena.take(lp_top);
let alp = predicate_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
lp_arena.replace(lp_top, alp);
}

if cluster_with_columns {
if opt_state.cluster_with_columns() {
cluster_with_columns::optimize(lp_top, lp_arena, expr_arena)
}

// Make sure it is after predicate pushdown
if collapse_joins && members.has_filter_with_join_input {
if opt_state.collapse_joins() && members.has_filter_with_join_input {
collapse_joins::optimize(lp_top, lp_arena, expr_arena)
}

// Make sure it's before slice pushdown.
if fast_projection {
rules.push(Box::new(SimpleProjectionAndCollapse::new(eager)));
if opt_state.fast_projection() {
rules.push(Box::new(SimpleProjectionAndCollapse::new(
opt_state.eager(),
)));
}

if !eager {
if !opt_state.eager() {
rules.push(Box::new(DelayRechunk::new()));
}

if slice_pushdown {
let mut slice_pushdown_opt = SlicePushDown::new(streaming, new_streaming);
if opt_state.slice_pushdown() {
let mut slice_pushdown_opt =
SlicePushDown::new(opt_state.streaming(), opt_state.new_streaming());
let alp = lp_arena.take(lp_top);
let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;

Expand All @@ -202,11 +194,11 @@ pub fn optimize(
}
// This optimization removes branches, so we must do it when type coercion
// is completed.
if simplify_expr {
if opt_state.simplify_expr() {
rules.push(Box::new(SimplifyBooleanRule {}));
}

if !eager {
if !opt_state.eager() {
rules.push(Box::new(FlattenUnionRule {}));
}

Expand Down

0 comments on commit f5f4cb5

Please sign in to comment.