Skip to content

Commit

Permalink
Refactors on TreeNode Implementations (apache#8395)
Browse files Browse the repository at this point in the history
* minor changes

* PipelineStatePropagator tree refactor

* Remove duplications by children_unbounded()

* Remove on-the-fly tree construction

* Minor changes

---------

Co-authored-by: Mustafa Akur <[email protected]>
  • Loading branch information
2 people authored and appletreeisyellow committed Dec 14, 2023
1 parent d214ebe commit 6d93a85
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 71 deletions.
21 changes: 17 additions & 4 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ fn hash_join_convert_symmetric_subrule(
config_options: &ConfigOptions,
) -> Option<Result<PipelineStatePropagator>> {
if let Some(hash_join) = input.plan.as_any().downcast_ref::<HashJoinExec>() {
let ub_flags = &input.children_unbounded;
let ub_flags = input.children_unbounded();
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
input.unbounded = left_unbounded || right_unbounded;
let result = if left_unbounded && right_unbounded {
Expand Down Expand Up @@ -511,7 +511,7 @@ fn hash_join_swap_subrule(
_config_options: &ConfigOptions,
) -> Option<Result<PipelineStatePropagator>> {
if let Some(hash_join) = input.plan.as_any().downcast_ref::<HashJoinExec>() {
let ub_flags = &input.children_unbounded;
let ub_flags = input.children_unbounded();
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
input.unbounded = left_unbounded || right_unbounded;
let result = if left_unbounded
Expand Down Expand Up @@ -577,7 +577,7 @@ fn apply_subrules(
}
let is_unbounded = input
.plan
.unbounded_output(&input.children_unbounded)
.unbounded_output(&input.children_unbounded())
// Treat the case where an operator can not run on unbounded data as
// if it can and it outputs unbounded data. Do not raise an error yet.
// Such operators may be fixed, adjusted or replaced later on during
Expand Down Expand Up @@ -1253,6 +1253,7 @@ mod hash_join_tests {
use arrow::record_batch::RecordBatch;
use datafusion_common::utils::DataPtr;
use datafusion_common::JoinType;
use datafusion_physical_plan::empty::EmptyExec;
use std::sync::Arc;

struct TestCase {
Expand Down Expand Up @@ -1620,10 +1621,22 @@ mod hash_join_tests {
false,
)?;

let children = vec![
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
unbounded: left_unbounded,
children: vec![],
},
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
unbounded: right_unbounded,
children: vec![],
},
];
let initial_hash_join_state = PipelineStatePropagator {
plan: Arc::new(join),
unbounded: false,
children_unbounded: vec![left_unbounded, right_unbounded],
children,
};

let optimized_hash_join =
Expand Down
40 changes: 20 additions & 20 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,29 +70,36 @@ impl PhysicalOptimizerRule for PipelineChecker {
pub struct PipelineStatePropagator {
pub(crate) plan: Arc<dyn ExecutionPlan>,
pub(crate) unbounded: bool,
pub(crate) children_unbounded: Vec<bool>,
pub(crate) children: Vec<PipelineStatePropagator>,
}

impl PipelineStatePropagator {
/// Constructs a new, default pipelining state.
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
let length = plan.children().len();
let children = plan.children();
PipelineStatePropagator {
plan,
unbounded: false,
children_unbounded: vec![false; length],
children: children.into_iter().map(Self::new).collect(),
}
}

/// Returns the children unboundedness information.
pub fn children_unbounded(&self) -> Vec<bool> {
self.children
.iter()
.map(|c| c.unbounded)
.collect::<Vec<_>>()
}
}

impl TreeNode for PipelineStatePropagator {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
let children = self.plan.children();
for child in children {
match op(&PipelineStatePropagator::new(child))? {
for child in &self.children {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
Expand All @@ -106,25 +113,18 @@ impl TreeNode for PipelineStatePropagator {
where
F: FnMut(Self) -> Result<Self>,
{
let children = self.plan.children();
if !children.is_empty() {
let new_children = children
if !self.children.is_empty() {
let new_children = self
.children
.into_iter()
.map(PipelineStatePropagator::new)
.map(transform)
.collect::<Result<Vec<_>>>()?;
let children_unbounded = new_children
.iter()
.map(|c| c.unbounded)
.collect::<Vec<bool>>();
let children_plans = new_children
.into_iter()
.map(|child| child.plan)
.collect::<Vec<_>>();
let children_plans = new_children.iter().map(|c| c.plan.clone()).collect();

Ok(PipelineStatePropagator {
plan: with_new_children_if_necessary(self.plan, children_plans)?.into(),
unbounded: self.unbounded,
children_unbounded,
children: new_children,
})
} else {
Ok(self)
Expand All @@ -149,7 +149,7 @@ pub fn check_finiteness_requirements(
}
input
.plan
.unbounded_output(&input.children_unbounded)
.unbounded_output(&input.children_unbounded())
.map(|value| {
input.unbounded = value;
Transformed::Yes(input)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/equivalence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1520,7 +1520,7 @@ fn update_ordering(
node.state = SortProperties::Ordered(options);
} else if !node.expr.children().is_empty() {
// We have an intermediate (non-leaf) node, account for its children:
node.state = node.expr.get_ordering(&node.children_states);
node.state = node.expr.get_ordering(&node.children_state());
} else if node.expr.as_any().is::<Literal>() {
// We have a Literal, which is the other possible leaf node type:
node.state = node.expr.get_ordering(&[]);
Expand Down
58 changes: 21 additions & 37 deletions datafusion/physical-expr/src/sort_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

use std::{ops::Neg, sync::Arc};

use crate::PhysicalExpr;
use arrow_schema::SortOptions;

use crate::PhysicalExpr;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::Result;

use itertools::Itertools;

/// To propagate [`SortOptions`] across the [`PhysicalExpr`], it is insufficient
/// to simply use `Option<SortOptions>`: There must be a differentiation between
/// unordered columns and literal values, since literals may not break the ordering
Expand All @@ -35,11 +34,12 @@ use itertools::Itertools;
/// sorted data; however the ((a_ordered + 999) + c_ordered) expression can. Therefore,
/// we need two different variants for literals and unordered columns as literals are
/// often more ordering-friendly under most mathematical operations.
#[derive(PartialEq, Debug, Clone, Copy)]
#[derive(PartialEq, Debug, Clone, Copy, Default)]
pub enum SortProperties {
/// Use the ordinary [`SortOptions`] struct to represent ordered data:
Ordered(SortOptions),
// This alternative represents unordered data:
#[default]
Unordered,
// Singleton is used for single-valued literal numbers:
Singleton,
Expand Down Expand Up @@ -151,34 +151,24 @@ impl Neg for SortProperties {
pub struct ExprOrdering {
pub expr: Arc<dyn PhysicalExpr>,
pub state: SortProperties,
pub children_states: Vec<SortProperties>,
pub children: Vec<ExprOrdering>,
}

impl ExprOrdering {
/// Creates a new [`ExprOrdering`] with [`SortProperties::Unordered`] states
/// for `expr` and its children.
pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
let size = expr.children().len();
let children = expr.children();
Self {
expr,
state: SortProperties::Unordered,
children_states: vec![SortProperties::Unordered; size],
state: Default::default(),
children: children.into_iter().map(Self::new).collect(),
}
}

/// Updates this [`ExprOrdering`]'s children states with the given states.
pub fn with_new_children(mut self, children_states: Vec<SortProperties>) -> Self {
self.children_states = children_states;
self
}

/// Creates new [`ExprOrdering`] objects for each child of the expression.
pub fn children_expr_orderings(&self) -> Vec<ExprOrdering> {
self.expr
.children()
.into_iter()
.map(ExprOrdering::new)
.collect()
/// Get a reference to each child state.
pub fn children_state(&self) -> Vec<SortProperties> {
self.children.iter().map(|c| c.state).collect()
}
}

Expand All @@ -187,8 +177,8 @@ impl TreeNode for ExprOrdering {
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children_expr_orderings() {
match op(&child)? {
for child in &self.children {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
Expand All @@ -197,25 +187,19 @@ impl TreeNode for ExprOrdering {
Ok(VisitRecursion::Continue)
}

fn map_children<F>(self, transform: F) -> Result<Self>
fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
if self.children_states.is_empty() {
if self.children.is_empty() {
Ok(self)
} else {
let child_expr_orderings = self.children_expr_orderings();
// After mapping over the children, the function `F` applies to the
// current object and updates its state.
Ok(self.with_new_children(
child_expr_orderings
.into_iter()
// Update children states after this transformation:
.map(transform)
// Extract the state (i.e. sort properties) information:
.map_ok(|c| c.state)
.collect::<Result<Vec<_>>>()?,
))
self.children = self
.children
.into_iter()
.map(transform)
.collect::<Result<Vec<_>>>()?;
Ok(self)
}
}
}
15 changes: 6 additions & 9 deletions datafusion/physical-expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,23 +129,20 @@ pub struct ExprTreeNode<T> {

impl<T> ExprTreeNode<T> {
pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
let children = expr.children();
ExprTreeNode {
expr,
data: None,
child_nodes: vec![],
child_nodes: children.into_iter().map(Self::new).collect_vec(),
}
}

pub fn expression(&self) -> &Arc<dyn PhysicalExpr> {
&self.expr
}

pub fn children(&self) -> Vec<ExprTreeNode<T>> {
self.expr
.children()
.into_iter()
.map(ExprTreeNode::new)
.collect()
pub fn children(&self) -> &[ExprTreeNode<T>] {
&self.child_nodes
}
}

Expand All @@ -155,7 +152,7 @@ impl<T: Clone> TreeNode for ExprTreeNode<T> {
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children() {
match op(&child)? {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
Expand All @@ -170,7 +167,7 @@ impl<T: Clone> TreeNode for ExprTreeNode<T> {
F: FnMut(Self) -> Result<Self>,
{
self.child_nodes = self
.children()
.child_nodes
.into_iter()
.map(transform)
.collect::<Result<Vec<_>>>()?;
Expand Down

0 comments on commit 6d93a85

Please sign in to comment.