#96 optimize limit when query contains count distinct measure
hn5092 committed Jan 13, 2020
1 parent 8594290 commit 89ad847
Showing 3 changed files with 61 additions and 33 deletions.
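In short: SparkPlan.executeTake normally ramps up the number of partitions it scans per iteration (by spark.sql.limit.scaleUpFactor, default 4). For queries with a count-distinct measure the final result is often tiny, so this commit lets executeTake read the source row count from the "source_scan_rows" local property, estimate the result size with the new spark.sql.limit.fetchFactor, and scan all remaining partitions in one job when that estimate is below both the requested limit and the new spark.sql.limit.maxFetchRows cap.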
SQLConf.scala
@@ -235,6 +235,18 @@ object SQLConf {
     .intConf
     .createWithDefault(4)

+  val LIMIT_FETCHER_FACTOR = buildConf("spark.sql.limit.fetchFactor")
+    .internal()
+    .doc("If the source row count is small, we can fetch the whole result.")
+    .doubleConf
+    .createWithDefault(0.01)
+
+  val LIMIT_MAX_FETCHER_ROWS = buildConf("spark.sql.limit.maxFetchRows")
+    .internal()
+    .doc("The total number of result rows should stay below this value.")
+    .longConf
+    .createWithDefault(2000000)
+
   val ADVANCED_PARTITION_PREDICATE_PUSHDOWN =
     buildConf("spark.sql.hive.advancedPartitionPredicatePushdown.enabled")
       .internal()
@@ -1769,6 +1781,10 @@ class SQLConf extends Serializable with Logging {

   def limitScaleUpFactor: Int = getConf(LIMIT_SCALE_UP_FACTOR)

+  def limitFetchFactor: Double = getConf(LIMIT_FETCHER_FACTOR)
+
+  def maxFetchRows: Long = getConf(LIMIT_MAX_FETCHER_ROWS)
+
   def advancedPartitionPredicatePushdownEnabled: Boolean =
     getConf(ADVANCED_PARTITION_PREDICATE_PUSHDOWN)

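Both new entries are internal, but for illustration, here is a minimal sketch of setting them on a session; the two keys come from the buildConf calls above, and the values shown are just their declared defaults:

import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming a local session; keys and defaults are taken from
// the diff above, everything else is illustrative.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("limit-fetch-demo")
  .config("spark.sql.limit.fetchFactor", "0.01")      // estimated result fraction of source rows
  .config("spark.sql.limit.maxFetchRows", "2000000")  // cap on the estimated result size
  .getOrCreate()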
SparkPlan.scala
@@ -340,6 +340,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {

     val buf = new ArrayBuffer[InternalRow]
     val totalParts = childRDD.partitions.length
+
     var partsScanned = 0
     while (buf.size < n && partsScanned < totalParts) {
       // The number of partitions to try in this iteration. It is ok for this number to be
@@ -360,7 +361,18 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
         }
       }

-      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+      val p = sparkContext.getLocalProperty("source_scan_rows") match {
+        case x if x != null && x.toLong > 0 =>
+          val estimateRows = x.toLong * sqlContext.conf.limitFetchFactor
+          if (estimateRows < n && estimateRows < sqlContext.conf.maxFetchRows) {
+            logInfo(s"Estimated rows: $estimateRows; scanning all partitions.")
+            partsScanned.until(totalParts.toInt)
+          } else {
+            partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+          }
+        case _ =>
+          partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+      }
       val sc = sqlContext.sparkContext
       val res = sc.runJob(childRDD,
         (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty[Byte], p)
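The new branch is easier to follow in isolation. Below is a hypothetical, self-contained sketch of the heuristic; partitionsToScan and its parameters are illustrative stand-ins, not the actual SparkPlan API, and the defaults mirror the two new confs:

// Hypothetical sketch of the partition-selection heuristic above.
def partitionsToScan(
    partsScanned: Int,
    numPartsToTry: Long,
    totalParts: Int,
    n: Int,                        // rows requested by the LIMIT
    sourceScanRows: Option[Long],  // stands in for the "source_scan_rows" property
    fetchFactor: Double = 0.01,    // spark.sql.limit.fetchFactor default
    maxFetchRows: Long = 2000000L  // spark.sql.limit.maxFetchRows default
): Range = {
  // Default behaviour: scan a geometrically growing slice of partitions.
  val rampUp = partsScanned until math.min(partsScanned + numPartsToTry, totalParts).toInt
  sourceScanRows match {
    case Some(rows) if rows > 0 =>
      val estimateRows = rows * fetchFactor
      // If the estimated result is smaller than both the limit and the cap,
      // one job over every remaining partition beats several ramp-up jobs.
      if (estimateRows < n && estimateRows < maxFetchRows) partsScanned until totalParts
      else rampUp
    case _ => rampUp
  }
}

// Worked example: 1,000,000 source rows * 0.01 = 10,000 estimated rows, below
// both n = 20,000 and the 2,000,000-row cap, so all 32 partitions run at once.
assert(partitionsToScan(0, 1L, 32, 20000, Some(1000000L)) == (0 until 32))

For the branch to fire, whatever plans the source scan must set the local property before the query runs, e.g. (value illustrative):

spark.sparkContext.setLocalProperty("source_scan_rows", "1000000")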
@@ -369,7 +381,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {

       partsScanned += p.size
     }
-
+    logInfo(s"Result size is ${buf.size}")
     if (buf.size > n) {
       buf.take(n).toArray
     } else {
WholeStageCodegenExec.scala

(Every hunk in this file is an indentation-only change; leading whitespace was lost in this view, so each removed line and its added replacement show identical text.)
@@ -125,8 +125,8 @@ trait CodegenSupport extends SparkPlan {
 ctx.currentVars = colVars
 val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
 val code = code"""
-|$evaluateInputs
-|${ev.code}
+|$evaluateInputs
+|${ev.code}
 """.stripMargin
 ExprCode(code, FalseLiteral, ev.value)
 } else {
@@ -181,7 +181,7 @@ trait CodegenSupport extends SparkPlan {
 val requireAllOutput = output.forall(parent.usedInputs.contains(_))
 val paramLength = CodeGenerator.calculateParamLength(output) + (if (row != null) 1 else 0)
 val consumeFunc = if (confEnabled && requireAllOutput
-&& CodeGenerator.isValidParamLength(paramLength)) {
+&& CodeGenerator.isValidParamLength(paramLength)) {
 constructDoConsumeFunction(ctx, inputVars, row)
 } else {
 parent.doConsume(ctx, inputVars, rowVar)
@@ -198,9 +198,9 @@ trait CodegenSupport extends SparkPlan {
 * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call.
 */
 private def constructDoConsumeFunction(
-ctx: CodegenContext,
-inputVars: Seq[ExprCode],
-row: String): String = {
+ctx: CodegenContext,
+inputVars: Seq[ExprCode],
+row: String): String = {
 val (args, params, inputVarsInFunc) = constructConsumeParameters(ctx, output, inputVars, row)
 val rowVar = prepareRowVar(ctx, row, inputVarsInFunc)

@@ -225,10 +225,10 @@ trait CodegenSupport extends SparkPlan {
 * And also returns the list of `ExprCode` for the parameters.
 */
 private def constructConsumeParameters(
-ctx: CodegenContext,
-attributes: Seq[Attribute],
-variables: Seq[ExprCode],
-row: String): (Seq[String], Seq[String], Seq[ExprCode]) = {
+ctx: CodegenContext,
+attributes: Seq[Attribute],
+variables: Seq[ExprCode],
+row: String): (Seq[String], Seq[String], Seq[ExprCode]) = {
 val arguments = mutable.ArrayBuffer[String]()
 val parameters = mutable.ArrayBuffer[String]()
 val paramVars = mutable.ArrayBuffer[ExprCode]()
@@ -274,9 +274,9 @@ trait CodegenSupport extends SparkPlan {
 * of evaluated variables, to prevent them to be evaluated twice.
 */
 protected def evaluateRequiredVariables(
-attributes: Seq[Attribute],
-variables: Seq[ExprCode],
-required: AttributeSet): String = {
+attributes: Seq[Attribute],
+variables: Seq[ExprCode],
+required: AttributeSet): String = {
 val evaluateVars = new StringBuilder
 variables.zipWithIndex.foreach { case (ev, i) =>
 if (ev.code.nonEmpty && required.contains(attributes(i))) {
@@ -393,12 +393,12 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
 }

 override def generateTreeString(
-depth: Int,
-lastChildren: Seq[Boolean],
-builder: StringBuilder,
-verbose: Boolean,
-prefix: String = "",
-addSuffix: Boolean = false): StringBuilder = {
+depth: Int,
+lastChildren: Seq[Boolean],
+builder: StringBuilder,
+verbose: Boolean,
+prefix: String = "",
+addSuffix: Boolean = false): StringBuilder = {
 child.generateTreeString(depth, lastChildren, builder, verbose, "")
 }

@@ -502,7 +502,7 @@ object WholeStageCodegenId {
 * used to generated code for [[BoundReference]].
 */
 case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
-extends UnaryExecNode with CodegenSupport {
+extends UnaryExecNode with CodegenSupport {

 override def output: Seq[Attribute] = child.output

@@ -545,9 +545,9 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
 }

 ${ctx.registerComment(
-s"""Codegend pipeline for stage (id=$codegenStageId)
-|${this.treeString.trim}""".stripMargin,
-"wsc_codegenPipeline")}
+s"""Codegend pipeline for stage (id=$codegenStageId)
+|${this.treeString.trim}""".stripMargin,
+"wsc_codegenPipeline")}
 ${ctx.registerComment(s"codegenStageId=$codegenStageId", "wsc_codegenStageId", true)}
 final class $className extends ${classOf[BufferedRowIterator].getName} {

@@ -663,18 +663,18 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
""
}
s"""
|${row.code}
|append(${row.value}$doCopy);
|${row.code}
|append(${row.value}$doCopy);
""".stripMargin.trim
}

override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
builder: StringBuilder,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false): StringBuilder = {
depth: Int,
lastChildren: Seq[Boolean],
builder: StringBuilder,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false): StringBuilder = {
child.generateTreeString(depth, lastChildren, builder, verbose, s"*($codegenStageId) ")
}

