From 6d93a853e0aa3ab152991d6e13a0dc276fd5dfdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Sun, 3 Dec 2023 01:44:18 +0300 Subject: [PATCH] Refactors on TreeNode Implementations (#8395) * minor changes * PipelineStatePropagator tree refactor * Remove duplications by children_unbounded() * Remove on-the-fly tree construction * Minor changes --------- Co-authored-by: Mustafa Akur --- .../src/physical_optimizer/join_selection.rs | 21 +++++-- .../physical_optimizer/pipeline_checker.rs | 40 ++++++------- datafusion/physical-expr/src/equivalence.rs | 2 +- .../physical-expr/src/sort_properties.rs | 58 +++++++------------ datafusion/physical-expr/src/utils.rs | 15 ++--- 5 files changed, 65 insertions(+), 71 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index a7ecd1ca655c..0c3ac2d24529 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -434,7 +434,7 @@ fn hash_join_convert_symmetric_subrule( config_options: &ConfigOptions, ) -> Option> { if let Some(hash_join) = input.plan.as_any().downcast_ref::() { - let ub_flags = &input.children_unbounded; + let ub_flags = input.children_unbounded(); let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]); input.unbounded = left_unbounded || right_unbounded; let result = if left_unbounded && right_unbounded { @@ -511,7 +511,7 @@ fn hash_join_swap_subrule( _config_options: &ConfigOptions, ) -> Option> { if let Some(hash_join) = input.plan.as_any().downcast_ref::() { - let ub_flags = &input.children_unbounded; + let ub_flags = input.children_unbounded(); let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]); input.unbounded = left_unbounded || right_unbounded; let result = if left_unbounded @@ -577,7 +577,7 @@ fn apply_subrules( } let is_unbounded = input .plan - .unbounded_output(&input.children_unbounded) + .unbounded_output(&input.children_unbounded()) // Treat the case where an operator can not run on unbounded data as // if it can and it outputs unbounded data. Do not raise an error yet. // Such operators may be fixed, adjusted or replaced later on during @@ -1253,6 +1253,7 @@ mod hash_join_tests { use arrow::record_batch::RecordBatch; use datafusion_common::utils::DataPtr; use datafusion_common::JoinType; + use datafusion_physical_plan::empty::EmptyExec; use std::sync::Arc; struct TestCase { @@ -1620,10 +1621,22 @@ mod hash_join_tests { false, )?; + let children = vec![ + PipelineStatePropagator { + plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))), + unbounded: left_unbounded, + children: vec![], + }, + PipelineStatePropagator { + plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))), + unbounded: right_unbounded, + children: vec![], + }, + ]; let initial_hash_join_state = PipelineStatePropagator { plan: Arc::new(join), unbounded: false, - children_unbounded: vec![left_unbounded, right_unbounded], + children, }; let optimized_hash_join = diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index 43ae7dbfe7b6..d59248aadf05 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -70,19 +70,27 @@ impl PhysicalOptimizerRule for PipelineChecker { pub struct PipelineStatePropagator { pub(crate) plan: Arc, pub(crate) unbounded: bool, - pub(crate) children_unbounded: Vec, + pub(crate) children: Vec, } impl PipelineStatePropagator { /// Constructs a new, default pipelining state. pub fn new(plan: Arc) -> Self { - let length = plan.children().len(); + let children = plan.children(); PipelineStatePropagator { plan, unbounded: false, - children_unbounded: vec![false; length], + children: children.into_iter().map(Self::new).collect(), } } + + /// Returns the children unboundedness information. + pub fn children_unbounded(&self) -> Vec { + self.children + .iter() + .map(|c| c.unbounded) + .collect::>() + } } impl TreeNode for PipelineStatePropagator { @@ -90,9 +98,8 @@ impl TreeNode for PipelineStatePropagator { where F: FnMut(&Self) -> Result, { - let children = self.plan.children(); - for child in children { - match op(&PipelineStatePropagator::new(child))? { + for child in &self.children { + match op(child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), VisitRecursion::Stop => return Ok(VisitRecursion::Stop), @@ -106,25 +113,18 @@ impl TreeNode for PipelineStatePropagator { where F: FnMut(Self) -> Result, { - let children = self.plan.children(); - if !children.is_empty() { - let new_children = children + if !self.children.is_empty() { + let new_children = self + .children .into_iter() - .map(PipelineStatePropagator::new) .map(transform) .collect::>>()?; - let children_unbounded = new_children - .iter() - .map(|c| c.unbounded) - .collect::>(); - let children_plans = new_children - .into_iter() - .map(|child| child.plan) - .collect::>(); + let children_plans = new_children.iter().map(|c| c.plan.clone()).collect(); + Ok(PipelineStatePropagator { plan: with_new_children_if_necessary(self.plan, children_plans)?.into(), unbounded: self.unbounded, - children_unbounded, + children: new_children, }) } else { Ok(self) @@ -149,7 +149,7 @@ pub fn check_finiteness_requirements( } input .plan - .unbounded_output(&input.children_unbounded) + .unbounded_output(&input.children_unbounded()) .map(|value| { input.unbounded = value; Transformed::Yes(input) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index f9f03300f5e9..4a562f4ef101 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -1520,7 +1520,7 @@ fn update_ordering( node.state = SortProperties::Ordered(options); } else if !node.expr.children().is_empty() { // We have an intermediate (non-leaf) node, account for its children: - node.state = node.expr.get_ordering(&node.children_states); + node.state = node.expr.get_ordering(&node.children_state()); } else if node.expr.as_any().is::() { // We have a Literal, which is the other possible leaf node type: node.state = node.expr.get_ordering(&[]); diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs index f8648abdf7a7..f51374461776 100644 --- a/datafusion/physical-expr/src/sort_properties.rs +++ b/datafusion/physical-expr/src/sort_properties.rs @@ -17,13 +17,12 @@ use std::{ops::Neg, sync::Arc}; -use crate::PhysicalExpr; use arrow_schema::SortOptions; + +use crate::PhysicalExpr; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::Result; -use itertools::Itertools; - /// To propagate [`SortOptions`] across the [`PhysicalExpr`], it is insufficient /// to simply use `Option`: There must be a differentiation between /// unordered columns and literal values, since literals may not break the ordering @@ -35,11 +34,12 @@ use itertools::Itertools; /// sorted data; however the ((a_ordered + 999) + c_ordered) expression can. Therefore, /// we need two different variants for literals and unordered columns as literals are /// often more ordering-friendly under most mathematical operations. -#[derive(PartialEq, Debug, Clone, Copy)] +#[derive(PartialEq, Debug, Clone, Copy, Default)] pub enum SortProperties { /// Use the ordinary [`SortOptions`] struct to represent ordered data: Ordered(SortOptions), // This alternative represents unordered data: + #[default] Unordered, // Singleton is used for single-valued literal numbers: Singleton, @@ -151,34 +151,24 @@ impl Neg for SortProperties { pub struct ExprOrdering { pub expr: Arc, pub state: SortProperties, - pub children_states: Vec, + pub children: Vec, } impl ExprOrdering { /// Creates a new [`ExprOrdering`] with [`SortProperties::Unordered`] states /// for `expr` and its children. pub fn new(expr: Arc) -> Self { - let size = expr.children().len(); + let children = expr.children(); Self { expr, - state: SortProperties::Unordered, - children_states: vec![SortProperties::Unordered; size], + state: Default::default(), + children: children.into_iter().map(Self::new).collect(), } } - /// Updates this [`ExprOrdering`]'s children states with the given states. - pub fn with_new_children(mut self, children_states: Vec) -> Self { - self.children_states = children_states; - self - } - - /// Creates new [`ExprOrdering`] objects for each child of the expression. - pub fn children_expr_orderings(&self) -> Vec { - self.expr - .children() - .into_iter() - .map(ExprOrdering::new) - .collect() + /// Get a reference to each child state. + pub fn children_state(&self) -> Vec { + self.children.iter().map(|c| c.state).collect() } } @@ -187,8 +177,8 @@ impl TreeNode for ExprOrdering { where F: FnMut(&Self) -> Result, { - for child in self.children_expr_orderings() { - match op(&child)? { + for child in &self.children { + match op(child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), VisitRecursion::Stop => return Ok(VisitRecursion::Stop), @@ -197,25 +187,19 @@ impl TreeNode for ExprOrdering { Ok(VisitRecursion::Continue) } - fn map_children(self, transform: F) -> Result + fn map_children(mut self, transform: F) -> Result where F: FnMut(Self) -> Result, { - if self.children_states.is_empty() { + if self.children.is_empty() { Ok(self) } else { - let child_expr_orderings = self.children_expr_orderings(); - // After mapping over the children, the function `F` applies to the - // current object and updates its state. - Ok(self.with_new_children( - child_expr_orderings - .into_iter() - // Update children states after this transformation: - .map(transform) - // Extract the state (i.e. sort properties) information: - .map_ok(|c| c.state) - .collect::>>()?, - )) + self.children = self + .children + .into_iter() + .map(transform) + .collect::>>()?; + Ok(self) } } } diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index ed62956de8e0..71a7ff5fb778 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -129,10 +129,11 @@ pub struct ExprTreeNode { impl ExprTreeNode { pub fn new(expr: Arc) -> Self { + let children = expr.children(); ExprTreeNode { expr, data: None, - child_nodes: vec![], + child_nodes: children.into_iter().map(Self::new).collect_vec(), } } @@ -140,12 +141,8 @@ impl ExprTreeNode { &self.expr } - pub fn children(&self) -> Vec> { - self.expr - .children() - .into_iter() - .map(ExprTreeNode::new) - .collect() + pub fn children(&self) -> &[ExprTreeNode] { + &self.child_nodes } } @@ -155,7 +152,7 @@ impl TreeNode for ExprTreeNode { F: FnMut(&Self) -> Result, { for child in self.children() { - match op(&child)? { + match op(child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), VisitRecursion::Stop => return Ok(VisitRecursion::Stop), @@ -170,7 +167,7 @@ impl TreeNode for ExprTreeNode { F: FnMut(Self) -> Result, { self.child_nodes = self - .children() + .child_nodes .into_iter() .map(transform) .collect::>>()?;