@@ -383,7 +383,7 @@ fn pushdown_requirement_to_children(
     } else if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
         handle_hash_join(hash_join, parent_required)
     } else {
-        handle_custom_pushdown(plan, parent_required, maintains_input_order)
+        handle_custom_pushdown(plan, parent_required, &maintains_input_order)
     }
     // TODO: Add support for Projection push down
 }
@@ -604,7 +604,7 @@ fn expr_source_side(
 fn handle_custom_pushdown(
     plan: &Arc<dyn ExecutionPlan>,
     parent_required: OrderingRequirements,
-    maintains_input_order: Vec<bool>,
+    maintains_input_order: &[bool],
 ) -> Result<Option<Vec<Option<OrderingRequirements>>>> {
     // If the plan has no children, return early:
     if plan.children().is_empty() {
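The `Vec<bool>` → `&[bool]` change above is the canonical fix for `clippy::needless_pass_by_value`: a function that only reads a collection should borrow it, so callers keep ownership and nothing is moved or cloned. A minimal sketch of the pattern, with hypothetical function names (not DataFusion APIs):

```rust
// Before: taking `Vec<bool>` forces callers to move (or clone) the vector.
fn count_ordered_owned(maintains_input_order: Vec<bool>) -> usize {
    maintains_input_order.iter().filter(|m| **m).count()
}

// After: a slice borrow accepts a `&Vec<bool>`, an array, or another slice.
fn count_ordered(maintains_input_order: &[bool]) -> usize {
    maintains_input_order.iter().filter(|m| **m).count()
}

fn main() {
    let flags = vec![true, false, true];
    assert_eq!(count_ordered(&flags), 2);
    // `flags` is still usable here: it was only borrowed, not moved.
    assert_eq!(count_ordered_owned(flags), 2);
}
```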
11 changes: 6 additions & 5 deletions datafusion/physical-optimizer/src/filter_pushdown.rs
@@ -422,7 +422,7 @@ impl PhysicalOptimizerRule for FilterPushdown {
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(
-            push_down_filters(Arc::clone(&plan), vec![], config, self.phase)?
+            push_down_filters(&Arc::clone(&plan), vec![], config, self.phase)?
                 .updated_node
                 .unwrap_or(plan),
         )
@@ -438,7 +438,7 @@ impl PhysicalOptimizerRule for FilterPushdown {
 }
 
 fn push_down_filters(
-    node: Arc<dyn ExecutionPlan>,
+    node: &Arc<dyn ExecutionPlan>,
     parent_predicates: Vec<Arc<dyn PhysicalExpr>>,
     config: &ConfigOptions,
     phase: FilterPushdownPhase,
@@ -510,7 +510,8 @@ fn push_down_filters(
         let num_parent_filters = all_predicates.len() - num_self_filters;
 
         // Any filters that could not be pushed down to a child are marked as not-supported to our parents
-        let result = push_down_filters(Arc::clone(child), all_predicates, config, phase)?;
+        let result =
+            push_down_filters(&Arc::clone(child), all_predicates, config, phase)?;
 
         if let Some(new_child) = result.updated_node {
             // If we have a filter pushdown result, we need to update our children
@@ -571,7 +572,7 @@ fn push_down_filters(
     }
 
     // Re-create this node with new children
-    let updated_node = with_new_children_if_necessary(Arc::clone(&node), new_children)?;
+    let updated_node = with_new_children_if_necessary(Arc::clone(node), new_children)?;
 
     // TODO: by calling `handle_child_pushdown_result` we are assuming that the
     // `ExecutionPlan` implementation will not change the plan itself.
@@ -596,7 +597,7 @@ fn push_down_filters(
     )?;
     // Compare pointers for new_node and node, if they are different we must replace
     // ourselves because of changes in our children.
-    if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, &node) {
+    if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, node) {
         res.updated_node = Some(updated_node)
     }
     Ok(res)
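In `filter_pushdown.rs` the recursion now borrows the plan (`node: &Arc<dyn ExecutionPlan>`) and bumps the refcount only where a new owner is actually needed, while `Arc::ptr_eq` detects whether the rewrite produced a different node. A small self-contained sketch of the same idea, using hypothetical names rather than the real DataFusion types:

```rust
use std::sync::Arc;

// Hypothetical stand-in for an execution plan node.
trait Plan {}

// Borrow the Arc for the traversal; clone only to hand ownership back out.
fn rewrite(node: &Arc<dyn Plan>) -> Arc<dyn Plan> {
    // A real rewrite might return a new node; here we return the same one.
    Arc::clone(node)
}

fn main() {
    struct Leaf;
    impl Plan for Leaf {}

    let plan: Arc<dyn Plan> = Arc::new(Leaf);
    let rewritten = rewrite(&plan);
    // Pointer equality tells us whether the rewrite replaced the node,
    // without requiring the plan type to implement PartialEq.
    assert!(Arc::ptr_eq(&plan, &rewritten));
}
```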
3 changes: 3 additions & 0 deletions datafusion/physical-optimizer/src/lib.rs
@@ -23,6 +23,9 @@
 // Make sure fast / cheap clones on Arc are explicit:
 // https://github.com/apache/datafusion/issues/11143
 #![deny(clippy::clone_on_ref_ptr)]
+// https://github.com/apache/datafusion/issues/18503
+#![deny(clippy::needless_pass_by_value)]
+#![cfg_attr(test, allow(clippy::needless_pass_by_value))]
 
 pub mod aggregate_statistics;
 pub mod coalesce_async_exec_input;
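The two new crate-level attributes make the lint a hard error everywhere except tests, where pass-by-value is often deliberate. A sketch of how they behave in a standalone crate root (the `good` function and the commented-out `bad` one are illustrative only):

```rust
// Crate-level lint configuration, mirroring the diff above: deny the lint
// for the whole crate, but allow it when compiling with `--cfg test`.
#![deny(clippy::needless_pass_by_value)]
#![cfg_attr(test, allow(clippy::needless_pass_by_value))]

// Under `cargo clippy`, this would now fail to compile outside of tests:
// fn bad(v: Vec<u8>) -> usize { v.len() }

// The borrowed form passes the lint.
fn good(v: &[u8]) -> usize {
    v.len()
}

fn main() {
    assert_eq!(good(&[1, 2, 3]), 3);
}
```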
4 changes: 2 additions & 2 deletions datafusion/physical-optimizer/src/projection_pushdown.rs
@@ -129,7 +129,7 @@ fn try_push_down_join_filter(
 
     let join_filter = minimize_join_filter(
         Arc::clone(rhs_rewrite.data.1.expression()),
-        rhs_rewrite.data.1.column_indices().to_vec(),
+        rhs_rewrite.data.1.column_indices(),
         lhs_rewrite.data.0.schema().as_ref(),
         rhs_rewrite.data.0.schema().as_ref(),
     );
@@ -238,7 +238,7 @@ fn try_push_down_projection(
 /// columns are not needed anymore.
 fn minimize_join_filter(
     expr: Arc<dyn PhysicalExpr>,
-    old_column_indices: Vec<ColumnIndex>,
+    old_column_indices: &[ColumnIndex],
     lhs_schema: &Schema,
     rhs_schema: &Schema,
 ) -> JoinFilter {
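Here the caller drops its `.to_vec()` because `minimize_join_filter` now accepts `&[ColumnIndex]`; a callee that needs an owned result can still clone, but only the entries it actually keeps. A sketch with a hypothetical `ColumnIndex` stand-in:

```rust
// Simplified stand-in for DataFusion's ColumnIndex, for illustration only.
#[derive(Clone, Debug, PartialEq)]
struct ColumnIndex {
    index: usize,
}

// Borrowing the slice avoids an up-front copy; we clone only retained entries.
fn keep_used(old_column_indices: &[ColumnIndex], used: &[usize]) -> Vec<ColumnIndex> {
    old_column_indices
        .iter()
        .filter(|ci| used.contains(&ci.index))
        .cloned()
        .collect()
}

fn main() {
    let cols = vec![ColumnIndex { index: 0 }, ColumnIndex { index: 2 }];
    let kept = keep_used(&cols, &[2]);
    assert_eq!(kept, vec![ColumnIndex { index: 2 }]);
    // `cols` remains available to the caller, no `.to_vec()` required.
    assert_eq!(cols.len(), 2);
}
```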