Merged
71 changes: 0 additions & 71 deletions datafusion/physical-optimizer/src/coalesce_async_exec_input.rs

This file was deleted.

22 changes: 19 additions & 3 deletions datafusion/physical-optimizer/src/coalesce_batches.rs
@@ -22,12 +22,12 @@ use crate::PhysicalOptimizerRule;

 use std::sync::Arc;

-use datafusion_common::config::ConfigOptions;
 use datafusion_common::error::Result;
+use datafusion_common::{config::ConfigOptions, internal_err};
 use datafusion_physical_expr::Partitioning;
 use datafusion_physical_plan::{
-    coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
-    repartition::RepartitionExec, ExecutionPlan,
+    async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
+    joins::HashJoinExec, repartition::RepartitionExec, ExecutionPlan,
 };

 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -72,11 +72,27 @@ impl PhysicalOptimizerRule for CoalesceBatches {
                     )
                 })
                 .unwrap_or(false);

         if wrap_in_coalesce {
             Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
                 plan,
                 target_batch_size,
             ))))
+        } else if let Some(async_exec) = plan_any.downcast_ref::<AsyncFuncExec>() {
Contributor:
Could we add it to the expression for wrap_in_coalesce above? E.g.

|| plan_any.downcast_ref::<AsyncFuncExec>().map(|f| ... etc.
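For reference, a rough sketch of what that folded-in check might look like (hypothetical; the existing arms are reconstructed from the context lines visible in this hunk, so the exact RepartitionExec body may differ):

let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
    || plan_any.downcast_ref::<HashJoinExec>().is_some()
    || plan_any
        .downcast_ref::<RepartitionExec>()
        .map(|repart_exec| {
            !matches!(
                repart_exec.partitioning().clone(),
                Partitioning::RoundRobinBatch(_)
            )
        })
        .unwrap_or(false)
    // assumed addition: treat AsyncFuncExec like the other wrapped operators
    || plan_any.downcast_ref::<AsyncFuncExec>().is_some();

Note that this form would wrap the AsyncFuncExec node itself in a CoalesceBatchesExec, which is exactly the difference discussed in the replies below.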

Contributor Author:

I considered adding it at first, but I realized that for AsyncFuncExec the CoalesceBatchesExec is wrapped around the first child (its input), whereas for the other operators it's wrapped around the entire plan node.
I'm not sure whether changing the order inside AsyncFuncExec to match the others would have any impact, so I kept it as is for now.
I'll look into it further, but if anyone already has context on why it's done this way, I'd appreciate any insights.
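For illustration, the difference in plan shape looks roughly like this (hypothetical sketches in the explain-style notation used elsewhere in this PR; the target_batch_size value is an example):

Wrapping the node itself (FilterExec, HashJoinExec, RepartitionExec):

CoalesceBatchesExec: target_batch_size=8192
  FilterExec: ...
    DataSourceExec: ...

Wrapping the node's input (AsyncFuncExec):

AsyncFuncExec: ...
  CoalesceBatchesExec: target_batch_size=8192
    DataSourceExec: ...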

Contributor:

I believe the idea is that for async functions we are specifically interested in batching together the inputs to the function, so that ideally it is not called as often (each call can be expensive for an async function). Coalesce batches in general, by contrast, looks at the output of a node and, if it may be too small, wraps that node in a coalesce.

So for async we apply the coalesce logic to the input of the async node; for other node types we apply it to their output.

We should copy the comment from coalesce_async_exec_input.rs here so we don't lose that context, e.g.

Coalesce inputs to async functions to reduce number of async function invocations
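As a conceptual sketch only (not the actual AsyncFuncExec implementation; invoke_async, input, and output are placeholder names): if the async UDF is awaited once per incoming RecordBatch, coalescing many small batches into fewer target-size batches directly reduces the number of awaits:

// Placeholder sketch: one async invocation per input batch, so fewer,
// larger batches mean fewer (potentially expensive) async calls.
while let Some(batch) = input.next().await {
    let result = async_udf.invoke_async(&batch?).await?;
    output.push(result);
}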

Contributor Author:
Thanks for the quick reply. Just added the comment.

+            // Coalesce inputs to async functions to reduce number of async function invocations
+            let children = async_exec.children();
+            if children.len() != 1 {
+                return internal_err!(
+                    "Expected AsyncFuncExec to have exactly one child"
+                );
+            }
+
+            let coalesce_exec = Arc::new(CoalesceBatchesExec::new(
+                Arc::clone(children[0]),
+                target_batch_size,
+            ));
+            let new_plan = plan.with_new_children(vec![coalesce_exec])?;
+            Ok(Transformed::yes(new_plan))
         } else {
             Ok(Transformed::no(plan))
         }
1 change: 0 additions & 1 deletion datafusion/physical-optimizer/src/lib.rs
@@ -25,7 +25,6 @@
 #![deny(clippy::clone_on_ref_ptr)]

 pub mod aggregate_statistics;
-pub mod coalesce_async_exec_input;
 pub mod coalesce_batches;
 pub mod combine_partial_final_agg;
 pub mod enforce_distribution;
2 changes: 0 additions & 2 deletions datafusion/physical-optimizer/src/optimizer.rs
@@ -36,7 +36,6 @@ use crate::sanity_checker::SanityCheckPlan;
 use crate::topk_aggregation::TopKAggregation;
 use crate::update_aggr_exprs::OptimizeAggregateOrder;

-use crate::coalesce_async_exec_input::CoalesceAsyncExecInput;
 use crate::limit_pushdown_past_window::LimitPushPastWindows;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::Result;
@@ -123,7 +122,6 @@ impl PhysicalOptimizer {
             // The CoalesceBatches rule will not influence the distribution and ordering of the
             // whole plan tree. Therefore, to avoid influencing other rules, it should run last.
             Arc::new(CoalesceBatches::new()),
-            Arc::new(CoalesceAsyncExecInput::new()),
             // Remove the ancillary output requirement operator since we are done with the planning
             // phase.
             Arc::new(OutputRequirements::new_remove_mode()),
4 changes: 0 additions & 4 deletions datafusion/sqllogictest/test_files/explain.slt
@@ -238,7 +238,6 @@ physical_plan after EnforceSorting SAME TEXT AS ABOVE
 physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
-physical_plan after coalesce_async_exec_input SAME TEXT AS ABOVE
 physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
@@ -317,7 +316,6 @@ physical_plan after EnforceSorting SAME TEXT AS ABOVE
 physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
-physical_plan after coalesce_async_exec_input SAME TEXT AS ABOVE
 physical_plan after OutputRequirements
 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
@@ -362,7 +360,6 @@ physical_plan after EnforceSorting SAME TEXT AS ABOVE
 physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
-physical_plan after coalesce_async_exec_input SAME TEXT AS ABOVE
 physical_plan after OutputRequirements
 01)GlobalLimitExec: skip=0, fetch=10
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
@@ -602,7 +599,6 @@ physical_plan after EnforceSorting SAME TEXT AS ABOVE
 physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
-physical_plan after coalesce_async_exec_input SAME TEXT AS ABOVE
 physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE