Skip to content

Commit

Permalink
[GLUTEN-5016][CH] Fix simple aggregation SQL exchange fallback
Browse files Browse the repository at this point in the history
  • Loading branch information
lwz9103 committed Mar 20, 2024
1 parent 7ad6b3f commit 7e63134
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,19 @@ class GlutenClickHouseTPCHNullableSuite extends GlutenClickHouseTPCHAbstractSuit
|""".stripMargin) { _ => }
assert(result(0).getLong(0) == 227302L)
}

test("test 'GLUTEN-5016'") {
  // Regression test: with preferColumnar disabled, a plain aggregation over
  // lineitem must still execute entirely in Gluten (no exchange fallback).
  withSQLConf(("spark.gluten.sql.columnar.preferColumnar", "false")) {
    runSql(
      """
        |SELECT
        | sum(l_quantity) AS sum_qty
        |FROM
        | lineitem
        |WHERE
        | l_shipdate <= date'1998-09-02'
        |""".stripMargin,
      noFallBack = true
    ) { _ => }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,27 @@ object MiscColumnarRules {
}
}

// Rewrites Spark exchange operators into their Gluten columnar counterparts.
private case class ExchangeTransformRule() extends Rule[SparkPlan] with LogLevelUtil {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case shuffle: ShuffleExchangeExec =>
      logDebug(s"Columnar Processing for ${shuffle.getClass} is currently supported.")
      val shuffleChild = shuffle.child
      // Eligible only when the child already produces columnar batches (or the
      // user prefers columnar) AND the backend implements columnar shuffle.
      val columnarEligible =
        (shuffleChild.supportsColumnar || GlutenConfig.getConf.enablePreferColumnar) &&
          BackendsApiManager.getSettings.supportColumnarShuffleExec()
      if (columnarEligible) {
        BackendsApiManager.getSparkPlanExecApiInstance
          .genColumnarShuffleExchange(shuffle, shuffleChild)
      } else {
        // Keep the vanilla shuffle; re-wire the (possibly transformed) child.
        shuffle.withNewChildren(Seq(shuffleChild))
      }
    case broadcast: BroadcastExchangeExec =>
      logDebug(s"Columnar Processing for ${broadcast.getClass} is currently supported.")
      ColumnarBroadcastExchangeExec(broadcast.mode, broadcast.child)
  }
}

// Filter transformation.
private case class FilterTransformRule() extends Rule[SparkPlan] with LogLevelUtil {
private val replace = new ReplaceSingleNode()
Expand Down Expand Up @@ -163,7 +184,6 @@ object MiscColumnarRules {
// Utility to replace single node within transformed Gluten node.
// Children will be preserved as they are as children of the output node.
class ReplaceSingleNode() extends LogLevelUtil with Logging {
private val columnarConf: GlutenConfig = GlutenConfig.getConf

def replaceWithTransformerPlan(p: SparkPlan): SparkPlan = {
val plan = p
Expand Down Expand Up @@ -283,17 +303,6 @@ object MiscColumnarRules {
plan.projectList,
child,
offset)
case plan: ShuffleExchangeExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = plan.child
if (
(child.supportsColumnar || columnarConf.enablePreferColumnar) &&
BackendsApiManager.getSettings.supportColumnarShuffleExec()
) {
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(plan, child)
} else {
plan.withNewChildren(Seq(child))
}
case plan: ShuffledHashJoinExec =>
val left = plan.left
val right = plan.right
Expand All @@ -320,10 +329,6 @@ object MiscColumnarRules {
left,
right,
plan.isSkewJoin)
case plan: BroadcastExchangeExec =>
val child = plan.child
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ColumnarBroadcastExchangeExec(plan.mode, child)
case plan: BroadcastHashJoinExec =>
val left = plan.left
val right = plan.right
Expand Down Expand Up @@ -492,7 +497,8 @@ object MiscColumnarRules {
private val subRules = List(
FilterTransformRule(),
RegularTransformRule(),
AggregationTransformRule()
AggregationTransformRule(),
ExchangeTransformRule()
)

@transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()
Expand Down

0 comments on commit 7e63134

Please sign in to comment.