From 14aeab3b279b1c23cddb86b97afc048c195b9b75 Mon Sep 17 00:00:00 2001
From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Date: Mon, 5 Oct 2020 09:30:27 -0700
Subject: [PATCH] [SPARK-33038][SQL] Combine AQE initial and current plan
 string when two plans are the same

### What changes were proposed in this pull request?
This PR combines the current plan and the initial plan in the AQE query plan string when the two plans are the same, and in that case removes the `== Current Plan ==` and `== Initial Plan ==` headers:

Before
```scala
AdaptiveSparkPlan isFinalPlan=false
+- == Current Plan ==
   SortMergeJoin [key#13], [a#23], Inner
   :- Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5), true, [id=#94]
...
+- == Initial Plan ==
   SortMergeJoin [key#13], [a#23], Inner
   :- Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5), true, [id=#94]
...
```

After
```scala
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [key#13], [a#23], Inner
   :- Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5), true, [id=#94]
...
```

For SQL `EXPLAIN` output:

Before
```scala
AdaptiveSparkPlan (8)
+- == Current Plan ==
   Sort (7)
   +- Exchange (6)
...
+- == Initial Plan ==
   Sort (7)
   +- Exchange (6)
...
```

After
```scala
AdaptiveSparkPlan (8)
+- Sort (7)
   +- Exchange (6)
...
```

### Why are the changes needed?
To simplify the AQE plan string by removing redundant plan information.

### Does this PR introduce _any_ user-facing change?
Yes. When the current plan and the initial plan are identical, the plan string and `EXPLAIN` output now show a single plan tree without the `== Current Plan ==` and `== Initial Plan ==` headers.

### How was this patch tested?
Modified the existing unit test.

Closes #29915 from allisonwang-db/aqe-explain.

Authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Signed-off-by: Xiao Li
---
 .../adaptive/AdaptiveSparkPlanExec.scala      |  50 ++++---
 .../sql-tests/results/explain-aqe.sql.out     | 123 ++----------------
 .../adaptive/AdaptiveQueryExecSuite.scala     |   4 +-
 3 files changed, 47 insertions(+), 130 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 6c197fedd8c56..0e032569bb8a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -300,26 +300,40 @@ case class AdaptiveSparkPlanExec(
       maxFields,
       printNodeId,
       indent)
-    generateTreeStringWithHeader(
-      if (isFinalPlan) "Final Plan" else "Current Plan",
-      currentPhysicalPlan,
-      depth,
-      lastChildren,
-      append,
-      verbose,
-      maxFields,
-      printNodeId)
-    generateTreeStringWithHeader(
-      "Initial Plan",
-      initialPlan,
-      depth,
-      lastChildren,
-      append,
-      verbose,
-      maxFields,
-      printNodeId)
+    if (currentPhysicalPlan.fastEquals(initialPlan)) {
+      currentPhysicalPlan.generateTreeString(
+        depth + 1,
+        lastChildren :+ true,
+        append,
+        verbose,
+        prefix = "",
+        addSuffix = false,
+        maxFields,
+        printNodeId,
+        indent)
+    } else {
+      generateTreeStringWithHeader(
+        if (isFinalPlan) "Final Plan" else "Current Plan",
+        currentPhysicalPlan,
+        depth,
+        lastChildren,
+        append,
+        verbose,
+        maxFields,
+        printNodeId)
+      generateTreeStringWithHeader(
+        "Initial Plan",
+        initialPlan,
+        depth,
+        lastChildren,
+        append,
+        verbose,
+        maxFields,
+        printNodeId)
+    }
   }
+
   private def generateTreeStringWithHeader(
       header: String,
       plan: SparkPlan,
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index 3a850160b43e0..5435cde050fd1 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -54,16 +54,7 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 AdaptiveSparkPlan (8)
-+- == Current Plan ==
-   Sort (7)
-   +- Exchange (6)
-      +- HashAggregate (5)
-         +- Exchange (4)
-            +- HashAggregate (3)
-               +- Filter (2)
-                  +- Scan parquet default.explain_temp1 (1)
-+- == Initial Plan ==
-   Sort (7)
++- Sort (7)
    +- Exchange (6)
       +- HashAggregate (5)
          +- Exchange (4)
@@ -126,16 +117,7 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 AdaptiveSparkPlan (8)
-+- == Current Plan ==
-   Project (7)
-   +- Filter (6)
-      +- HashAggregate (5)
-         +- Exchange (4)
-            +- HashAggregate (3)
-               +- Filter (2)
-                  +- Scan parquet default.explain_temp1 (1)
-+- == Initial Plan ==
-   Project (7)
++- Project (7)
    +- Filter (6)
       +- HashAggregate (5)
          +- Exchange (4)
@@ -196,17 +178,7 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 AdaptiveSparkPlan (9)
-+- == Current Plan ==
-   HashAggregate (8)
-   +- Exchange (7)
-      +- HashAggregate (6)
-         +- Union (5)
-            :- Filter (2)
-            :  +- Scan parquet default.explain_temp1 (1)
-            +- Filter (4)
-               +- Scan parquet default.explain_temp1 (3)
-+- == Initial Plan ==
-   HashAggregate (8)
++- HashAggregate (8)
    +- Exchange (7)
       +- HashAggregate (6)
          +- Union (5)
@@ -274,15 +246,7 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 AdaptiveSparkPlan (7)
-+- == Current Plan ==
-   BroadcastHashJoin Inner BuildRight (6)
-   :- Filter (2)
-   :  +- Scan parquet default.explain_temp1 (1)
-   +- BroadcastExchange (5)
-      +- Filter (4)
-         +- Scan parquet default.explain_temp2 (3)
-+- == Initial Plan ==
-   BroadcastHashJoin Inner BuildRight (6)
++- BroadcastHashJoin Inner BuildRight (6)
    :- Filter (2)
    :  +- Scan parquet default.explain_temp1 (1)
    +- BroadcastExchange (5)
@@ -337,14 +301,7 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 AdaptiveSparkPlan (6)
-+- == Current Plan ==
-   BroadcastHashJoin LeftOuter BuildRight (5)
-   :- Scan parquet default.explain_temp1 (1)
-   +- BroadcastExchange (4)
-      +- Filter (3)
-         +- Scan parquet default.explain_temp2 (2)
-+- == Initial Plan ==
-   BroadcastHashJoin LeftOuter BuildRight (5)
++- BroadcastHashJoin LeftOuter BuildRight (5)
    :- Scan parquet default.explain_temp1 (1)
    +- BroadcastExchange (4)
       +- Filter (3)
@@ -398,11 +355,7 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 AdaptiveSparkPlan (3)
-+- == Current Plan ==
-   Filter (2)
-   +- Scan parquet default.explain_temp1 (1)
-+- == Initial Plan ==
-   Filter (2)
++- Filter (2)
    +- Scan parquet default.explain_temp1 (1)


@@ -438,11 +391,7 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 AdaptiveSparkPlan (3)
-+- == Current Plan ==
-   Filter (2)
-   +- Scan parquet default.explain_temp1 (1)
-+- == Initial Plan ==
-   Filter (2)
++- Filter (2)
    +- Scan parquet default.explain_temp1 (1)


@@ -470,11 +419,7 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 AdaptiveSparkPlan (3)
-+- == Current Plan ==
-   Project (2)
-   +- Scan parquet default.explain_temp1 (1)
-+- == Initial Plan ==
-   Project (2)
++- Project (2)
    +- Scan parquet default.explain_temp1 (1)


@@ -506,15 +451,7 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 AdaptiveSparkPlan (7)
-+- == Current Plan ==
-   BroadcastHashJoin Inner BuildRight (6)
-   :- Filter (2)
-   :  +- Scan parquet default.explain_temp1 (1)
-   +- BroadcastExchange (5)
-      +- Filter (4)
-         +- Scan parquet default.explain_temp1 (3)
-+- == Initial Plan ==
-   BroadcastHashJoin Inner BuildRight (6)
++- BroadcastHashJoin Inner BuildRight (6)
    :- Filter (2)
    :  +- Scan parquet default.explain_temp1 (1)
    +- BroadcastExchange (5)
@@ -572,21 +509,7 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 AdaptiveSparkPlan (13)
-+- == Current Plan ==
-   BroadcastHashJoin Inner BuildRight (12)
-   :- HashAggregate (5)
-   :  +- Exchange (4)
-   :     +- HashAggregate (3)
-   :        +- Filter (2)
-   :           +- Scan parquet default.explain_temp1 (1)
-   +- BroadcastExchange (11)
-      +- HashAggregate (10)
-         +- Exchange (9)
-            +- HashAggregate (8)
-               +- Filter (7)
-                  +- Scan parquet default.explain_temp1 (6)
-+- == Initial Plan ==
-   BroadcastHashJoin Inner BuildRight (12)
++- BroadcastHashJoin Inner BuildRight (12)
    :- HashAggregate (5)
    :  +- Exchange (4)
    :     +- HashAggregate (3)
@@ -710,13 +633,7 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 AdaptiveSparkPlan (5)
-+- == Current Plan ==
-   HashAggregate (4)
-   +- Exchange (3)
-      +- HashAggregate (2)
-         +- Scan parquet default.explain_temp1 (1)
-+- == Initial Plan ==
-   HashAggregate (4)
++- HashAggregate (4)
    +- Exchange (3)
       +- HashAggregate (2)
          +- Scan parquet default.explain_temp1 (1)
@@ -761,13 +678,7 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 AdaptiveSparkPlan (5)
-+- == Current Plan ==
-   ObjectHashAggregate (4)
-   +- Exchange (3)
-      +- ObjectHashAggregate (2)
-         +- Scan parquet default.explain_temp4 (1)
-+- == Initial Plan ==
-   ObjectHashAggregate (4)
++- ObjectHashAggregate (4)
    +- Exchange (3)
       +- ObjectHashAggregate (2)
          +- Scan parquet default.explain_temp4 (1)
@@ -812,15 +723,7 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 AdaptiveSparkPlan (7)
-+- == Current Plan ==
-   SortAggregate (6)
-   +- Sort (5)
-      +- Exchange (4)
-         +- SortAggregate (3)
-            +- Sort (2)
-               +- Scan parquet default.explain_temp4 (1)
-+- == Initial Plan ==
-   SortAggregate (6)
++- SortAggregate (6)
    +- Sort (5)
       +- Exchange (4)
          +- SortAggregate (3)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 8799dbb14ef34..0dfb1d2fd9eda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -842,8 +842,8 @@ class AdaptiveQueryExecSuite
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       val df = sql("SELECT * FROM testData join testData2 ON key = a where value = '1'")
       val planBefore = df.queryExecution.executedPlan
-      assert(planBefore.toString.contains("== Current Plan =="))
-      assert(planBefore.toString.contains("== Initial Plan =="))
+      assert(!planBefore.toString.contains("== Current Plan =="))
+      assert(!planBefore.toString.contains("== Initial Plan =="))
       df.collect()
       val planAfter = df.queryExecution.executedPlan
       assert(planAfter.toString.contains("== Final Plan =="))
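Note for reviewers skimming the diff: the new branch in `AdaptiveSparkPlanExec.generateTreeString` prints a single subtree when `currentPhysicalPlan.fastEquals(initialPlan)` holds, and falls back to the two header-labelled sections otherwise. Below is a minimal, self-contained Scala sketch of that printing rule. `PlanNode`, `render`, and `explain` are hypothetical stand-ins for illustration, not Spark's `SparkPlan`/`TreeNode` API, and plain case-class `==` stands in for `fastEquals` (which short-circuits on reference equality before comparing structurally).

```scala
// Hypothetical stand-in for a physical plan node; not Spark's SparkPlan.
case class PlanNode(name: String, children: Seq[PlanNode] = Nil)

object AqeExplainSketch {
  // Render a tree with "+-" connectors, loosely imitating EXPLAIN FORMATTED.
  private def render(node: PlanNode, indent: String = ""): String =
    (s"$indent+- ${node.name}" +:
      node.children.map(render(_, indent + "   "))).mkString("\n")

  // The rule this patch introduces: if the current plan equals the initial
  // plan, print one tree with no headers; otherwise print both trees under
  // their own headers, choosing "Final Plan" vs "Current Plan" by isFinal.
  def explain(current: PlanNode, initial: PlanNode, isFinal: Boolean): String =
    if (current == initial) {
      render(current)
    } else {
      val header = if (isFinal) "Final Plan" else "Current Plan"
      s"+- == $header ==\n${render(current, "   ")}\n" +
        s"+- == Initial Plan ==\n${render(initial, "   ")}"
    }

  def main(args: Array[String]): Unit = {
    val plan = PlanNode("Filter (2)",
      Seq(PlanNode("Scan parquet default.explain_temp1 (1)")))
    println("AdaptiveSparkPlan (3)")
    // Identical plans print once, matching the "After" output above.
    println(explain(plan, plan, isFinal = false))
  }
}
```

Running `main` prints the combined form shown in the "After" examples; passing a structurally different `initial` reproduces the two-header form that the updated test now asserts is absent before execution.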