From 0d4dc3601b390c50640e841e70c522cd733e02f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Tue, 31 Oct 2023 00:01:09 +0300 Subject: [PATCH] Refactor of Ordering and Prunability Traversals and States (#7985) * simplify ExprOrdering * Comment improvements * Move map/transform comment up --------- Co-authored-by: Mehmet Ozan Kabak --- .../physical-expr/src/sort_properties.rs | 59 ++++++------ datafusion/physical-expr/src/utils.rs | 90 ++++++++++++++++++- 2 files changed, 120 insertions(+), 29 deletions(-) diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs index 097f491cb979..8ae3379218fb 100644 --- a/datafusion/physical-expr/src/sort_properties.rs +++ b/datafusion/physical-expr/src/sort_properties.rs @@ -155,37 +155,36 @@ impl Neg for SortProperties { #[derive(Debug)] pub struct ExprOrdering { pub expr: Arc, - pub state: Option, - pub children_states: Option>, + pub state: SortProperties, + pub children_states: Vec, } impl ExprOrdering { + /// Creates a new [`ExprOrdering`] with [`SortProperties::Unordered`] states + /// for `expr` and its children. pub fn new(expr: Arc) -> Self { + let size = expr.children().len(); Self { expr, - state: None, - children_states: None, + state: SortProperties::Unordered, + children_states: vec![SortProperties::Unordered; size], } } - pub fn children(&self) -> Vec { + /// Updates this [`ExprOrdering`]'s children states with the given states. + pub fn with_new_children(mut self, children_states: Vec) -> Self { + self.children_states = children_states; + self + } + + /// Creates new [`ExprOrdering`] objects for each child of the expression. + pub fn children_expr_orderings(&self) -> Vec { self.expr .children() .into_iter() .map(ExprOrdering::new) .collect() } - - pub fn new_with_children( - children_states: Vec, - parent_expr: Arc, - ) -> Self { - Self { - expr: parent_expr, - state: None, - children_states: Some(children_states), - } - } } impl TreeNode for ExprOrdering { @@ -193,7 +192,7 @@ impl TreeNode for ExprOrdering { where F: FnMut(&Self) -> Result, { - for child in self.children() { + for child in self.children_expr_orderings() { match op(&child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), @@ -207,17 +206,20 @@ impl TreeNode for ExprOrdering { where F: FnMut(Self) -> Result, { - let children = self.children(); - if children.is_empty() { + if self.children_states.is_empty() { Ok(self) } else { - Ok(ExprOrdering::new_with_children( - children + let child_expr_orderings = self.children_expr_orderings(); + // After mapping over the children, the function `F` applies to the + // current object and updates its state. + Ok(self.with_new_children( + child_expr_orderings .into_iter() + // Update children states after this transformation: .map(transform) - .map_ok(|c| c.state.unwrap_or(SortProperties::Unordered)) + // Extract the state (i.e. sort properties) information: + .map_ok(|c| c.state) .collect::>>()?, - self.expr, )) } } @@ -248,13 +250,13 @@ pub fn update_ordering( // a BinaryExpr like a + b), and there is an ordering equivalence of // it (let's say like c + d), we actually can find it at this step. if sort_expr.expr.eq(&node.expr) { - node.state = Some(SortProperties::Ordered(sort_expr.options)); + node.state = SortProperties::Ordered(sort_expr.options); return Ok(Transformed::Yes(node)); } - if let Some(children_sort_options) = &node.children_states { + if !node.expr.children().is_empty() { // We have an intermediate (non-leaf) node, account for its children: - node.state = Some(node.expr.get_ordering(children_sort_options)); + node.state = node.expr.get_ordering(&node.children_states); } else if let Some(column) = node.expr.as_any().downcast_ref::() { // We have a Column, which is one of the two possible leaf node types: node.state = get_indices_of_matching_sort_exprs_with_order_eq( @@ -268,10 +270,11 @@ pub fn update_ordering( descending: sort_options[0].descending, nulls_first: sort_options[0].nulls_first, }) - }); + }) + .unwrap_or(SortProperties::Unordered); } else { // We have a Literal, which is the other possible leaf node type: - node.state = Some(node.expr.get_ordering(&[])); + node.state = node.expr.get_ordering(&[]); } Ok(Transformed::Yes(node)) } diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index b2a6bb5ca6d2..b38117d206cc 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -773,7 +773,7 @@ pub fn find_orderings_of_exprs( &input_ordering_equal_properties, ) })?; - if let Some(SortProperties::Ordered(sort_options)) = transformed.state { + if let SortProperties::Ordered(sort_options) = transformed.state { orderings.push(Some(PhysicalSortExpr { expr: Arc::new(Column::new(name, index)), options: sort_options, @@ -1836,4 +1836,92 @@ mod tests { Ok(()) } + + #[test] + fn test_find_orderings_of_exprs() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + ]); + + let mut eq = EquivalenceProperties::new(Arc::new(schema.clone())); + let col_a = &col("a", &schema)?; + let col_b = &col("b", &schema)?; + let col_c = &col("c", &schema)?; + let col_d = &col("d", &schema)?; + let option_asc = SortOptions { + descending: false, + nulls_first: false, + }; + // b=a (e.g they are aliases) + eq.add_equal_conditions((&Column::new("b", 1), &Column::new("a", 0))); + let mut oeq = OrderingEquivalenceProperties::new(Arc::new(schema.clone())); + // [b ASC], [d ASC] + oeq.add_equal_conditions(( + &vec![PhysicalSortExpr { + expr: col_b.clone(), + options: option_asc, + }], + &vec![PhysicalSortExpr { + expr: col_d.clone(), + options: option_asc, + }], + )); + + let orderings = find_orderings_of_exprs( + &[ + // d + b + ( + Arc::new(BinaryExpr::new( + col_d.clone(), + Operator::Plus, + col_b.clone(), + )), + "d+b".to_string(), + ), + // b as b_new + (col_b.clone(), "b_new".to_string()), + // a as a_new + (col_a.clone(), "a_new".to_string()), + // a + c + ( + Arc::new(BinaryExpr::new( + col_a.clone(), + Operator::Plus, + col_c.clone(), + )), + "a+c".to_string(), + ), + ], + Some(&[PhysicalSortExpr { + expr: col_b.clone(), + options: option_asc, + }]), + eq, + oeq, + )?; + + assert_eq!( + vec![ + Some(PhysicalSortExpr { + expr: Arc::new(Column::new("d+b", 0)), + options: option_asc, + }), + Some(PhysicalSortExpr { + expr: Arc::new(Column::new("b_new", 1)), + options: option_asc, + }), + Some(PhysicalSortExpr { + expr: Arc::new(Column::new("a_new", 2)), + options: option_asc, + }), + None, + ], + orderings + ); + + Ok(()) + } }