Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Avoid copying LogicalPlans / Exprs during OptimizerPasses #9708

Closed
wants to merge 10 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix a bit
alamb committed Mar 21, 2024
commit 11ebead4933bdb1ffd3b3322a1e0ced167f0dbf9
43 changes: 26 additions & 17 deletions datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
Original file line number Diff line number Diff line change
@@ -19,8 +19,8 @@

use std::sync::Arc;

use datafusion_common::tree_node::Transformed;
use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result};
use datafusion_common::tree_node::{Transformed, TransformedResult};
use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result, not_impl_err};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::simplify::SimplifyContext;
@@ -56,9 +56,7 @@ impl OptimizerRule for SimplifyExpressions {
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let mut execution_props = ExecutionProps::new();
execution_props.query_execution_start_time = config.query_execution_start_time();
Ok(Some(Self::optimize_internal(plan, &execution_props)?))
return not_impl_err!("Should use optimized owned")
}

fn supports_owned(&self) -> bool {
@@ -68,21 +66,23 @@ impl OptimizerRule for SimplifyExpressions {
/// If `supports_owned` returns true, `try_optimize_owned` is called
fn try_optimize_owned(
&self,
_plan: LogicalPlan,
_config: &dyn OptimizerConfig,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
todo!();
let mut execution_props = ExecutionProps::new();
execution_props.query_execution_start_time = config.query_execution_start_time();
Self::optimize_internal(plan, &execution_props)
}
}

impl SimplifyExpressions {
fn optimize_internal(
plan: &LogicalPlan,
plan: LogicalPlan,
execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
) -> Result<Transformed<LogicalPlan>> {
let schema = if !plan.inputs().is_empty() {
DFSchemaRef::new(merge_schema(plan.inputs()))
} else if let LogicalPlan::TableScan(scan) = plan {
} else if let LogicalPlan::TableScan(scan) = &plan {
// When predicates are pushed into a table scan, there is no input
// schema to resolve predicates against, so it must be handled specially
//
@@ -102,11 +102,15 @@ impl SimplifyExpressions {
};
let info = SimplifyContext::new(execution_props).with_schema(schema);

let new_inputs = plan
.inputs()
.iter()
.map(|input| Self::optimize_internal(input, execution_props))
.collect::<Result<Vec<_>>>()?;
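// Rewrite the inputs by recursively running `optimize_internal` on each one,
// recording in `transformed` whether any of them reported a change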
let mut transformed = false;
let plan = plan.rewrite_inputs(&mut |plan| {
let t = Self::optimize_internal(plan, execution_props)?;
if t.transformed {
transformed = true;
}
Ok(t.data)
})?;

let simplifier = ExprSimplifier::new(info);

@@ -134,8 +138,13 @@ impl SimplifyExpressions {
})
.collect::<Result<Vec<_>>>()?;

plan.with_new_exprs(exprs, new_inputs)
Ok(if transformed {
Transformed::yes(plan)
} else {
Transformed::no(plan)
})
}

}

impl SimplifyExpressions {
15 changes: 14 additions & 1 deletion datafusion/sqllogictest/test_files/aal.slt
Original file line number Diff line number Diff line change
@@ -2,5 +2,18 @@
statement ok
create table t as values (1), (2);

query
query I
select column1 + column1 from t;
----
2
4

query TT
explain select column1 + column1 from t;
----
logical_plan
Projection: t.column1 + t.column1
--TableScan: t projection=[column1]
physical_plan
ProjectionExec: expr=[column1@0 + column1@0 as t.column1 + t.column1]
--MemoryExec: partitions=1, partition_sizes=[1]