From 20f3d65053946fb664bfdc5d17cfc84a0f2eb28b Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Fri, 5 Apr 2024 15:50:37 +0200 Subject: [PATCH 1/8] fix --- datafusion/core/src/execution/context/mod.rs | 4 +- datafusion/expr/src/logical_plan/plan.rs | 539 ++++++++++++++---- datafusion/expr/src/tree_node/expr.rs | 2 +- datafusion/expr/src/tree_node/plan.rs | 49 +- datafusion/optimizer/src/analyzer/mod.rs | 4 +- datafusion/optimizer/src/analyzer/subquery.rs | 2 +- datafusion/optimizer/src/plan_signature.rs | 4 +- 7 files changed, 446 insertions(+), 158 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 1a582be3013d..1da669d187c2 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -65,7 +65,7 @@ use datafusion_common::{ alias::AliasGenerator, config::{ConfigExtension, TableOptions}, exec_err, not_impl_err, plan_datafusion_err, plan_err, - tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}, + tree_node::{TreeNodeRecursion, TreeNodeVisitor}, SchemaReference, TableReference, }; use datafusion_execution::registry::SerializerRegistry; @@ -2313,7 +2313,7 @@ impl SQLOptions { /// Return an error if the [`LogicalPlan`] has any nodes that are /// incompatible with this [`SQLOptions`]. pub fn verify_plan(&self, plan: &LogicalPlan) -> Result<()> { - plan.visit(&mut BadPlanVisitor::new(self))?; + plan.visit_with_subqueries(&mut BadPlanVisitor::new(self))?; Ok(()) } } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 3d40dcae0e4b..abf914e4e104 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -34,8 +34,7 @@ use crate::logical_plan::extension::UserDefinedLogicalNode; use crate::logical_plan::{DmlStatement, Statement}; use crate::utils::{ enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs, - grouping_set_expr_count, grouping_set_to_exprlist, inspect_expr_pre, - split_conjunction, + grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction, }; use crate::{ build_join_schema, expr_vec_fmt, BinaryExpr, BuiltInWindowFunction, @@ -45,16 +44,19 @@ use crate::{ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor, + Transformed, TransformedResult, TreeNode, TreeNodeIterator, TreeNodeRecursion, + TreeNodeRewriter, TreeNodeVisitor, }; use datafusion_common::{ - aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints, - DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence, - FunctionalDependencies, ParamValues, Result, TableReference, UnnestOptions, + aggregate_functional_dependencies, internal_err, map_until_stop_and_collect, + plan_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency, + FunctionalDependence, FunctionalDependencies, ParamValues, Result, TableReference, + UnnestOptions, }; // backwards compatibility use crate::display::PgJsonVisitor; +use crate::tree_node::expr::transform_option_vec; pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan}; pub use datafusion_common::{JoinConstraint, JoinType}; @@ -248,9 +250,9 @@ impl LogicalPlan { /// DataFusion's optimizer attempts to optimize them away. pub fn expressions(self: &LogicalPlan) -> Vec { let mut exprs = vec![]; - self.inspect_expressions(|e| { + self.apply_expressions(|e| { exprs.push(e.clone()); - Ok(()) as Result<()> + Ok(TreeNodeRecursion::Continue) }) // closure always returns OK .unwrap(); @@ -261,13 +263,13 @@ impl LogicalPlan { /// logical plan nodes and all its descendant nodes. pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec { let mut exprs = vec![]; - self.inspect_expressions(|e| { + self.apply_expressions(|e| { find_out_reference_exprs(e).into_iter().for_each(|e| { if !exprs.contains(&e) { exprs.push(e) } }); - Ok(()) as Result<(), DataFusionError> + Ok(TreeNodeRecursion::Continue) }) // closure always returns OK .unwrap(); @@ -285,57 +287,57 @@ impl LogicalPlan { /// Calls `f` on all expressions (non-recursively) in the current /// logical plan node. This does not include expressions in any /// children. - pub fn inspect_expressions(self: &LogicalPlan, mut f: F) -> Result<(), E> - where - F: FnMut(&Expr) -> Result<(), E>, - { + pub fn apply_expressions Result>( + &self, + mut f: F, + ) -> Result { match self { LogicalPlan::Projection(Projection { expr, .. }) => { - expr.iter().try_for_each(f) - } - LogicalPlan::Values(Values { values, .. }) => { - values.iter().flatten().try_for_each(f) + expr.iter().apply_until_stop(f) } + LogicalPlan::Values(Values { values, .. }) => values + .iter() + .apply_until_stop(|value| value.iter().apply_until_stop(&mut f)), LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate), LogicalPlan::Repartition(Repartition { partitioning_scheme, .. }) => match partitioning_scheme { - Partitioning::Hash(expr, _) => expr.iter().try_for_each(f), - Partitioning::DistributeBy(expr) => expr.iter().try_for_each(f), - Partitioning::RoundRobinBatch(_) => Ok(()), + Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => { + expr.iter().apply_until_stop(f) + } + Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue), }, LogicalPlan::Window(Window { window_expr, .. }) => { - window_expr.iter().try_for_each(f) + window_expr.iter().apply_until_stop(f) } LogicalPlan::Aggregate(Aggregate { group_expr, aggr_expr, .. - }) => group_expr.iter().chain(aggr_expr.iter()).try_for_each(f), + }) => group_expr + .iter() + .chain(aggr_expr.iter()) + .apply_until_stop(f), // There are two part of expression for join, equijoin(on) and non-equijoin(filter). // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`. // 2. the second part is non-equijoin(filter). LogicalPlan::Join(Join { on, filter, .. }) => { on.iter() + // TODO: why we need to create an `Expr::eq`? Cloning `Expr` is costly... // it not ideal to create an expr here to analyze them, but could cache it on the Join itself .map(|(l, r)| Expr::eq(l.clone(), r.clone())) - .try_for_each(|e| f(&e))?; - - if let Some(filter) = filter.as_ref() { - f(filter) - } else { - Ok(()) - } + .apply_until_stop(|e| f(&e))? + .visit_sibling(|| filter.iter().apply_until_stop(f)) } - LogicalPlan::Sort(Sort { expr, .. }) => expr.iter().try_for_each(f), + LogicalPlan::Sort(Sort { expr, .. }) => expr.iter().apply_until_stop(f), LogicalPlan::Extension(extension) => { // would be nice to avoid this copy -- maybe can // update extension to just observer Exprs - extension.node.expressions().iter().try_for_each(f) + extension.node.expressions().iter().apply_until_stop(f) } LogicalPlan::TableScan(TableScan { filters, .. }) => { - filters.iter().try_for_each(f) + filters.iter().apply_until_stop(f) } LogicalPlan::Unnest(Unnest { column, .. }) => { f(&Expr::Column(column.clone())) @@ -348,8 +350,8 @@ impl LogicalPlan { })) => on_expr .iter() .chain(select_expr.iter()) - .chain(sort_expr.clone().unwrap_or(vec![]).iter()) - .try_for_each(f), + .chain(sort_expr.iter().flatten()) + .apply_until_stop(f), // plans without expressions LogicalPlan::EmptyRelation(_) | LogicalPlan::RecursiveQuery(_) @@ -366,10 +368,225 @@ impl LogicalPlan { | LogicalPlan::Ddl(_) | LogicalPlan::Copy(_) | LogicalPlan::DescribeTable(_) - | LogicalPlan::Prepare(_) => Ok(()), + | LogicalPlan::Prepare(_) => Ok(TreeNodeRecursion::Continue), } } + pub fn map_expressions Result>>( + self, + mut f: F, + ) -> Result> { + Ok(match self { + LogicalPlan::Projection(Projection { + expr, + input, + schema, + }) => expr + .into_iter() + .map_until_stop_and_collect(f)? + .update_data(|expr| { + LogicalPlan::Projection(Projection { + expr, + input, + schema, + }) + }), + LogicalPlan::Values(Values { schema, values }) => values + .into_iter() + .map_until_stop_and_collect(|value| { + value.into_iter().map_until_stop_and_collect(&mut f) + })? + .update_data(|values| LogicalPlan::Values(Values { schema, values })), + LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)? + .update_data(|predicate| { + LogicalPlan::Filter(Filter { predicate, input }) + }), + LogicalPlan::Repartition(Repartition { + input, + partitioning_scheme, + }) => match partitioning_scheme { + Partitioning::Hash(expr, usize) => expr + .into_iter() + .map_until_stop_and_collect(f)? + .update_data(|expr| Partitioning::Hash(expr, usize)), + Partitioning::DistributeBy(expr) => expr + .into_iter() + .map_until_stop_and_collect(f)? + .update_data(Partitioning::DistributeBy), + Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme), + } + .update_data(|partitioning_scheme| { + LogicalPlan::Repartition(Repartition { + input, + partitioning_scheme, + }) + }), + LogicalPlan::Window(Window { + input, + window_expr, + schema, + }) => window_expr + .into_iter() + .map_until_stop_and_collect(f)? + .update_data(|window_expr| { + LogicalPlan::Window(Window { + input, + window_expr, + schema, + }) + }), + LogicalPlan::Aggregate(Aggregate { + input, + group_expr, + aggr_expr, + schema, + }) => map_until_stop_and_collect!( + group_expr.into_iter().map_until_stop_and_collect(&mut f), + aggr_expr, + aggr_expr.into_iter().map_until_stop_and_collect(&mut f) + )? + .update_data(|(group_expr, aggr_expr)| { + LogicalPlan::Aggregate(Aggregate { + input, + group_expr, + aggr_expr, + schema, + }) + }), + + // There are two part of expression for join, equijoin(on) and non-equijoin(filter). + // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`. + // 2. the second part is non-equijoin(filter). + LogicalPlan::Join(Join { + left, + right, + on, + filter, + join_type, + join_constraint, + schema, + null_equals_null, + }) => map_until_stop_and_collect!( + on.into_iter().map_until_stop_and_collect( + |on| map_until_stop_and_collect!(f(on.0), on.1, f(on.1)) + ), + filter, + filter.map_or(Ok::<_, DataFusionError>(Transformed::no(None)), |e| { + Ok(f(e)?.update_data(Some)) + }) + )? + .update_data(|(on, filter)| { + LogicalPlan::Join(Join { + left, + right, + on, + filter, + join_type, + join_constraint, + schema, + null_equals_null, + }) + }), + LogicalPlan::Sort(Sort { expr, input, fetch }) => expr + .into_iter() + .map_until_stop_and_collect(f)? + .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })), + LogicalPlan::Extension(Extension { node }) => { + // would be nice to avoid this copy -- maybe can + // update extension to just observer Exprs + node.expressions() + .into_iter() + .map_until_stop_and_collect(f)? + .update_data(|exprs| { + LogicalPlan::Extension(Extension { + node: UserDefinedLogicalNode::from_template( + node.as_ref(), + exprs.as_slice(), + node.inputs() + .into_iter() + .cloned() + .collect::>() + .as_slice(), + ), + }) + }) + } + LogicalPlan::TableScan(TableScan { + table_name, + source, + projection, + projected_schema, + filters, + fetch, + }) => filters + .into_iter() + .map_until_stop_and_collect(f)? + .update_data(|filters| { + LogicalPlan::TableScan(TableScan { + table_name, + source, + projection, + projected_schema, + filters, + fetch, + }) + }), + LogicalPlan::Unnest(Unnest { + input, + column, + schema, + options, + }) => f(Expr::Column(column))?.map_data(|column| match column { + Expr::Column(column) => Ok(LogicalPlan::Unnest(Unnest { + input, + column, + schema, + options, + })), + _ => internal_err!("Transformation should return Column"), + })?, + LogicalPlan::Distinct(Distinct::On(DistinctOn { + on_expr, + select_expr, + sort_expr, + input, + schema, + })) => map_until_stop_and_collect!( + on_expr.into_iter().map_until_stop_and_collect(&mut f), + select_expr, + select_expr.into_iter().map_until_stop_and_collect(&mut f), + sort_expr, + transform_option_vec(sort_expr, &mut f) + )? + .update_data(|(on_expr, select_expr, sort_expr)| { + LogicalPlan::Distinct(Distinct::On(DistinctOn { + on_expr, + select_expr, + sort_expr, + input, + schema, + })) + }), + // plans without expressions + LogicalPlan::EmptyRelation(_) + | LogicalPlan::RecursiveQuery(_) + | LogicalPlan::Subquery(_) + | LogicalPlan::SubqueryAlias(_) + | LogicalPlan::Limit(_) + | LogicalPlan::Statement(_) + | LogicalPlan::CrossJoin(_) + | LogicalPlan::Analyze(_) + | LogicalPlan::Explain(_) + | LogicalPlan::Union(_) + | LogicalPlan::Distinct(Distinct::All(_)) + | LogicalPlan::Dml(_) + | LogicalPlan::Ddl(_) + | LogicalPlan::Copy(_) + | LogicalPlan::DescribeTable(_) + | LogicalPlan::Prepare(_) => Transformed::no(self), + }) + } + /// returns all inputs of this `LogicalPlan` node. Does not /// include inputs to inputs, or subqueries. pub fn inputs(&self) -> Vec<&LogicalPlan> { @@ -417,7 +634,7 @@ impl LogicalPlan { pub fn using_columns(&self) -> Result>, DataFusionError> { let mut using_columns: Vec> = vec![]; - self.apply(&mut |plan| { + self.apply_with_subqueries(&mut |plan| { if let LogicalPlan::Join(Join { join_constraint: JoinConstraint::Using, on, @@ -1079,57 +1296,174 @@ impl LogicalPlan { } } +/// This macro is used to determine continuation during combined transforming +/// traversals. +macro_rules! handle_transform_recursion { + ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{ + $F_DOWN? + .transform_children(|n| n.map_subqueries($F_CHILD))? + .transform_sibling(|n| n.map_children($F_CHILD))? + .transform_parent(|n| $F_UP(n)) + }}; +} + +macro_rules! handle_transform_recursion_down { + ($F_DOWN:expr, $F_CHILD:expr) => {{ + $F_DOWN? + .transform_children(|n| n.map_subqueries($F_CHILD))? + .transform_sibling(|n| n.map_children($F_CHILD)) + }}; +} + +macro_rules! handle_transform_recursion_up { + ($SELF:expr, $F_CHILD:expr, $F_UP:expr) => {{ + $SELF + .map_subqueries($F_CHILD)? + .transform_sibling(|n| n.map_children($F_CHILD))? + .transform_parent(|n| $F_UP(n)) + }}; +} + impl LogicalPlan { - /// applies `op` to any subqueries in the plan - pub(crate) fn apply_subqueries(&self, op: &mut F) -> Result<()> - where - F: FnMut(&Self) -> Result, - { - self.inspect_expressions(|expr| { - // recursively look for subqueries - inspect_expr_pre(expr, |expr| { - match expr { - Expr::Exists(Exists { subquery, .. }) - | Expr::InSubquery(InSubquery { subquery, .. }) - | Expr::ScalarSubquery(subquery) => { - // use a synthetic plan so the collector sees a - // LogicalPlan::Subquery (even though it is - // actually a Subquery alias) - let synthetic_plan = LogicalPlan::Subquery(subquery.clone()); - synthetic_plan.apply(op)?; - } - _ => {} + pub fn visit_with_subqueries>( + &self, + visitor: &mut V, + ) -> Result { + visitor + .f_down(self)? + .visit_children(|| self.apply_subqueries(|c| c.visit(visitor)))? + .visit_sibling(|| self.apply_children(|c| c.visit(visitor)))? + .visit_parent(|| visitor.f_up(self)) + } + + pub fn rewrite_with_subqueries>( + self, + rewriter: &mut R, + ) -> Result> { + handle_transform_recursion!( + rewriter.f_down(self), + |c| c.rewrite_with_subqueries(rewriter), + |n| rewriter.f_up(n) + ) + } + + pub fn apply_with_subqueries Result>( + &self, + f: &mut F, + ) -> Result { + self.apply(&mut |n| { + f(n)? + .visit_children(|| self.apply_subqueries(|c| c.apply_with_subqueries(f)))? + .visit_sibling(|| self.apply_children(|c| c.apply_with_subqueries(f))) + }) + } + + pub fn transform_with_subqueries Result>>( + self, + f: &F, + ) -> Result> { + self.transform_up_with_subqueries(f) + } + + pub fn transform_down_with_subqueries Result>>( + self, + f: &F, + ) -> Result> { + handle_transform_recursion_down!(f(self), |c| c.transform_down_with_subqueries(f)) + } + + pub fn transform_down_mut_with_subqueries< + F: FnMut(Self) -> Result>, + >( + self, + f: &mut F, + ) -> Result> { + handle_transform_recursion_down!(f(self), |c| c + .transform_down_mut_with_subqueries(f)) + } + + pub fn transform_up_with_subqueries Result>>( + self, + f: &F, + ) -> Result> { + handle_transform_recursion_up!(self, |c| c.transform_up_with_subqueries(f), f) + } + + pub fn transform_up_mut_with_subqueries< + F: FnMut(Self) -> Result>, + >( + self, + f: &mut F, + ) -> Result> { + handle_transform_recursion_up!(self, |c| c.transform_up_mut_with_subqueries(f), f) + } + + pub fn transform_down_up_with_subqueries< + FD: FnMut(Self) -> Result>, + FU: FnMut(Self) -> Result>, + >( + self, + f_down: &mut FD, + f_up: &mut FU, + ) -> Result> { + handle_transform_recursion!( + f_down(self), + |c| c.transform_down_up_with_subqueries(f_down, f_up), + f_up + ) + } + + fn apply_subqueries Result>( + &self, + mut f: F, + ) -> Result { + self.apply_expressions(|expr| { + match expr { + Expr::Exists(Exists { subquery, .. }) + | Expr::InSubquery(InSubquery { subquery, .. }) + | Expr::ScalarSubquery(subquery) => { + // use a synthetic plan so the collector sees a + // LogicalPlan::Subquery (even though it is + // actually a Subquery alias) + f(&LogicalPlan::Subquery(subquery.clone())) } - Ok::<(), DataFusionError>(()) - }) - })?; - Ok(()) + _ => Ok(TreeNodeRecursion::Continue), + } + }) } - /// applies visitor to any subqueries in the plan - pub(crate) fn visit_subqueries(&self, v: &mut V) -> Result<()> - where - V: TreeNodeVisitor, - { - self.inspect_expressions(|expr| { - // recursively look for subqueries - inspect_expr_pre(expr, |expr| { - match expr { - Expr::Exists(Exists { subquery, .. }) - | Expr::InSubquery(InSubquery { subquery, .. }) - | Expr::ScalarSubquery(subquery) => { - // use a synthetic plan so the visitor sees a - // LogicalPlan::Subquery (even though it is - // actually a Subquery alias) - let synthetic_plan = LogicalPlan::Subquery(subquery.clone()); - synthetic_plan.visit(v)?; + fn map_subqueries Result>>( + self, + mut f: F, + ) -> Result> { + self.map_expressions(|expr| match expr { + Expr::Exists(Exists { subquery, negated }) => { + f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s { + LogicalPlan::Subquery(subquery) => { + Ok(Expr::Exists(Exists { subquery, negated })) } - _ => {} - } - Ok::<(), DataFusionError>(()) - }) - })?; - Ok(()) + _ => internal_err!("Transformation should return Subquery"), + }) + } + Expr::InSubquery(InSubquery { + expr, + subquery, + negated, + }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s { + LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery { + expr, + subquery, + negated, + })), + _ => internal_err!("Transformation should return Subquery"), + }), + Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))? + .map_data(|s| match s { + LogicalPlan::Subquery(subquery) => Ok(Expr::ScalarSubquery(subquery)), + _ => internal_err!("Transformation should return Subquery"), + }), + _ => Ok(Transformed::no(expr)), + }) } /// Return a `LogicalPlan` with all placeholders (e.g $1 $2, @@ -1165,8 +1499,8 @@ impl LogicalPlan { ) -> Result>, DataFusionError> { let mut param_types: HashMap> = HashMap::new(); - self.apply(&mut |plan| { - plan.inspect_expressions(|expr| { + self.apply_with_subqueries(&mut |plan| { + plan.apply_expressions(|expr| { expr.apply(&mut |expr| { if let Expr::Placeholder(Placeholder { id, data_type }) = expr { let prev = param_types.get(id); @@ -1183,13 +1517,10 @@ impl LogicalPlan { } } Ok(TreeNodeRecursion::Continue) - })?; - Ok::<(), DataFusionError>(()) - })?; - Ok(TreeNodeRecursion::Continue) - })?; - - Ok(param_types) + }) + }) + }) + .map(|_| param_types) } /// Return an Expr with all placeholders replaced with their @@ -1257,7 +1588,7 @@ impl LogicalPlan { fn fmt(&self, f: &mut Formatter) -> fmt::Result { let with_schema = false; let mut visitor = IndentVisitor::new(f, with_schema); - match self.0.visit(&mut visitor) { + match self.0.visit_with_subqueries(&mut visitor) { Ok(_) => Ok(()), Err(_) => Err(fmt::Error), } @@ -1300,7 +1631,7 @@ impl LogicalPlan { fn fmt(&self, f: &mut Formatter) -> fmt::Result { let with_schema = true; let mut visitor = IndentVisitor::new(f, with_schema); - match self.0.visit(&mut visitor) { + match self.0.visit_with_subqueries(&mut visitor) { Ok(_) => Ok(()), Err(_) => Err(fmt::Error), } @@ -1320,7 +1651,7 @@ impl LogicalPlan { fn fmt(&self, f: &mut Formatter) -> fmt::Result { let mut visitor = PgJsonVisitor::new(f); visitor.with_schema(true); - match self.0.visit(&mut visitor) { + match self.0.visit_with_subqueries(&mut visitor) { Ok(_) => Ok(()), Err(_) => Err(fmt::Error), } @@ -1369,12 +1700,16 @@ impl LogicalPlan { visitor.start_graph()?; visitor.pre_visit_plan("LogicalPlan")?; - self.0.visit(&mut visitor).map_err(|_| fmt::Error)?; + self.0 + .visit_with_subqueries(&mut visitor) + .map_err(|_| fmt::Error)?; visitor.post_visit_plan()?; visitor.set_with_schema(true); visitor.pre_visit_plan("Detailed LogicalPlan")?; - self.0.visit(&mut visitor).map_err(|_| fmt::Error)?; + self.0 + .visit_with_subqueries(&mut visitor) + .map_err(|_| fmt::Error)?; visitor.post_visit_plan()?; visitor.end_graph()?; @@ -2908,7 +3243,7 @@ digraph { fn visit_order() { let mut visitor = OkVisitor::default(); let plan = test_plan(); - let res = plan.visit(&mut visitor); + let res = plan.visit_with_subqueries(&mut visitor); assert!(res.is_ok()); assert_eq!( @@ -2984,7 +3319,7 @@ digraph { ..Default::default() }; let plan = test_plan(); - let res = plan.visit(&mut visitor); + let res = plan.visit_with_subqueries(&mut visitor); assert!(res.is_ok()); assert_eq!( @@ -3000,7 +3335,7 @@ digraph { ..Default::default() }; let plan = test_plan(); - let res = plan.visit(&mut visitor); + let res = plan.visit_with_subqueries(&mut visitor); assert!(res.is_ok()); assert_eq!( @@ -3051,7 +3386,7 @@ digraph { ..Default::default() }; let plan = test_plan(); - let res = plan.visit(&mut visitor).unwrap_err(); + let res = plan.visit_with_subqueries(&mut visitor).unwrap_err(); assert_eq!( "This feature is not implemented: Error in pre_visit", res.strip_backtrace() @@ -3069,7 +3404,7 @@ digraph { ..Default::default() }; let plan = test_plan(); - let res = plan.visit(&mut visitor).unwrap_err(); + let res = plan.visit_with_subqueries(&mut visitor).unwrap_err(); assert_eq!( "This feature is not implemented: Error in post_visit", res.strip_backtrace() diff --git a/datafusion/expr/src/tree_node/expr.rs b/datafusion/expr/src/tree_node/expr.rs index 97331720ce7d..85097f6249e1 100644 --- a/datafusion/expr/src/tree_node/expr.rs +++ b/datafusion/expr/src/tree_node/expr.rs @@ -412,7 +412,7 @@ where } /// &mut transform a Option<`Vec` of `Expr`s> -fn transform_option_vec( +pub fn transform_option_vec( ove: Option>, f: &mut F, ) -> Result>>> diff --git a/datafusion/expr/src/tree_node/plan.rs b/datafusion/expr/src/tree_node/plan.rs index 7a6b1005fede..de9533e76b17 100644 --- a/datafusion/expr/src/tree_node/plan.rs +++ b/datafusion/expr/src/tree_node/plan.rs @@ -20,58 +20,11 @@ use crate::LogicalPlan; use datafusion_common::tree_node::{ - Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion, TreeNodeVisitor, + Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion, }; use datafusion_common::Result; impl TreeNode for LogicalPlan { - fn apply Result>( - &self, - f: &mut F, - ) -> Result { - // Compared to the default implementation, we need to invoke - // [`Self::apply_subqueries`] before visiting its children - f(self)?.visit_children(|| { - self.apply_subqueries(f)?; - self.apply_children(|n| n.apply(f)) - }) - } - - /// To use, define a struct that implements the trait [`TreeNodeVisitor`] and then invoke - /// [`LogicalPlan::visit`]. - /// - /// For example, for a logical plan like: - /// - /// ```text - /// Projection: id - /// Filter: state Eq Utf8(\"CO\")\ - /// CsvScan: employee.csv projection=Some([0, 3])"; - /// ``` - /// - /// The sequence of visit operations would be: - /// ```text - /// visitor.pre_visit(Projection) - /// visitor.pre_visit(Filter) - /// visitor.pre_visit(CsvScan) - /// visitor.post_visit(CsvScan) - /// visitor.post_visit(Filter) - /// visitor.post_visit(Projection) - /// ``` - fn visit>( - &self, - visitor: &mut V, - ) -> Result { - // Compared to the default implementation, we need to invoke - // [`Self::visit_subqueries`] before visiting its children - visitor - .f_down(self)? - .visit_children(|| { - self.visit_subqueries(visitor)?; - self.apply_children(|n| n.visit(visitor)) - })? - .visit_parent(|| visitor.f_up(self)) - } - fn apply_children Result>( &self, f: F, diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index ae61aea997b7..02034ba68027 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -21,7 +21,7 @@ use log::debug; use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; +use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::expr::Exists; use datafusion_expr::expr::InSubquery; @@ -155,7 +155,7 @@ impl Analyzer { /// Do necessary check and fail the invalid plan fn check_plan(plan: &LogicalPlan) -> Result<()> { - plan.apply(&mut |plan: &LogicalPlan| { + plan.apply_with_subqueries(&mut |plan: &LogicalPlan| { for expr in plan.expressions().iter() { // recursively look for subqueries inspect_expr_pre(expr, |expr| match expr { diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index 038361c3ee8c..79375e52da1f 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -283,7 +283,7 @@ fn strip_inner_query(inner_plan: &LogicalPlan) -> &LogicalPlan { fn get_correlated_expressions(inner_plan: &LogicalPlan) -> Result> { let mut exprs = vec![]; - inner_plan.apply(&mut |plan| { + inner_plan.apply_with_subqueries(&mut |plan| { if let LogicalPlan::Filter(Filter { predicate, .. }) = plan { let (correlated, _): (Vec<_>, Vec<_>) = split_conjunction(predicate) .into_iter() diff --git a/datafusion/optimizer/src/plan_signature.rs b/datafusion/optimizer/src/plan_signature.rs index 4143d52a053e..a8e323ff429f 100644 --- a/datafusion/optimizer/src/plan_signature.rs +++ b/datafusion/optimizer/src/plan_signature.rs @@ -21,7 +21,7 @@ use std::{ num::NonZeroUsize, }; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; +use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_expr::LogicalPlan; /// Non-unique identifier of a [`LogicalPlan`]. @@ -73,7 +73,7 @@ impl LogicalPlanSignature { /// Get total number of [`LogicalPlan`]s in the plan. fn get_node_number(plan: &LogicalPlan) -> NonZeroUsize { let mut node_number = 0; - plan.apply(&mut |_plan| { + plan.apply_with_subqueries(&mut |_plan| { node_number += 1; Ok(TreeNodeRecursion::Continue) }) From caa0d676a465b69524b785ca1d22067a1dcea91d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 5 Apr 2024 17:01:56 -0400 Subject: [PATCH 2/8] clippy --- datafusion/expr/src/logical_plan/plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index abf914e4e104..23d4bbe0cbdd 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1303,7 +1303,7 @@ macro_rules! handle_transform_recursion { $F_DOWN? .transform_children(|n| n.map_subqueries($F_CHILD))? .transform_sibling(|n| n.map_children($F_CHILD))? - .transform_parent(|n| $F_UP(n)) + .transform_parent($F_UP) }}; } From c72349217e0dde13d972e71fa99ae4e5e6025cdf Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Sun, 7 Apr 2024 09:16:35 +0200 Subject: [PATCH 3/8] remove accidental extra apply --- datafusion/expr/src/logical_plan/plan.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 23d4bbe0cbdd..ee930fec5676 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1351,11 +1351,9 @@ impl LogicalPlan { &self, f: &mut F, ) -> Result { - self.apply(&mut |n| { - f(n)? - .visit_children(|| self.apply_subqueries(|c| c.apply_with_subqueries(f)))? - .visit_sibling(|| self.apply_children(|c| c.apply_with_subqueries(f))) - }) + f(self)? + .visit_children(|| self.apply_subqueries(|c| c.apply_with_subqueries(f)))? + .visit_sibling(|| self.apply_children(|c| c.apply_with_subqueries(f))) } pub fn transform_with_subqueries Result>>( From 7a2e4183b69484fa783e2b930449bf1045617b99 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Sun, 7 Apr 2024 09:16:47 +0200 Subject: [PATCH 4/8] minor fixes --- datafusion/common/src/tree_node.rs | 3 +-- datafusion/expr/src/tree_node/plan.rs | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 8e088e7a0b56..42514537e28d 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -25,10 +25,9 @@ use crate::Result; /// These macros are used to determine continuation during transforming traversals. macro_rules! handle_transform_recursion { ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{ - #[allow(clippy::redundant_closure_call)] $F_DOWN? .transform_children(|n| n.map_children($F_CHILD))? - .transform_parent(|n| $F_UP(n)) + .transform_parent($F_UP) }}; } diff --git a/datafusion/expr/src/tree_node/plan.rs b/datafusion/expr/src/tree_node/plan.rs index de9533e76b17..482fc96b519b 100644 --- a/datafusion/expr/src/tree_node/plan.rs +++ b/datafusion/expr/src/tree_node/plan.rs @@ -38,8 +38,8 @@ impl TreeNode for LogicalPlan { ) -> Result> { let new_children = self .inputs() - .iter() - .map(|&c| c.clone()) + .into_iter() + .cloned() .map_until_stop_and_collect(f)?; // Propagate up `new_children.transformed` and `new_children.tnr` // along with the node containing transformed children. From 6d4712f373fb2909f27b986bf01c21900503eecb Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Sun, 7 Apr 2024 10:03:29 +0200 Subject: [PATCH 5/8] fix `LogicalPlan::apply_expressions()` and `LogicalPlan::map_subqueries()` --- datafusion/expr/src/logical_plan/plan.rs | 52 +++++++++++++----------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index ee930fec5676..e810f270c821 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1416,7 +1416,7 @@ impl LogicalPlan { mut f: F, ) -> Result { self.apply_expressions(|expr| { - match expr { + expr.apply(&mut |expr| match expr { Expr::Exists(Exists { subquery, .. }) | Expr::InSubquery(InSubquery { subquery, .. }) | Expr::ScalarSubquery(subquery) => { @@ -1426,7 +1426,7 @@ impl LogicalPlan { f(&LogicalPlan::Subquery(subquery.clone())) } _ => Ok(TreeNodeRecursion::Continue), - } + }) }) } @@ -1434,33 +1434,37 @@ impl LogicalPlan { self, mut f: F, ) -> Result> { - self.map_expressions(|expr| match expr { - Expr::Exists(Exists { subquery, negated }) => { - f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s { - LogicalPlan::Subquery(subquery) => { - Ok(Expr::Exists(Exists { subquery, negated })) - } - _ => internal_err!("Transformation should return Subquery"), - }) - } - Expr::InSubquery(InSubquery { - expr, - subquery, - negated, - }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s { - LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery { + self.map_expressions(|expr| { + expr.transform_down_mut(&mut |expr| match expr { + Expr::Exists(Exists { subquery, negated }) => { + f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s { + LogicalPlan::Subquery(subquery) => { + Ok(Expr::Exists(Exists { subquery, negated })) + } + _ => internal_err!("Transformation should return Subquery"), + }) + } + Expr::InSubquery(InSubquery { expr, subquery, negated, - })), - _ => internal_err!("Transformation should return Subquery"), - }), - Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))? - .map_data(|s| match s { - LogicalPlan::Subquery(subquery) => Ok(Expr::ScalarSubquery(subquery)), + }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s { + LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery { + expr, + subquery, + negated, + })), _ => internal_err!("Transformation should return Subquery"), }), - _ => Ok(Transformed::no(expr)), + Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))? + .map_data(|s| match s { + LogicalPlan::Subquery(subquery) => { + Ok(Expr::ScalarSubquery(subquery)) + } + _ => internal_err!("Transformation should return Subquery"), + }), + _ => Ok(Transformed::no(expr)), + }) }) } From 90e7941d94c1483d11a3a8f389ac6bf72297a196 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Sun, 7 Apr 2024 16:05:15 +0200 Subject: [PATCH 6/8] fix `LogicalPlan::visit_with_subqueries()` --- datafusion/expr/src/logical_plan/plan.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index e810f270c821..506c9266b3e0 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1331,8 +1331,10 @@ impl LogicalPlan { ) -> Result { visitor .f_down(self)? - .visit_children(|| self.apply_subqueries(|c| c.visit(visitor)))? - .visit_sibling(|| self.apply_children(|c| c.visit(visitor)))? + .visit_children(|| { + self.apply_subqueries(|c| c.visit_with_subqueries(visitor)) + })? + .visit_sibling(|| self.apply_children(|c| c.visit_with_subqueries(visitor)))? .visit_parent(|| visitor.f_up(self)) } From 7c90e70dc51af45e20d72a213b0d717cb6d8e75d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 7 Apr 2024 12:11:00 -0400 Subject: [PATCH 7/8] Add deprecated LogicalPlan::inspect_expressions --- datafusion/expr/src/logical_plan/plan.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 506c9266b3e0..4f55bbfe3f4d 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -284,6 +284,27 @@ impl LogicalPlan { exprs } + #[deprecated(since = "37.0.0", note = "Use `apply_expressions` instead")] + pub fn inspect_expressions(self: &LogicalPlan, mut f: F) -> Result<(), E> + where + F: FnMut(&Expr) -> Result<(), E>, + { + let mut err = Ok(()); + self.apply_expressions(|e| { + if let Err(e) = f(e) { + // save the error for later (it may not be a DataFusionError + err = Err(e); + Ok(TreeNodeRecursion::Stop) + } else { + Ok(TreeNodeRecursion::Continue) + } + }) + // The closure always returns OK, so this will always too + .expect("no way to return error during recursion"); + + err + } + /// Calls `f` on all expressions (non-recursively) in the current /// logical plan node. This does not include expressions in any /// children. From 12d4a8c99a38b9d45a95dc54bbe2f5a51960850c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 6 Apr 2024 06:36:12 -0400 Subject: [PATCH 8/8] Implement TreeNode::map_children in place --- datafusion/common/src/tree_node.rs | 9 + datafusion/expr/src/logical_plan/ddl.rs | 18 ++ datafusion/expr/src/logical_plan/mod.rs | 1 + datafusion/expr/src/logical_plan/rewrite.rs | 228 ++++++++++++++++++++ datafusion/expr/src/tree_node/plan.rs | 27 +-- 5 files changed, 265 insertions(+), 18 deletions(-) create mode 100644 datafusion/expr/src/logical_plan/rewrite.rs diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 42514537e28d..3a49d9e378b6 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -534,6 +534,15 @@ impl Transformed { TreeNodeRecursion::Jump | TreeNodeRecursion::Stop => Ok(self), } } + + /// Discards the data of this [`Transformed`] object transforming it into Transformed<()> + pub fn discard_data(self) -> Transformed<()> { + Transformed { + data: (), + transformed: self.transformed, + tnr: self.tnr, + } + } } /// Transformation helper to process a sequence of iterable tree nodes that are siblings. diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index 8d72c9a8b036..418d318e243e 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -110,6 +110,24 @@ impl DdlStatement { } } + /// Return a mutable reference to the input `LogicalPlan`, if any + pub fn input_mut(&mut self) -> Option<&mut Arc> { + match self { + DdlStatement::CreateMemoryTable(CreateMemoryTable { input, .. }) => { + Some(input) + } + DdlStatement::CreateExternalTable(_) => None, + DdlStatement::CreateView(CreateView { input, .. }) => Some(input), + DdlStatement::CreateCatalogSchema(_) => None, + DdlStatement::CreateCatalog(_) => None, + DdlStatement::DropTable(_) => None, + DdlStatement::DropView(_) => None, + DdlStatement::DropCatalogSchema(_) => None, + DdlStatement::CreateFunction(_) => None, + DdlStatement::DropFunction(_) => None, + } + } + /// Return a `format`able structure with the a human readable /// description of this LogicalPlan node per node, not including /// children. diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 84781cb2e9ec..7b3e688e4a7b 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -21,6 +21,7 @@ pub mod display; pub mod dml; mod extension; mod plan; +mod rewrite; mod statement; pub use builder::{ diff --git a/datafusion/expr/src/logical_plan/rewrite.rs b/datafusion/expr/src/logical_plan/rewrite.rs new file mode 100644 index 000000000000..ebcc3838f44b --- /dev/null +++ b/datafusion/expr/src/logical_plan/rewrite.rs @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Methods for rewriting logical plans + +use crate::{ + Aggregate, CrossJoin, Distinct, DistinctOn, EmptyRelation, Filter, Join, Limit, + LogicalPlan, Prepare, Projection, RecursiveQuery, Repartition, Sort, Subquery, + SubqueryAlias, Union, Unnest, UserDefinedLogicalNode, Window, +}; +use datafusion_common::tree_node::{Transformed, TreeNodeIterator}; +use datafusion_common::{DFSchema, DFSchemaRef, Result}; +use std::sync::{Arc, OnceLock}; + +/// A temporary node that is left in place while rewriting the children of a +/// [`LogicalPlan`]. This is necessary to ensure that the `LogicalPlan` is +/// always in a valid state (from the Rust perspective) +static PLACEHOLDER: OnceLock> = OnceLock::new(); + +/// its inputs, so this code would not be needed. However, for now we try and +/// unwrap the `Arc` which avoids `clone`ing in most cases. +/// +/// On error, node be left with a placeholder logical plan +fn rewrite_arc( + node: &mut Arc, + mut f: F, +) -> datafusion_common::Result>> +where + F: FnMut(LogicalPlan) -> Result>, +{ + // We need to leave a valid node in the Arc, while we rewrite the existing + // one, so use a single global static placeholder node + let mut new_node = PLACEHOLDER + .get_or_init(|| { + Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: DFSchemaRef::new(DFSchema::empty()), + })) + }) + .clone(); + + // take the old value out of the Arc + std::mem::swap(node, &mut new_node); + + // try to update existing node, if it isn't shared with others + let new_node = Arc::try_unwrap(new_node) + // if None is returned, there is another reference to this + // LogicalPlan, so we must clone instead + .unwrap_or_else(|node| node.as_ref().clone()); + + // apply the actual transform + let result = f(new_node)?; + + // put the new value back into the Arc + let mut new_node = Arc::new(result.data); + std::mem::swap(node, &mut new_node); + + // return the `node` back + Ok(Transformed::new(node, result.transformed, result.tnr)) +} + +/// Rewrite the arc and discard the contents of Transformed +fn rewrite_arc_no_data( + node: &mut Arc, + f: F, +) -> datafusion_common::Result> +where + F: FnMut(LogicalPlan) -> Result>, +{ + rewrite_arc(node, f).map(|res| res.discard_data()) +} + +/// Rewrites all inputs for an Extension node "in place" +/// (it currently has to copy values because there are no APIs for in place modification) +/// +/// Should be removed when we have an API for in place modifications of the +/// extension to avoid these copies +fn rewrite_extension_inputs( + node: &mut Arc, + f: F, +) -> datafusion_common::Result> +where + F: FnMut(LogicalPlan) -> Result>, +{ + let Transformed { + data: new_inputs, + transformed, + tnr, + } = node + .inputs() + .into_iter() + .cloned() + .map_until_stop_and_collect(f)?; + + let exprs = node.expressions(); + let mut new_node = node.from_template(&exprs, &new_inputs); + std::mem::swap(node, &mut new_node); + Ok(Transformed { + data: (), + transformed, + tnr, + }) +} + +impl LogicalPlan { + /// Applies `f` to each child (input) of this plan node, rewriting them *in place.* + /// + /// Note that this function returns `Transformed<()>` because it does not + /// consume `self`, but instead modifies it in place. However, `F` transforms + /// the children by ownership + /// + /// # Notes + /// + /// Inputs include ONLY direct children, not embedded subquery + /// `LogicalPlan`s, for example such as are in [`Expr::Exists`]. + /// + /// [`Expr::Exists`]: crate::expr::Expr::Exists + pub(crate) fn rewrite_children(&mut self, mut f: F) -> Result> + where + F: FnMut(Self) -> Result>, + { + let children_result = match self { + LogicalPlan::Projection(Projection { input, .. }) => { + rewrite_arc_no_data(input, &mut f) + } + LogicalPlan::Filter(Filter { input, .. }) => { + rewrite_arc_no_data(input, &mut f) + } + LogicalPlan::Repartition(Repartition { input, .. }) => { + rewrite_arc_no_data(input, &mut f) + } + LogicalPlan::Window(Window { input, .. }) => { + rewrite_arc_no_data(input, &mut f) + } + LogicalPlan::Aggregate(Aggregate { input, .. }) => { + rewrite_arc_no_data(input, &mut f) + } + LogicalPlan::Sort(Sort { input, .. }) => rewrite_arc_no_data(input, &mut f), + LogicalPlan::Join(Join { left, right, .. }) => { + let results = [left, right] + .into_iter() + .map_until_stop_and_collect(|input| rewrite_arc(input, &mut f))?; + Ok(results.discard_data()) + } + LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => { + let results = [left, right] + .into_iter() + .map_until_stop_and_collect(|input| rewrite_arc(input, &mut f))?; + Ok(results.discard_data()) + } + LogicalPlan::Limit(Limit { input, .. }) => rewrite_arc_no_data(input, &mut f), + LogicalPlan::Subquery(Subquery { subquery, .. }) => { + rewrite_arc_no_data(subquery, &mut f) + } + LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => { + rewrite_arc_no_data(input, &mut f) + } + LogicalPlan::Extension(extension) => { + rewrite_extension_inputs(&mut extension.node, &mut f) + } + LogicalPlan::Union(Union { inputs, .. }) => { + let results = inputs + .iter_mut() + .map_until_stop_and_collect(|input| rewrite_arc(input, &mut f))?; + Ok(results.discard_data()) + } + LogicalPlan::Distinct( + Distinct::All(input) | Distinct::On(DistinctOn { input, .. }), + ) => rewrite_arc_no_data(input, &mut f), + LogicalPlan::Explain(explain) => { + rewrite_arc_no_data(&mut explain.plan, &mut f) + } + LogicalPlan::Analyze(analyze) => { + rewrite_arc_no_data(&mut analyze.input, &mut f) + } + LogicalPlan::Dml(write) => rewrite_arc_no_data(&mut write.input, &mut f), + LogicalPlan::Copy(copy) => rewrite_arc_no_data(&mut copy.input, &mut f), + LogicalPlan::Ddl(ddl) => { + if let Some(input) = ddl.input_mut() { + rewrite_arc_no_data(input, &mut f) + } else { + Ok(Transformed::no(())) + } + } + LogicalPlan::Unnest(Unnest { input, .. }) => { + rewrite_arc_no_data(input, &mut f) + } + LogicalPlan::Prepare(Prepare { input, .. }) => { + rewrite_arc_no_data(input, &mut f) + } + LogicalPlan::RecursiveQuery(RecursiveQuery { + static_term, + recursive_term, + .. + }) => { + let results = [static_term, recursive_term] + .into_iter() + .map_until_stop_and_collect(|input| rewrite_arc(input, &mut f))?; + Ok(results.discard_data()) + } + // plans without inputs + LogicalPlan::TableScan { .. } + | LogicalPlan::Statement { .. } + | LogicalPlan::EmptyRelation { .. } + | LogicalPlan::Values { .. } + | LogicalPlan::DescribeTable(_) => Ok(Transformed::no(())), + }?; + + // after visiting the actual children we we need to visit any subqueries + // that are inside the expressions + // TODO use pattern introduced in https://github.com/apache/arrow-datafusion/pull/9913 + Ok(children_result) + } +} diff --git a/datafusion/expr/src/tree_node/plan.rs b/datafusion/expr/src/tree_node/plan.rs index 482fc96b519b..f8fe36e529dd 100644 --- a/datafusion/expr/src/tree_node/plan.rs +++ b/datafusion/expr/src/tree_node/plan.rs @@ -32,23 +32,14 @@ impl TreeNode for LogicalPlan { self.inputs().into_iter().apply_until_stop(f) } - fn map_children Result>>( - self, - f: F, - ) -> Result> { - let new_children = self - .inputs() - .into_iter() - .cloned() - .map_until_stop_and_collect(f)?; - // Propagate up `new_children.transformed` and `new_children.tnr` - // along with the node containing transformed children. - if new_children.transformed { - new_children.map_data(|new_children| { - self.with_new_exprs(self.expressions(), new_children) - }) - } else { - Ok(new_children.update_data(|_| self)) - } + fn map_children(mut self, f: F) -> Result> + where + F: FnMut(Self) -> Result>, + { + // Apply the rewrite *in place* for each child to avoid cloning + let result = self.rewrite_children(f)?; + + // return ourself + Ok(result.update_data(|_| self)) } }