[SPARK-33038][SQL] Combine AQE initial and current plan string when two plans are the same

### What changes were proposed in this pull request?
When the current plan and the initial plan are the same, this PR prints them once in the AQE query plan string and drops the `== Current Plan ==` and `== Initial Plan ==` headers:

Before
```scala
AdaptiveSparkPlan isFinalPlan=false
+- == Current Plan ==
   SortMergeJoin [key#13], [a#23], Inner
   :- Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5), true, [id=#94]
            ...
+- == Initial Plan ==
   SortMergeJoin [key#13], [a#23], Inner
   :- Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5), true, [id=#94]
            ...
```
After
```scala
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [key#13], [a#23], Inner
   :- Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5), true, [id=#94]
            ...
```
For SQL `EXPLAIN` output:
Before
```scala
AdaptiveSparkPlan (8)
+- == Current Plan ==
   Sort (7)
   +- Exchange (6)
      ...
+- == Initial Plan ==
   Sort (7)
   +- Exchange (6)
      ...
```
After
```scala
AdaptiveSparkPlan (8)
+- Sort (7)
   +- Exchange (6)
      ...
```
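The branching this PR introduces can be sketched with a toy plan tree. The `Plan` class and `explainString` helper below are hypothetical stand-ins, not Spark's actual API; the real change lives in `AdaptiveSparkPlanExec.generateTreeString`:

```scala
// Toy sketch of the new behavior: when the current plan is structurally equal
// to the initial plan, render a single combined subtree with no
// "== Current Plan ==" / "== Initial Plan ==" headers.
case class Plan(name: String, children: Seq[Plan] = Nil) {
  // Stand-in for Spark's TreeNode.fastEquals comparison.
  def fastEquals(other: Plan): Boolean = this == other
  def treeString(prefix: String = "+- "): String =
    (Seq(prefix + name) ++ children.map(_.treeString("   " + prefix))).mkString("\n")
}

def explainString(current: Plan, initial: Plan, isFinalPlan: Boolean): String = {
  val header = s"AdaptiveSparkPlan isFinalPlan=$isFinalPlan"
  if (current.fastEquals(initial)) {
    header + "\n" + current.treeString() // combined: printed once, no headers
  } else {
    val label = if (isFinalPlan) "Final Plan" else "Current Plan"
    header + s"\n+- == $label ==\n" + current.treeString("   ") +
      "\n+- == Initial Plan ==\n" + initial.treeString("   ")
  }
}

val plan = Plan("SortMergeJoin", Seq(Plan("Sort", Seq(Plan("Exchange")))))
println(explainString(plan, plan, isFinalPlan = false))
// AdaptiveSparkPlan isFinalPlan=false
// +- SortMergeJoin
//    +- Sort
//       +- Exchange
```

When the two plans diverge, the sketch falls back to the two-section format with headers, mirroring the `else` branch of the real change.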

### Why are the changes needed?
To simplify the AQE plan string by removing the redundant plan information.

### Does this PR introduce _any_ user-facing change?
Yes, the AQE plan string and SQL `EXPLAIN` output no longer repeat the plan when the current plan and the initial plan are identical.

### How was this patch tested?
Modified the existing unit test.

Closes apache#29915 from allisonwang-db/aqe-explain.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
allisonwang-db authored and gatorsmile committed Oct 5, 2020
1 parent a09747b commit 14aeab3
Showing 3 changed files with 47 additions and 130 deletions.
@@ -300,26 +300,40 @@ case class AdaptiveSparkPlanExec(
       maxFields,
       printNodeId,
       indent)
-    generateTreeStringWithHeader(
-      if (isFinalPlan) "Final Plan" else "Current Plan",
-      currentPhysicalPlan,
-      depth,
-      lastChildren,
-      append,
-      verbose,
-      maxFields,
-      printNodeId)
-    generateTreeStringWithHeader(
-      "Initial Plan",
-      initialPlan,
-      depth,
-      lastChildren,
-      append,
-      verbose,
-      maxFields,
-      printNodeId)
+    if (currentPhysicalPlan.fastEquals(initialPlan)) {
+      currentPhysicalPlan.generateTreeString(
+        depth + 1,
+        lastChildren :+ true,
+        append,
+        verbose,
+        prefix = "",
+        addSuffix = false,
+        maxFields,
+        printNodeId,
+        indent)
+    } else {
+      generateTreeStringWithHeader(
+        if (isFinalPlan) "Final Plan" else "Current Plan",
+        currentPhysicalPlan,
+        depth,
+        lastChildren,
+        append,
+        verbose,
+        maxFields,
+        printNodeId)
+      generateTreeStringWithHeader(
+        "Initial Plan",
+        initialPlan,
+        depth,
+        lastChildren,
+        append,
+        verbose,
+        maxFields,
+        printNodeId)
+    }
   }
 
   private def generateTreeStringWithHeader(
       header: String,
       plan: SparkPlan,
123 changes: 13 additions & 110 deletions sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -54,16 +54,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
AdaptiveSparkPlan (8)
+- == Current Plan ==
Sort (7)
+- Exchange (6)
+- HashAggregate (5)
+- Exchange (4)
+- HashAggregate (3)
+- Filter (2)
+- Scan parquet default.explain_temp1 (1)
+- == Initial Plan ==
Sort (7)
+- Sort (7)
+- Exchange (6)
+- HashAggregate (5)
+- Exchange (4)
@@ -126,16 +117,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
AdaptiveSparkPlan (8)
+- == Current Plan ==
Project (7)
+- Filter (6)
+- HashAggregate (5)
+- Exchange (4)
+- HashAggregate (3)
+- Filter (2)
+- Scan parquet default.explain_temp1 (1)
+- == Initial Plan ==
Project (7)
+- Project (7)
+- Filter (6)
+- HashAggregate (5)
+- Exchange (4)
@@ -196,17 +178,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
AdaptiveSparkPlan (9)
+- == Current Plan ==
HashAggregate (8)
+- Exchange (7)
+- HashAggregate (6)
+- Union (5)
:- Filter (2)
: +- Scan parquet default.explain_temp1 (1)
+- Filter (4)
+- Scan parquet default.explain_temp1 (3)
+- == Initial Plan ==
HashAggregate (8)
+- HashAggregate (8)
+- Exchange (7)
+- HashAggregate (6)
+- Union (5)
@@ -274,15 +246,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
AdaptiveSparkPlan (7)
+- == Current Plan ==
BroadcastHashJoin Inner BuildRight (6)
:- Filter (2)
: +- Scan parquet default.explain_temp1 (1)
+- BroadcastExchange (5)
+- Filter (4)
+- Scan parquet default.explain_temp2 (3)
+- == Initial Plan ==
BroadcastHashJoin Inner BuildRight (6)
+- BroadcastHashJoin Inner BuildRight (6)
:- Filter (2)
: +- Scan parquet default.explain_temp1 (1)
+- BroadcastExchange (5)
@@ -337,14 +301,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
AdaptiveSparkPlan (6)
+- == Current Plan ==
BroadcastHashJoin LeftOuter BuildRight (5)
:- Scan parquet default.explain_temp1 (1)
+- BroadcastExchange (4)
+- Filter (3)
+- Scan parquet default.explain_temp2 (2)
+- == Initial Plan ==
BroadcastHashJoin LeftOuter BuildRight (5)
+- BroadcastHashJoin LeftOuter BuildRight (5)
:- Scan parquet default.explain_temp1 (1)
+- BroadcastExchange (4)
+- Filter (3)
@@ -398,11 +355,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
AdaptiveSparkPlan (3)
+- == Current Plan ==
Filter (2)
+- Scan parquet default.explain_temp1 (1)
+- == Initial Plan ==
Filter (2)
+- Filter (2)
+- Scan parquet default.explain_temp1 (1)


@@ -438,11 +391,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
AdaptiveSparkPlan (3)
+- == Current Plan ==
Filter (2)
+- Scan parquet default.explain_temp1 (1)
+- == Initial Plan ==
Filter (2)
+- Filter (2)
+- Scan parquet default.explain_temp1 (1)


@@ -470,11 +419,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
AdaptiveSparkPlan (3)
+- == Current Plan ==
Project (2)
+- Scan parquet default.explain_temp1 (1)
+- == Initial Plan ==
Project (2)
+- Project (2)
+- Scan parquet default.explain_temp1 (1)


@@ -506,15 +451,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
AdaptiveSparkPlan (7)
+- == Current Plan ==
BroadcastHashJoin Inner BuildRight (6)
:- Filter (2)
: +- Scan parquet default.explain_temp1 (1)
+- BroadcastExchange (5)
+- Filter (4)
+- Scan parquet default.explain_temp1 (3)
+- == Initial Plan ==
BroadcastHashJoin Inner BuildRight (6)
+- BroadcastHashJoin Inner BuildRight (6)
:- Filter (2)
: +- Scan parquet default.explain_temp1 (1)
+- BroadcastExchange (5)
@@ -572,21 +509,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
AdaptiveSparkPlan (13)
+- == Current Plan ==
BroadcastHashJoin Inner BuildRight (12)
:- HashAggregate (5)
: +- Exchange (4)
: +- HashAggregate (3)
: +- Filter (2)
: +- Scan parquet default.explain_temp1 (1)
+- BroadcastExchange (11)
+- HashAggregate (10)
+- Exchange (9)
+- HashAggregate (8)
+- Filter (7)
+- Scan parquet default.explain_temp1 (6)
+- == Initial Plan ==
BroadcastHashJoin Inner BuildRight (12)
+- BroadcastHashJoin Inner BuildRight (12)
:- HashAggregate (5)
: +- Exchange (4)
: +- HashAggregate (3)
@@ -710,13 +633,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
AdaptiveSparkPlan (5)
+- == Current Plan ==
HashAggregate (4)
+- Exchange (3)
+- HashAggregate (2)
+- Scan parquet default.explain_temp1 (1)
+- == Initial Plan ==
HashAggregate (4)
+- HashAggregate (4)
+- Exchange (3)
+- HashAggregate (2)
+- Scan parquet default.explain_temp1 (1)
@@ -761,13 +678,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
AdaptiveSparkPlan (5)
+- == Current Plan ==
ObjectHashAggregate (4)
+- Exchange (3)
+- ObjectHashAggregate (2)
+- Scan parquet default.explain_temp4 (1)
+- == Initial Plan ==
ObjectHashAggregate (4)
+- ObjectHashAggregate (4)
+- Exchange (3)
+- ObjectHashAggregate (2)
+- Scan parquet default.explain_temp4 (1)
@@ -812,15 +723,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
AdaptiveSparkPlan (7)
+- == Current Plan ==
SortAggregate (6)
+- Sort (5)
+- Exchange (4)
+- SortAggregate (3)
+- Sort (2)
+- Scan parquet default.explain_temp4 (1)
+- == Initial Plan ==
SortAggregate (6)
+- SortAggregate (6)
+- Sort (5)
+- Exchange (4)
+- SortAggregate (3)
@@ -842,8 +842,8 @@ class AdaptiveQueryExecSuite
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       val df = sql("SELECT * FROM testData join testData2 ON key = a where value = '1'")
       val planBefore = df.queryExecution.executedPlan
-      assert(planBefore.toString.contains("== Current Plan =="))
-      assert(planBefore.toString.contains("== Initial Plan =="))
+      assert(!planBefore.toString.contains("== Current Plan =="))
+      assert(!planBefore.toString.contains("== Initial Plan =="))
       df.collect()
       val planAfter = df.queryExecution.executedPlan
       assert(planAfter.toString.contains("== Final Plan =="))
