Commit e1c3faf

[FEAT]: expr simplifier (#3393)

still todo: 

- [x] add a bunch of test cases. 
~~- [ ] add simplification for `AND` exprs~~

### Note for reviewers:  

This bypasses the expr simplifier rule when the source is a `SQLScanOperator`: the rule was simplifying queries in a way that didn't work with connectorx. I haven't been able to debug exactly what was going on, but I think it's reasonable to just skip this optimization for that source for now.
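
For context, the bypass amounts to consulting the scan source's new `name()` before applying the rule. A minimal, self-contained sketch of that pattern (hypothetical types and helper, not Daft's actual rule plumbing):

```rust
// Sketch only: the trait, structs, and helper below are illustrative and
// do not mirror Daft's real optimizer wiring.

trait ScanOperator {
    fn name(&self) -> &str;
}

struct SqlScan;
impl ScanOperator for SqlScan {
    fn name(&self) -> &str {
        "SQLScanOperator"
    }
}

struct ParquetScan;
impl ScanOperator for ParquetScan {
    fn name(&self) -> &str {
        "ParquetScanOperator"
    }
}

/// Hypothetical helper: decide whether the simplifier should run for a source.
fn should_simplify(source: &dyn ScanOperator) -> bool {
    // connectorx choked on some simplified queries, so leave SQL sources alone.
    source.name() != "SQLScanOperator"
}

fn main() {
    assert!(!should_simplify(&SqlScan));
    assert!(should_simplify(&ParquetScan));
}
```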

Also, I wanted to implement a const expr evaluator, but since our `eval_expressions` is associated with `daft_table::Table`, that kind of optimization isn't currently possible; some refactoring is needed first.
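
As an illustration of what a standalone const expr evaluator could do once evaluation is decoupled from `daft_table::Table`, here is a toy constant folder over a made-up expression enum (none of these types are Daft's):

```rust
// Toy constant folder; illustrative only, not Daft's Expr type.

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Lit(i64),
    Col(String),
    Add(Box<Expr>, Box<Expr>),
}

/// Fold subtrees whose inputs are all literals into a single literal,
/// leaving anything that references a column untouched.
fn const_fold(expr: Expr) -> Expr {
    match expr {
        Expr::Add(l, r) => match (const_fold(*l), const_fold(*r)) {
            (Expr::Lit(a), Expr::Lit(b)) => Expr::Lit(a + b),
            (l, r) => Expr::Add(Box::new(l), Box::new(r)),
        },
        other => other,
    }
}

fn main() {
    // (1 + 2) + x  ==>  3 + x
    let e = Expr::Add(
        Box::new(Expr::Add(Box::new(Expr::Lit(1)), Box::new(Expr::Lit(2)))),
        Box::new(Expr::Col("x".to_string())),
    );
    assert_eq!(
        const_fold(e),
        Expr::Add(Box::new(Expr::Lit(3)), Box::new(Expr::Col("x".to_string())))
    );
}
```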
universalmind303 authored Dec 5, 2024
1 parent c2abed8 commit e1c3faf
Showing 15 changed files with 800 additions and 3 deletions.
3 changes: 3 additions & 0 deletions daft/delta_lake/delta_lake_scan.py
@@ -111,6 +111,9 @@ def __init__(
def schema(self) -> Schema:
return self._schema

def name(self) -> str:
return "DeltaLakeScanOperator"

def display_name(self) -> str:
return f"DeltaLakeScanOperator({self._table.metadata().name})"

3 changes: 3 additions & 0 deletions daft/hudi/hudi_scan.py
@@ -37,6 +37,9 @@ def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
def schema(self) -> Schema:
return self._schema

def name(self) -> str:
return "HudiScanOperator"

def display_name(self) -> str:
return f"HudiScanOperator({self._table.props.name})"

3 changes: 3 additions & 0 deletions daft/iceberg/iceberg_scan.py
@@ -115,6 +115,9 @@ def __init__(self, iceberg_table: Table, snapshot_id: int | None, storage_config
def schema(self) -> Schema:
return self._schema

def name(self) -> str:
return "IcebergScanOperator"

def display_name(self) -> str:
return f"IcebergScanOperator({'.'.join(self._table.name())})"

3 changes: 3 additions & 0 deletions daft/io/_generator.py
@@ -80,6 +80,9 @@ def __init__(
self._generators = generators
self._schema = schema

def name(self) -> str:
return self.display_name()

def display_name(self) -> str:
return "GeneratorScanOperator"

3 changes: 3 additions & 0 deletions daft/io/_lance.py
@@ -69,6 +69,9 @@ class LanceDBScanOperator(ScanOperator):
def __init__(self, ds: "lance.LanceDataset"):
self._ds = ds

def name(self) -> str:
return "LanceDBScanOperator"

def display_name(self) -> str:
return f"LanceDBScanOperator({self._ds.uri})"

3 changes: 3 additions & 0 deletions daft/sql/sql_scan.py
@@ -69,6 +69,9 @@ def __init__(
def schema(self) -> Schema:
return self._schema

def name(self) -> str:
return "SQLScanOperator"

def display_name(self) -> str:
return f"SQLScanOperator(sql={self.sql}, conn={self.conn})"

2 changes: 2 additions & 0 deletions src/common/scan-info/src/scan_operator.rs
@@ -10,6 +10,8 @@ use daft_schema::schema::SchemaRef;
use crate::{PartitionField, Pushdowns, ScanTaskLikeRef};

pub trait ScanOperator: Send + Sync + Debug {
fn name(&self) -> &str;

fn schema(&self) -> SchemaRef;
fn partitioning_keys(&self) -> &[PartitionField];
fn file_path_column(&self) -> Option<&str>;
3 changes: 3 additions & 0 deletions src/common/scan-info/src/test/mod.rs
@@ -97,6 +97,9 @@ Pushdowns: {pushdowns}
}

impl ScanOperator for DummyScanOperator {
fn name(&self) -> &'static str {
"dummy"
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
13 changes: 12 additions & 1 deletion src/daft-logical-plan/src/optimization/optimizer.rs
@@ -7,7 +7,8 @@ use super::{
logical_plan_tracker::LogicalPlanTracker,
rules::{
DropRepartition, EliminateCrossJoin, EnrichWithStats, LiftProjectFromAgg, MaterializeScans,
OptimizerRule, PushDownFilter, PushDownLimit, PushDownProjection, SplitActorPoolProjects,
OptimizerRule, PushDownFilter, PushDownLimit, PushDownProjection, SimplifyExpressionsRule,
SplitActorPoolProjects,
},
};
use crate::LogicalPlan;
@@ -97,6 +98,11 @@ impl Optimizer {
],
RuleExecutionStrategy::Once,
),
// we want to simplify expressions first to make the rest of the rules easier
RuleBatch::new(
vec![Box::new(SimplifyExpressionsRule::new())],
RuleExecutionStrategy::FixedPoint(Some(3)),
),
// --- Bulk of our rules ---
RuleBatch::new(
vec![
@@ -129,6 +135,11 @@
vec![Box::new(EnrichWithStats::new())],
RuleExecutionStrategy::Once,
),
// try to simplify expressions again as other rules could introduce new exprs
RuleBatch::new(
vec![Box::new(SimplifyExpressionsRule::new())],
RuleExecutionStrategy::FixedPoint(Some(3)),
),
];

Self::with_rule_batches(rule_batches, config)
2 changes: 2 additions & 0 deletions src/daft-logical-plan/src/optimization/rules/mod.rs
@@ -7,6 +7,7 @@ mod push_down_filter;
mod push_down_limit;
mod push_down_projection;
mod rule;
mod simplify_expressions;
mod split_actor_pool_projects;

pub use drop_repartition::DropRepartition;
@@ -18,4 +19,5 @@ pub use push_down_filter::PushDownFilter;
pub use push_down_limit::PushDownLimit;
pub use push_down_projection::PushDownProjection;
pub use rule::OptimizerRule;
pub use simplify_expressions::SimplifyExpressionsRule;
pub use split_actor_pool_projects::SplitActorPoolProjects;