refactor: merge CoalesceAsyncExecInput into CoalesceBatches #18540
Conversation
…ty into CoalesceBatches
```rust
            plan,
            target_batch_size,
        ))))
    } else if let Some(async_exec) = plan_any.downcast_ref::<AsyncFuncExec>() {
```
Could we add it to the expression for `wrap_in_coalesce` before? E.g.

```rust
|| plan_any.downcast_ref::<AsyncFuncExec>().map(|f| ...) // etc.
```
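As a minimal sketch of what this suggestion would look like, the check can be folded into a single boolean expression. This is a toy model, not DataFusion's real code: `AsyncFuncExec` and `FilterExec` here are empty stand-in structs, and `other_conditions` is a placeholder for whatever the existing `wrap_in_coalesce` expression already checks.

```rust
use std::any::Any;

// Hypothetical stand-ins for the real operator types, just so the
// downcast compiles in isolation.
struct AsyncFuncExec;
struct FilterExec;

// Fold the AsyncFuncExec check into the existing boolean condition
// instead of keeping a separate else-if branch.
fn wants_coalesce(plan_any: &dyn Any, other_conditions: bool) -> bool {
    other_conditions || plan_any.downcast_ref::<AsyncFuncExec>().is_some()
}

fn main() {
    // An async node qualifies even when the other conditions don't hold.
    assert!(wants_coalesce(&AsyncFuncExec, false));
    // A non-async node falls back to the other conditions alone.
    assert!(!wants_coalesce(&FilterExec, false));
    println!("ok");
}
```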
I considered adding it at first, but I realized that inside AsyncFuncExec, the CoalesceBatchesExec is wrapped around the first child, whereas for the other operators it's wrapped around the entire plan.
I'm not sure whether changing the order inside AsyncFuncExec to match the others would have any impact, so I kept it as is for now.
I'll look into it further, but if anyone already has context on why it’s done this way, I’d appreciate any insights.
I believe the idea is that for async functions, we are specifically interested in batching together inputs to the function so that ideally it is not called as often (calls can be expensive for async functions). Whereas coalesce batches in general considers the output of a node and, if it is too small, wraps that node in a coalesce.
So for async, we consider the input to the async node for coalesce logic; for other node types we look at their output for coalesce logic.
We should copy the comment from `coalesce_async_exec_input.rs` here to not lose that context, e.g.

> Coalesce inputs to async functions to reduce number of async function invocations
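The input-vs-output distinction described above can be illustrated with a toy plan tree. This is a simplified model under stated assumptions, not DataFusion's actual plan representation: `Plan` is an invented enum, and `apply_coalesce` only mimics where the coalesce wrapper lands in each case.

```rust
// Toy plan tree (not DataFusion's real types) illustrating where the
// coalesce wrapper is placed for async vs. other operators.
#[derive(Debug, PartialEq)]
enum Plan {
    Scan,
    AsyncFunc(Box<Plan>),
    Filter(Box<Plan>),
    Coalesce(Box<Plan>),
}

fn apply_coalesce(plan: Plan) -> Plan {
    match plan {
        // Async functions: coalesce the *input*, so the (expensive)
        // async UDF is invoked on fewer, larger batches.
        Plan::AsyncFunc(child) => {
            Plan::AsyncFunc(Box::new(Plan::Coalesce(child)))
        }
        // Other operators: wrap the node itself, so its *output*
        // batches are coalesced.
        other => Plan::Coalesce(Box::new(other)),
    }
}

fn main() {
    // Coalesce ends up below the async node (wrapping its input)...
    let async_plan = apply_coalesce(Plan::AsyncFunc(Box::new(Plan::Scan)));
    assert_eq!(
        async_plan,
        Plan::AsyncFunc(Box::new(Plan::Coalesce(Box::new(Plan::Scan))))
    );

    // ...but above any other node (wrapping its output).
    let filter_plan = apply_coalesce(Plan::Filter(Box::new(Plan::Scan)));
    assert_eq!(
        filter_plan,
        Plan::Coalesce(Box::new(Plan::Filter(Box::new(Plan::Scan))))
    );
    println!("ok");
}
```

This matches the plan shown in the existing `async_udf.slt` test below, where `CoalesceBatchesExec` sits under `AsyncFuncExec` rather than above it.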
Thanks for the quick reply. Just added the comment.
Jefffrey left a comment:
Seems fine to me, apart from a minor comment about some documentation.
We have existing tests for this in `datafusion/datafusion/sqllogictest/test_files/async_udf.slt`, lines 28 to 41 in `f162fd3`:
```
query TT
explain select min(async_abs(x)) from data;
----
logical_plan
01)Aggregate: groupBy=[[]], aggr=[[min(async_abs(data.x))]]
02)--TableScan: data projection=[x]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[min(async_abs(data.x))]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[min(async_abs(data.x))]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=async_abs(x@0))]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------DataSourceExec: partitions=1, partition_sizes=[1]
```
Since they pass, it looks like this refactor works.
Which issue does this PR close?
Merge `CoalesceAsyncExecInput` physical optimizer rule into `CoalesceBatches` #18155.

Rationale for this change
What changes are included in this PR?
Merges the functionality of `CoalesceAsyncExecInput` into `CoalesceBatches` to remove redundant optimizer logic and simplify batch coalescing behavior.

Are these changes tested?
Behavior is covered by existing `CoalesceBatches` and optimizer tests.
Are there any user-facing changes?
No