Skip to content

Commit be684c4

Browse files
gene-bordegarayalamb
authored andcommitted
fix: Eliminate consecutive repartitions (apache#18521)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes apache#18341. - Closes apache#9370 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> Cases where two RepartitionExec operators appear consecutively in the plan. This is unneeded overhead that eliminating provides speed ups. Full Report: [The Physical Optimizer and Fixing Consecutive Repartitions In the Enforce Distribution Rule.pdf](https://github.com/user-attachments/files/23420831/The.Physical.Optimizer.and.Fixing.Consecutive.Repartitions.In.the.Enforce.Distribution.Rule.pdf) Issue Report: [Fixing Consecutive Repartitions In the Enforce Distribution Rule.pdf](https://github.com/user-attachments/files/23420880/Fixing.Consecutive.Repartitions.In.the.Enforce.Distribution.Rule.pdf) ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> Change to repartition adding logic in `enforce_distribution.rs` A ton of test and bench updates to mirror new behavior ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes benchmarked and tested, check report for benchmarks ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent ad48005 commit be684c4

37 files changed

+662
-837
lines changed

datafusion/core/tests/dataframe/mod.rs

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2864,10 +2864,9 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
28642864
| | ProjectionExec: expr=[b@0 as b, count(Int64(1))@1 as count(*), count(Int64(1))@1 as count(Int64(1))] |
28652865
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(Int64(1))] |
28662866
| | CoalesceBatchesExec: target_batch_size=8192 |
2867-
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4 |
2868-
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
2869-
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(Int64(1))] |
2870-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
2867+
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=1 |
2868+
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(Int64(1))] |
2869+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
28712870
| | |
28722871
+---------------+------------------------------------------------------------------------------------------------------------+
28732872
"###
@@ -2876,22 +2875,21 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
28762875
assert_snapshot!(
28772876
pretty_format_batches(&df_results).unwrap(),
28782877
@r###"
2879-
+---------------+--------------------------------------------------------------------------------+
2880-
| plan_type | plan |
2881-
+---------------+--------------------------------------------------------------------------------+
2882-
| logical_plan | Sort: count(*) ASC NULLS LAST |
2883-
| | Aggregate: groupBy=[[t1.b]], aggr=[[count(Int64(1)) AS count(*)]] |
2884-
| | TableScan: t1 projection=[b] |
2885-
| physical_plan | SortPreservingMergeExec: [count(*)@1 ASC NULLS LAST] |
2886-
| | SortExec: expr=[count(*)@1 ASC NULLS LAST], preserve_partitioning=[true] |
2887-
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(*)] |
2888-
| | CoalesceBatchesExec: target_batch_size=8192 |
2889-
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4 |
2890-
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
2891-
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(*)] |
2892-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
2893-
| | |
2894-
+---------------+--------------------------------------------------------------------------------+
2878+
+---------------+----------------------------------------------------------------------------+
2879+
| plan_type | plan |
2880+
+---------------+----------------------------------------------------------------------------+
2881+
| logical_plan | Sort: count(*) ASC NULLS LAST |
2882+
| | Aggregate: groupBy=[[t1.b]], aggr=[[count(Int64(1)) AS count(*)]] |
2883+
| | TableScan: t1 projection=[b] |
2884+
| physical_plan | SortPreservingMergeExec: [count(*)@1 ASC NULLS LAST] |
2885+
| | SortExec: expr=[count(*)@1 ASC NULLS LAST], preserve_partitioning=[true] |
2886+
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(*)] |
2887+
| | CoalesceBatchesExec: target_batch_size=8192 |
2888+
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=1 |
2889+
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(*)] |
2890+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
2891+
| | |
2892+
+---------------+----------------------------------------------------------------------------+
28952893
"###
28962894
);
28972895
Ok(())
@@ -3351,10 +3349,9 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
33513349
| | ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] |
33523350
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] |
33533351
| | CoalesceBatchesExec: target_batch_size=8192 |
3354-
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 |
3355-
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
3356-
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] |
3357-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
3352+
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
3353+
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] |
3354+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
33583355
| | |
33593356
+---------------+---------------------------------------------------------------------------------------------------------------------------+
33603357
"
@@ -3408,10 +3405,9 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
34083405
| | ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true] |
34093406
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] |
34103407
| | CoalesceBatchesExec: target_batch_size=8192 |
3411-
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 |
3412-
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
3413-
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] |
3414-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
3408+
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
3409+
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] |
3410+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
34153411
| | |
34163412
+---------------+---------------------------------------------------------------------------------------------------------------------------+
34173413
"

0 commit comments

Comments
 (0)