diff --git a/crates/polars-plan/src/frame/opt_state.rs b/crates/polars-plan/src/frame/opt_state.rs index 23e4d28e44b0..affbf90774e8 100644 --- a/crates/polars-plan/src/frame/opt_state.rs +++ b/crates/polars-plan/src/frame/opt_state.rs @@ -47,6 +47,41 @@ impl OptFlags { pub fn schema_only() -> Self { Self::TYPE_COERCION | Self::TYPE_CHECK } + + pub fn eager(&self) -> bool { + self.contains(OptFlags::EAGER) + } + + pub fn cluster_with_columns(&self) -> bool { + self.contains(OptFlags::CLUSTER_WITH_COLUMNS) + } + + pub fn collapse_joins(&self) -> bool { + self.contains(OptFlags::COLLAPSE_JOINS) + } + + pub fn predicate_pushdown(&self) -> bool { + self.contains(OptFlags::PREDICATE_PUSHDOWN) + } + + pub fn projection_pushdown(&self) -> bool { + self.contains(OptFlags::PROJECTION_PUSHDOWN) + } + pub fn simplify_expr(&self) -> bool { + self.contains(OptFlags::SIMPLIFY_EXPR) + } + pub fn slice_pushdown(&self) -> bool { + self.contains(OptFlags::SLICE_PUSHDOWN) + } + pub fn streaming(&self) -> bool { + self.contains(OptFlags::STREAMING) + } + pub fn new_streaming(&self) -> bool { + self.contains(OptFlags::NEW_STREAMING) + } + pub fn fast_projection(&self) -> bool { + self.contains(OptFlags::FAST_PROJECTION) + } } impl Default for OptFlags { diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index eeb36adba234..2f74f10237f9 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -441,6 +441,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult predicates.push(n) } } + let multiple_filters = predicates.len() > 1; for predicate in predicates { let predicate = ExprIR::from_node(predicate, ctxt.expr_arena); @@ -449,6 +450,12 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult let lp = IR::Filter { input, predicate }; input = run_conversion(lp, ctxt, "filter")?; } + + // Ensure that predicate are combined by optimizer + if ctxt.opt_flags.eager() && multiple_filters { + ctxt.opt_flags.insert(OptFlags::EAGER); + } + Ok(input) } else { ctxt.conversion_optimizer diff --git a/crates/polars-plan/src/plans/optimizer/mod.rs b/crates/polars-plan/src/plans/optimizer/mod.rs index 7661ec7c5368..19ea4e9c5317 100644 --- a/crates/polars-plan/src/plans/optimizer/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/mod.rs @@ -83,20 +83,8 @@ pub fn optimize( } let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena, &mut opt_state)?; - // get toggle values - let cluster_with_columns = opt_state.contains(OptFlags::CLUSTER_WITH_COLUMNS); - let collapse_joins = opt_state.contains(OptFlags::COLLAPSE_JOINS); - let predicate_pushdown = opt_state.contains(OptFlags::PREDICATE_PUSHDOWN); - let projection_pushdown = opt_state.contains(OptFlags::PROJECTION_PUSHDOWN); - let simplify_expr = opt_state.contains(OptFlags::SIMPLIFY_EXPR); - let slice_pushdown = opt_state.contains(OptFlags::SLICE_PUSHDOWN); - let streaming = opt_state.contains(OptFlags::STREAMING); - let new_streaming = opt_state.contains(OptFlags::NEW_STREAMING); - let fast_projection = opt_state.contains(OptFlags::FAST_PROJECTION); - // Don't run optimizations that don't make sense on a single node. // This keeps eager execution more snappy. - let eager = opt_state.contains(OptFlags::EAGER); #[cfg(feature = "cse")] let comm_subplan_elim = opt_state.contains(OptFlags::COMM_SUBPLAN_ELIM); @@ -106,7 +94,8 @@ pub fn optimize( let comm_subexpr_elim = false; #[allow(unused_variables)] - let agg_scan_projection = opt_state.contains(OptFlags::FILE_CACHING) && !streaming && !eager; + let agg_scan_projection = + opt_state.contains(OptFlags::FILE_CACHING) && !opt_state.streaming() && !opt_state.eager(); // During debug we check if the optimizations have not modified the final schema. #[cfg(debug_assertions)] @@ -114,7 +103,7 @@ pub fn optimize( // Collect members for optimizations that need it. let mut members = MemberCollector::new(); - if !eager && (comm_subexpr_elim || projection_pushdown) { + if !opt_state.eager() && (comm_subexpr_elim || opt_state.projection_pushdown()) { members.collect(lp_top, lp_arena, expr_arena) } @@ -125,7 +114,7 @@ pub fn optimize( set_order_flags(lp_top, lp_arena, expr_arena, scratch); } - if simplify_expr { + if opt_state.simplify_expr() { #[cfg(feature = "fused")] rules.push(Box::new(fused::FusedArithmetic {})); } @@ -153,7 +142,7 @@ pub fn optimize( let _cse_plan_changed = false; // Should be run before predicate pushdown. - if projection_pushdown { + if opt_state.projection_pushdown() { let mut projection_pushdown_opt = ProjectionPushDown::new(); let alp = lp_arena.take(lp_top); let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?; @@ -165,33 +154,36 @@ pub fn optimize( } } - if predicate_pushdown { + if opt_state.predicate_pushdown() { let mut predicate_pushdown_opt = PredicatePushDown::new(expr_eval); let alp = lp_arena.take(lp_top); let alp = predicate_pushdown_opt.optimize(alp, lp_arena, expr_arena)?; lp_arena.replace(lp_top, alp); } - if cluster_with_columns { + if opt_state.cluster_with_columns() { cluster_with_columns::optimize(lp_top, lp_arena, expr_arena) } // Make sure it is after predicate pushdown - if collapse_joins && members.has_filter_with_join_input { + if opt_state.collapse_joins() && members.has_filter_with_join_input { collapse_joins::optimize(lp_top, lp_arena, expr_arena) } // Make sure its before slice pushdown. - if fast_projection { - rules.push(Box::new(SimpleProjectionAndCollapse::new(eager))); + if opt_state.fast_projection() { + rules.push(Box::new(SimpleProjectionAndCollapse::new( + opt_state.eager(), + ))); } - if !eager { + if !opt_state.eager() { rules.push(Box::new(DelayRechunk::new())); } - if slice_pushdown { - let mut slice_pushdown_opt = SlicePushDown::new(streaming, new_streaming); + if opt_state.slice_pushdown() { + let mut slice_pushdown_opt = + SlicePushDown::new(opt_state.streaming(), opt_state.new_streaming()); let alp = lp_arena.take(lp_top); let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?; @@ -202,11 +194,11 @@ pub fn optimize( } // This optimization removes branches, so we must do it when type coercion // is completed. - if simplify_expr { + if opt_state.simplify_expr() { rules.push(Box::new(SimplifyBooleanRule {})); } - if !eager { + if !opt_state.eager() { rules.push(Box::new(FlattenUnionRule {})); }