Spark: Implement SupportsPushDownTopN
pan3793 committed Jan 10, 2023
1 parent dd6ee03 commit 9ca4bfb
Showing 4 changed files with 33 additions and 3 deletions.
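For orientation: SupportsPushDownTopN is the Spark DataSource V2 mix-in that lets a scan builder accept an ORDER BY plus LIMIT ("top-N") from the optimizer, so the sort and truncation are executed by ClickHouse rather than on the Spark side after a full scan. The sketch below shows a query shape that exercises the new path; the catalog, table, and column names are placeholders and not part of this commit.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object TopNPushdownQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("topn-pushdown-sketch")
      .getOrCreate()

    // "clickhouse.db.events" and "ts" are placeholder names; a real run needs
    // the ClickHouse catalog registered under spark.sql.catalog.clickhouse.
    val top10 = spark.table("clickhouse.db.events")
      .orderBy(col("ts").desc)
      .limit(10)

    // With SupportsPushDownTopN implemented, the ORDER BY and LIMIT show up in
    // the query sent to ClickHouse instead of being applied after a full scan.
    top10.explain()

    spark.stop()
  }
}
```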
@@ -90,15 +90,28 @@ object ExprUtils extends SQLConfHelper {
case unsupported => throw CHClientException(s"Unsupported ClickHouse expression: $unsupported")
}

def toClickHouse(transform: Transform): Expr = transform match {
def toClickHouseOpt(v2Expr: V2Expression): Option[Expr] = Try(toClickHouse(v2Expr)).toOption

def toClickHouse(v2Expr: V2Expression): Expr = v2Expr match {
// sort order
case sortOrder: SortOrder =>
val asc = sortOrder.direction == SortDirection.ASCENDING
val nullFirst = sortOrder.nullOrdering == NullOrdering.NULLS_FIRST
OrderExpr(toClickHouse(sortOrder.expression), asc, nullFirst)
// transform
case YearsTransform(FieldReference(Seq(col))) => FuncExpr("toYear", List(FieldRef(col)))
case MonthsTransform(FieldReference(Seq(col))) => FuncExpr("toYYYYMM", List(FieldRef(col)))
case DaysTransform(FieldReference(Seq(col))) => FuncExpr("toYYYYMMDD", List(FieldRef(col)))
case HoursTransform(FieldReference(Seq(col))) => FuncExpr("toHour", List(FieldRef(col)))
case IdentityTransform(fieldRefs) => FieldRef(fieldRefs.describe)
case ApplyTransform(name, args) => FuncExpr(name, args.map(arg => SQLExpr(arg.describe())).toList)
case bucket: BucketTransform => throw CHClientException(s"Bucket transform not support yet: $bucket")
case other: Transform => throw CHClientException(s"Unsupported transform: $other")
// others
case l: Literal[_] => SQLExpr(l.toString)
case FieldReference(Seq(col)) => FieldRef(col)
case gse: GeneralScalarExpression => SQLExpr(gse.toString) // TODO: excluding unsupported
// unsupported
case unsupported: V2Expression => throw CHClientException(s"Unsupported expression: $unsupported")
}

def inferTransformSchema(
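The new toClickHouseOpt simply wraps the throwing translator in scala.util.Try, so callers can treat "this expression cannot be translated" as None instead of catching exceptions. A self-contained sketch of that idiom, using stand-in names (toSql/toSqlOpt) rather than the connector's real classes:

```scala
import scala.util.Try

object TryToOptionSketch {
  // A translator that throws on unsupported input, like ExprUtils.toClickHouse.
  def toSql(order: String): String = order match {
    case "asc"  => "ORDER BY `col` ASC"
    case "desc" => "ORDER BY `col` DESC"
    case other  => throw new IllegalArgumentException(s"Unsupported expression: $other")
  }

  // The Try(...).toOption wrapper turns a failure into None.
  def toSqlOpt(order: String): Option[String] = Try(toSql(order)).toOption

  def main(args: Array[String]): Unit = {
    println(toSqlOpt("asc"))   // Some(ORDER BY `col` ASC)
    println(toSqlOpt("nope"))  // None -- unsupported input degrades to "don't push down"
  }
}
```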
@@ -16,8 +16,9 @@ package xenon.clickhouse.read

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.clickhouse.ClickHouseSQLConf._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.clickhouse.ExprUtils
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.read.partitioning.{Partitioning, UnknownPartitioning}
@@ -38,6 +39,7 @@ class ClickHouseScanBuilder(
metadataSchema: StructType,
partitionTransforms: Array[Transform]
) extends ScanBuilder
with SupportsPushDownTopN
with SupportsPushDownLimit
with SupportsPushDownFilters
with SupportsPushDownAggregates
@@ -56,13 +58,25 @@ class ClickHouseScanBuilder(
physicalSchema.fields ++ reservedMetadataSchema.fields
)

private var _orders: Option[String] = None

private var _limit: Option[Int] = None

override def pushLimit(limit: Int): Boolean = {
this._limit = Some(limit)
true
}

override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
val translated = orders.map(sortOrder => ExprUtils.toClickHouseOpt(sortOrder))
if (translated.exists(_.isEmpty)) {
return false
}
this._orders = Some(translated.flatten.mkString(", "))
this._limit = Some(limit)
true
}

private var _pushedFilters = Array.empty[Filter]

override def pushedFilters: Array[Filter] = this._pushedFilters
@@ -121,6 +135,7 @@ class ClickHouseScanBuilder(
readSchema = _readSchema,
filtersExpr = compileFilters(AlwaysTrue :: pushedFilters.toList),
groupByClause = _groupByClause,
orderByClause = _orders.map(o => s"ORDER BY $o"),
limit = _limit
))
}
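pushTopN only accepts the pushdown when every sort expression translates; if any of them fails, it returns false and Spark keeps the ORDER BY and LIMIT on its side. A standalone sketch of that all-or-nothing pattern, with simplified stand-in types instead of Spark's SortOrder and the connector's Expr:

```scala
object PushTopNSketch {
  final case class SortKey(column: String, asc: Boolean)

  // Pretend only these columns can be rendered as ClickHouse order keys.
  private val supportedColumns = Set("ts", "id")

  private def translate(key: SortKey): Option[String] =
    if (supportedColumns.contains(key.column))
      Some(s"`${key.column}` ${if (key.asc) "ASC" else "DESC"}")
    else None

  /** Returns the ORDER BY fragment only if *all* keys translate, otherwise None. */
  def pushTopN(keys: Seq[SortKey]): Option[String] = {
    val translated = keys.map(translate)
    if (translated.exists(_.isEmpty)) None
    else Some(translated.flatten.mkString(", "))
  }

  def main(args: Array[String]): Unit = {
    println(pushTopN(Seq(SortKey("ts", asc = false), SortKey("id", asc = true))))
    // Some(`ts` DESC, `id` ASC)
    println(pushTopN(Seq(SortKey("unknown_col", asc = true))))
    // None -- pushdown rejected, Spark sorts and limits instead
  }
}
```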
@@ -58,6 +58,7 @@ abstract class ClickHouseReader[Record](
|FROM `$database`.`$table`
|WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
|${scanJob.groupByClause.getOrElse("")}
|${scanJob.orderByClause.getOrElse("")}
|${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
|""".stripMargin
}
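The reader splices the optional ORDER BY fragment into its SQL template between the GROUP BY and LIMIT clauses, leaving an empty line when nothing was pushed. A minimal sketch of how those optional clauses compose; the database, table, and filter values are placeholders:

```scala
object ReaderQuerySketch {
  def buildQuery(
      database: String,
      table: String,
      filtersExpr: String,
      groupByClause: Option[String],
      orderByClause: Option[String],
      limit: Option[Int]
  ): String =
    s"""SELECT *
       |FROM `$database`.`$table`
       |WHERE ($filtersExpr)
       |${groupByClause.getOrElse("")}
       |${orderByClause.getOrElse("")}
       |${limit.map(n => s"LIMIT $n").getOrElse("")}
       |""".stripMargin

  def main(args: Array[String]): Unit =
    print(buildQuery("db", "events", "1=1", None, Some("ORDER BY `ts` DESC"), Some(10)))
}
```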
@@ -35,6 +35,7 @@ case class ScanJobDescription(
// into Scan tasks because the check happens in planning phase on driver side.
filtersExpr: String = "1=1",
groupByClause: Option[String] = None,
orderByClause: Option[String] = None,
limit: Option[Int] = None
) {

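ScanJobDescription is what carries the pushed-down pieces from the driver-side planner to the executor-side readers, so the ORDER BY travels as a pre-rendered, optional SQL fragment. Giving the new field a None default keeps every existing call site compiling unchanged. A toy sketch of that design choice, with the field set reduced for brevity:

```scala
// Stand-in for the connector's ScanJobDescription; only the pushdown-related
// fields are shown, and orderByClause mirrors the field added in this commit.
final case class JobDescriptionSketch(
    filtersExpr: String = "1=1",
    groupByClause: Option[String] = None,
    orderByClause: Option[String] = None,
    limit: Option[Int] = None
)

object JobDescriptionSketchApp {
  def main(args: Array[String]): Unit = {
    // Call sites written before the new field still compile unchanged...
    val plainScan = JobDescriptionSketch(filtersExpr = "id > 0")
    // ...while the TopN path fills in both the clause and the limit together.
    val topNScan = plainScan.copy(orderByClause = Some("ORDER BY `ts` DESC"), limit = Some(10))
    println(plainScan)
    println(topNScan)
  }
}
```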
