Skip to content

Commit

Permalink
feat(optimizer): Add join reordering as an optimizer rule (#3642)
Browse files Browse the repository at this point in the history
Applies the naive left deep join order from #3616 as an optimizer rule.
This optimizer rule is gated behind an environment variable that allows
us to validate the rule on our current workloads.

Currently join reordering results in errors for 50% of TPC-H queries
during join graph building. We'll tackle these in a follow-up PR.
  • Loading branch information
desmondcheongzx authored Jan 8, 2025
1 parent e6c084f commit 1562569
Show file tree
Hide file tree
Showing 11 changed files with 332 additions and 117 deletions.
10 changes: 9 additions & 1 deletion src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,20 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Serialize, Deserialize, Default, Debug)]
pub struct DaftPlanningConfig {
    /// Default I/O configuration (credentials, retry policy, etc.) applied to
    /// scans that do not specify their own.
    pub default_io_config: IOConfig,
    /// Experimental: when true, the optimizer runs the join-reordering rule.
    /// Off by default; toggled via the `DAFT_DEV_ENABLE_JOIN_REORDERING`
    /// environment variable in `from_env`.
    pub enable_join_reordering: bool,
}

impl DaftPlanningConfig {
#[must_use]
pub fn from_env() -> Self {
Default::default()
let mut cfg: Self = Default::default();
let join_reordering_var_name = "DAFT_DEV_ENABLE_JOIN_REORDERING";
if let Ok(val) = std::env::var(join_reordering_var_name)
&& matches!(val.trim().to_lowercase().as_str(), "1" | "true")
{
cfg.enable_join_reordering = true;
}
cfg
}
}

Expand Down
15 changes: 11 additions & 4 deletions src/daft-logical-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use {
use crate::{
logical_plan::LogicalPlan,
ops,
optimization::Optimizer,
optimization::OptimizerBuilder,
partitioning::{
HashRepartitionConfig, IntoPartitionsConfig, RandomShuffleConfig, RepartitionSpec,
},
Expand Down Expand Up @@ -381,8 +381,7 @@ impl LogicalPlanBuilder {
Ok(self.with_new_plan(pivot_logical_plan))
}

// Helper function to create inner joins more ergonimically in tests.
#[cfg(test)]
// Helper function to create inner joins more ergonomically.
pub(crate) fn inner_join<Right: Into<LogicalPlanRef>>(
&self,
right: Right,
Expand Down Expand Up @@ -616,7 +615,15 @@ impl LogicalPlanBuilder {
}

pub fn optimize(&self) -> DaftResult<Self> {
let optimizer = Optimizer::new(Default::default());
let optimizer = OptimizerBuilder::default()
.when(
self.config
.as_ref()
.map_or(false, |conf| conf.enable_join_reordering),
|builder| builder.reorder_joins(),
)
.simplify_expressions()
.build();

// Run LogicalPlan optimizations
let unoptimized_plan = self.build();
Expand Down
10 changes: 7 additions & 3 deletions src/daft-logical-plan/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use indexmap::IndexSet;
use snafu::Snafu;

pub use crate::ops::*;
use crate::stats::PlanStats;
use crate::stats::{PlanStats, StatsState};

/// Logical plan for a Daft query.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -204,7 +204,7 @@ impl LogicalPlan {
}
}

pub fn materialized_stats(&self) -> &PlanStats {
pub fn stats_state(&self) -> &StatsState {
match self {
Self::Source(Source { stats_state, .. })
| Self::Project(Project { stats_state, .. })
Expand All @@ -223,7 +223,7 @@ impl LogicalPlan {
| Self::Sink(Sink { stats_state, .. })
| Self::Sample(Sample { stats_state, .. })
| Self::MonotonicallyIncreasingId(MonotonicallyIncreasingId { stats_state, .. }) => {
stats_state.materialized_stats()
stats_state
}
Self::Intersect(_) => {
panic!("Intersect nodes should be optimized away before stats are materialized")
Expand All @@ -234,6 +234,10 @@ impl LogicalPlan {
}
}

/// Returns the materialized statistics for this plan node.
///
/// Thin convenience wrapper over [`stats_state`]; presumably panics if stats
/// have not yet been materialized (e.g. before the `EnrichWithStats` rule has
/// run) — TODO confirm against `StatsState::materialized_stats`.
pub fn materialized_stats(&self) -> &PlanStats {
    self.stats_state().materialized_stats()
}

// Materializes stats over logical plans. If stats are already materialized, this function recomputes stats, which might be
// useful if stats become stale during query planning.
pub fn with_materialized_stats(self) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion src/daft-logical-plan/src/optimization/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ mod rules;
#[cfg(test)]
mod test;

pub use optimizer::{Optimizer, OptimizerConfig};
pub use optimizer::{Optimizer, OptimizerBuilder, OptimizerConfig};
169 changes: 108 additions & 61 deletions src/daft-logical-plan/src/optimization/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::{
rules::{
DropRepartition, EliminateCrossJoin, EnrichWithStats, FilterNullJoinKey,
LiftProjectFromAgg, MaterializeScans, OptimizerRule, PushDownFilter, PushDownLimit,
PushDownProjection, SimplifyExpressionsRule, SplitActorPoolProjects,
PushDownProjection, ReorderJoins, SimplifyExpressionsRule, SplitActorPoolProjects,
UnnestPredicateSubquery, UnnestScalarSubquery,
},
};
Expand Down Expand Up @@ -80,6 +80,113 @@ pub enum RuleExecutionStrategy {
FixedPoint(Option<usize>),
}

/// Builder for [`Optimizer`]: starts from the standard rule batches
/// (see the `Default` impl) and lets callers append optional batches
/// (e.g. join reordering) before constructing the optimizer.
pub struct OptimizerBuilder {
    /// Batches of rules for the optimizer to apply, in execution order.
    rule_batches: Vec<RuleBatch>,
    /// Config for optimizer.
    config: OptimizerConfig,
}

impl Default for OptimizerBuilder {
    /// Standard rule batches, in execution order. NOTE: batch order is
    /// load-bearing — e.g. scans must be materialized before stats are
    /// enriched — so do not reorder these without care.
    fn default() -> Self {
        Self {
            rule_batches: vec![
                // --- Rewrite rules ---
                RuleBatch::new(
                    vec![
                        Box::new(LiftProjectFromAgg::new()),
                        Box::new(UnnestScalarSubquery::new()),
                        Box::new(UnnestPredicateSubquery::new()),
                        Box::new(SplitActorPoolProjects::new()),
                    ],
                    RuleExecutionStrategy::FixedPoint(None),
                ),
                // we want to simplify expressions first to make the rest of the rules easier
                RuleBatch::new(
                    vec![Box::new(SimplifyExpressionsRule::new())],
                    RuleExecutionStrategy::FixedPoint(Some(3)),
                ),
                // --- Bulk of our rules ---
                RuleBatch::new(
                    vec![
                        Box::new(DropRepartition::new()),
                        Box::new(FilterNullJoinKey::new()),
                        Box::new(PushDownFilter::new()),
                        Box::new(PushDownProjection::new()),
                        Box::new(EliminateCrossJoin::new()),
                    ],
                    // Use a fixed-point policy for the pushdown rules: PushDownProjection can produce a Filter node
                    // at the current node, which would require another batch application in order to have a chance to push
                    // that Filter node through upstream nodes.
                    // TODO(Clark): Refine this fixed-point policy.
                    RuleExecutionStrategy::FixedPoint(Some(3)),
                ),
                // --- Limit pushdowns ---
                // This needs to be separate from PushDownProjection because otherwise the limit and
                // projection just keep swapping places, preventing optimization
                // (see https://github.com/Eventual-Inc/Daft/issues/2616)
                RuleBatch::new(
                    vec![Box::new(PushDownLimit::new())],
                    RuleExecutionStrategy::FixedPoint(Some(3)),
                ),
                // --- Materialize scan nodes ---
                // Must run before stats enrichment: stats are derived from
                // materialized scans.
                RuleBatch::new(
                    vec![Box::new(MaterializeScans::new())],
                    RuleExecutionStrategy::Once,
                ),
                // --- Enrich logical plan with stats ---
                RuleBatch::new(
                    vec![Box::new(EnrichWithStats::new())],
                    RuleExecutionStrategy::Once,
                ),
            ],
            config: Default::default(),
        }
    }
}

impl OptimizerBuilder {
    /// Appends a join-reordering batch. Stats are re-enriched afterwards
    /// because reordering changes the plan shape the stats were computed for.
    ///
    /// Should be called after the default batches (which materialize scans and
    /// enrich stats), since join reordering relies on stats being present.
    #[must_use]
    pub fn reorder_joins(mut self) -> Self {
        self.rule_batches.push(RuleBatch::new(
            vec![
                Box::new(ReorderJoins::new()),
                Box::new(EnrichWithStats::new()),
            ],
            RuleExecutionStrategy::Once,
        ));
        self
    }

    /// Appends a final expression-simplification batch.
    ///
    /// Useful as the last batch: earlier rules can introduce new expressions
    /// that are again simplifiable.
    #[must_use]
    pub fn simplify_expressions(mut self) -> Self {
        self.rule_batches.push(RuleBatch::new(
            vec![Box::new(SimplifyExpressionsRule::new())],
            RuleExecutionStrategy::FixedPoint(Some(3)),
        ));
        self
    }

    /// Replaces the optimizer config used by the built [`Optimizer`].
    #[must_use]
    pub fn with_optimizer_config(mut self, config: OptimizerConfig) -> Self {
        self.config = config;
        self
    }

    /// Consumes the builder and produces the [`Optimizer`].
    #[must_use]
    pub fn build(self) -> Optimizer {
        Optimizer {
            rule_batches: self.rule_batches,
            config: self.config,
        }
    }

    /// Applies `f` to the builder only when `condition` is true, enabling
    /// fluent conditional configuration, e.g.
    /// `builder.when(cfg.enable_join_reordering, |b| b.reorder_joins())`.
    #[must_use]
    pub fn when(self, condition: bool, f: impl FnOnce(Self) -> Self) -> Self {
        if condition {
            f(self)
        } else {
            self
        }
    }
}

/// Logical rule-based optimizer.
pub struct Optimizer {
// Batches of rules for the optimizer to apply.
Expand All @@ -89,66 +196,6 @@ pub struct Optimizer {
}

impl Optimizer {
pub fn new(config: OptimizerConfig) -> Self {
let rule_batches = vec![
// --- Rewrite rules ---
RuleBatch::new(
vec![
Box::new(LiftProjectFromAgg::new()),
Box::new(UnnestScalarSubquery::new()),
Box::new(UnnestPredicateSubquery::new()),
Box::new(SplitActorPoolProjects::new()),
],
RuleExecutionStrategy::FixedPoint(None),
),
// we want to simplify expressions first to make the rest of the rules easier
RuleBatch::new(
vec![Box::new(SimplifyExpressionsRule::new())],
RuleExecutionStrategy::FixedPoint(Some(3)),
),
// --- Bulk of our rules ---
RuleBatch::new(
vec![
Box::new(DropRepartition::new()),
Box::new(FilterNullJoinKey::new()),
Box::new(PushDownFilter::new()),
Box::new(PushDownProjection::new()),
Box::new(EliminateCrossJoin::new()),
],
// Use a fixed-point policy for the pushdown rules: PushDownProjection can produce a Filter node
// at the current node, which would require another batch application in order to have a chance to push
// that Filter node through upstream nodes.
// TODO(Clark): Refine this fixed-point policy.
RuleExecutionStrategy::FixedPoint(Some(3)),
),
// --- Limit pushdowns ---
// This needs to be separate from PushDownProjection because otherwise the limit and
// projection just keep swapping places, preventing optimization
// (see https://github.com/Eventual-Inc/Daft/issues/2616)
RuleBatch::new(
vec![Box::new(PushDownLimit::new())],
RuleExecutionStrategy::FixedPoint(Some(3)),
),
// --- Materialize scan nodes ---
RuleBatch::new(
vec![Box::new(MaterializeScans::new())],
RuleExecutionStrategy::Once,
),
// --- Enrich logical plan with stats ---
RuleBatch::new(
vec![Box::new(EnrichWithStats::new())],
RuleExecutionStrategy::Once,
),
// try to simplify expressions again as other rules could introduce new exprs
RuleBatch::new(
vec![Box::new(SimplifyExpressionsRule::new())],
RuleExecutionStrategy::FixedPoint(Some(3)),
),
];

Self::with_rule_batches(rule_batches, config)
}

pub fn with_rule_batches(rule_batches: Vec<RuleBatch>, config: OptimizerConfig) -> Self {
Self {
rule_batches,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ use common_error::DaftResult;
use common_treenode::{Transformed, TreeNode};

use super::OptimizerRule;
use crate::LogicalPlan;
use crate::{stats::StatsState, LogicalPlan};

// Add stats to all logical plan nodes in a bottom up fashion.
// All scan nodes MUST be materialized before stats are enriched.
// Add stats to all logical plan nodes in a bottom up fashion.
// All scan nodes MUST be materialized before stats are enriched.
impl OptimizerRule for EnrichWithStats {
    /// Walks the plan bottom-up, materializing stats on every node that does
    /// not already have them.
    ///
    /// Nodes whose stats are already materialized are left untouched
    /// (`Transformed::no`), so re-running this rule (e.g. after join
    /// reordering) only recomputes stats for nodes the earlier rules changed.
    fn try_optimize(&self, plan: Arc<LogicalPlan>) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
        plan.transform_up(|node: Arc<LogicalPlan>| {
            // Unwrap the Arc (cloning only if shared) so we can take the node
            // by value for `with_materialized_stats`.
            let node = Arc::unwrap_or_clone(node);
            if matches!(node.stats_state(), StatsState::Materialized(_)) {
                Ok(Transformed::no(node.arced()))
            } else {
                Ok(Transformed::yes(node.with_materialized_stats().into()))
            }
        })
    }
}
1 change: 1 addition & 0 deletions src/daft-logical-plan/src/optimization/rules/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub use materialize_scans::MaterializeScans;
pub use push_down_filter::PushDownFilter;
pub use push_down_limit::PushDownLimit;
pub use push_down_projection::PushDownProjection;
pub use reorder_joins::ReorderJoins;
pub use rule::OptimizerRule;
pub use simplify_expressions::SimplifyExpressionsRule;
pub use split_actor_pool_projects::SplitActorPoolProjects;
Expand Down
Loading

0 comments on commit 1562569

Please sign in to comment.