[SPARK-26138][SQL] Pushdown limit through InnerLike when condition is empty

### What changes were proposed in this pull request?

This PR pushes the limit down through InnerLike joins when the join condition is empty (original PR: apache#23104). For example:
```sql
CREATE TABLE t1 using parquet AS SELECT id AS a, id AS b FROM range(2);
CREATE TABLE t2 using parquet AS SELECT id AS d FROM range(2);
SELECT * FROM t1 CROSS JOIN t2 LIMIT 10;
```
Before this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CollectLimit 10
   +- BroadcastNestedLoopJoin BuildRight, Cross
      :- FileScan parquet default.t1[a#5L,b#6L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/tg/f5mz46090wg7swzgdc69f8q03965_0/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
      +- BroadcastExchange IdentityBroadcastMode, [id=#43]
         +- FileScan parquet default.t2[d#7L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/tg/f5mz46090wg7swzgdc69f8q03965_0/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<d:bigint>
```
After this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CollectLimit 10
   +- BroadcastNestedLoopJoin BuildRight, Cross
      :- LocalLimit 10
      :  +- FileScan parquet default.t1[a#5L,b#6L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/tg/f5mz46090wg7swzgdc69f8q03965_0/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
      +- BroadcastExchange IdentityBroadcastMode, [id=#51]
         +- LocalLimit 10
            +- FileScan parquet default.t2[d#7L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/tg/f5mz46090wg7swzgdc69f8q03965_0/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<d:bigint>
```
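The pushed `LocalLimit`s are safe because a condition-free INNER or CROSS JOIN needs at most `limit` rows from each input to produce `limit` output rows. Conceptually, the optimized plan behaves like the following hand-written rewrite (an illustrative equivalent, not literal optimizer output; the subquery aliases are made up):
```sql
SELECT * FROM
  (SELECT * FROM t1 LIMIT 10) limited_t1
  CROSS JOIN
  (SELECT * FROM t2 LIMIT 10) limited_t2
LIMIT 10;
```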

### Why are the changes needed?

Improve query performance: the `LocalLimit` pushed below a condition-free INNER or CROSS JOIN caps the number of rows each join input produces per partition, reducing the data that is scanned, broadcast, and joined.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes apache#31567 from wangyum/SPARK-26138.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
wangyum committed Feb 24, 2021
1 parent 2e31e2c commit b5afff5
Showing 3 changed files with 43 additions and 7 deletions.
```diff
@@ -539,17 +539,22 @@ object LimitPushDown extends Rule[LogicalPlan] {
     // pushdown Limit.
     case LocalLimit(exp, u: Union) =>
       LocalLimit(exp, u.copy(children = u.children.map(maybePushLocalLimit(exp, _))))
-    // Add extra limits below OUTER JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to
-    // the left and right sides, respectively. It's not safe to push limits below FULL OUTER
-    // JOIN in the general case without a more invasive rewrite.
+    // Add extra limits below JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to
+    // the left and right sides, respectively. For INNER and CROSS JOIN we push limits to
+    // both the left and right sides if join condition is empty. It's not safe to push limits
+    // below FULL OUTER JOIN in the general case without a more invasive rewrite.
     // We also need to ensure that this limit pushdown rule will not eventually introduce limits
     // on both sides if it is applied multiple times. Therefore:
     //   - If one side is already limited, stack another limit on top if the new limit is smaller.
     //     The redundant limit will be collapsed by the CombineLimits rule.
-    case LocalLimit(exp, join @ Join(left, right, joinType, _, _)) =>
+    case LocalLimit(exp, join @ Join(left, right, joinType, conditionOpt, _)) =>
       val newJoin = joinType match {
         case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
         case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
+        case _: InnerLike if conditionOpt.isEmpty =>
+          join.copy(
+            left = maybePushLocalLimit(exp, left),
+            right = maybePushLocalLimit(exp, right))
         case _ => join
       }
       LocalLimit(exp, newJoin)
```
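The new `_: InnerLike if conditionOpt.isEmpty` case above only fires when there is no join condition, because with a condition the pushdown could drop the very rows that would have matched. A purely illustrative example (hypothetical tables `s1` and `s2`, not part of this patch):
```sql
-- Suppose s1(a) contains the values 0 and 1, and s2(b) contains only 1.
SELECT * FROM s1 JOIN s2 ON s1.a = s2.b LIMIT 1;
-- The only correct result is the row (1, 1). If LIMIT 1 were pushed below
-- the join into s1, the scan could legally return just the row with a = 0,
-- and the join would then return nothing.
```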
```diff
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.Add
-import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, PlanTest, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, PlanTest, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 
@@ -194,4 +194,22 @@ class LimitPushdownSuite extends PlanTest {
       LocalLimit(1, y.groupBy(Symbol("b"))(count(1))))).analyze
     comparePlans(expected2, optimized2)
   }
+
+  test("SPARK-26138: pushdown limit through InnerLike when condition is empty") {
+    Seq(Cross, Inner).foreach { joinType =>
+      val originalQuery = x.join(y, joinType).limit(1)
+      val optimized = Optimize.execute(originalQuery.analyze)
+      val correctAnswer = Limit(1, LocalLimit(1, x).join(LocalLimit(1, y), joinType)).analyze
+      comparePlans(optimized, correctAnswer)
+    }
+  }
+
+  test("SPARK-26138: Should not pushdown limit through InnerLike when condition is not empty") {
+    Seq(Cross, Inner).foreach { joinType =>
+      val originalQuery = x.join(y, joinType, Some("x.a".attr === "y.b".attr)).limit(1)
+      val optimized = Optimize.execute(originalQuery.analyze)
+      val correctAnswer = Limit(1, x.join(y, joinType, Some("x.a".attr === "y.b".attr))).analyze
+      comparePlans(optimized, correctAnswer)
+    }
+  }
 }
```
17 changes: 15 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
```diff
@@ -29,14 +29,14 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Partial}
 import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, NestedColumnAliasingSuite}
-import org.apache.spark.sql.catalyst.plans.logical.{Project, RepartitionByExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, Project, RepartitionByExpression}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.UnionExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.FunctionsCommand
-import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, SchemaColumnConvertNotSupportedException}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -4021,6 +4021,19 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       }
     }
   }
+
+  test("SPARK-26138 Pushdown limit through InnerLike when condition is empty") {
+    withTable("t1", "t2") {
+      spark.range(5).repartition(1).write.saveAsTable("t1")
+      spark.range(5).repartition(1).write.saveAsTable("t2")
+      val df = spark.sql("SELECT * FROM t1 CROSS JOIN t2 LIMIT 3")
+      val pushedLocalLimits = df.queryExecution.optimizedPlan.collect {
+        case l @ LocalLimit(_, _: LogicalRelation) => l
+      }
+      assert(pushedLocalLimits.length === 2)
+      checkAnswer(df, Row(0, 0) :: Row(0, 1) :: Row(0, 2) :: Nil)
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
```