diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 95a236056d55f..24a98645823d5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -235,6 +235,18 @@ object SQLConf {
     .intConf
     .createWithDefault(4)
 
+  val LIMIT_FETCHER_FACTOR = buildConf("spark.sql.limit.fetchFactor")
+    .internal()
+    .doc("Fraction of the source row count used to estimate the result size of a LIMIT.")
+    .doubleConf
+    .createWithDefault(0.01)
+
+  val LIMIT_MAX_FETCHER_ROWS = buildConf("spark.sql.limit.maxFetchRows")
+    .internal()
+    .doc("Fetch all partitions at once only if the estimated result rows are below this value.")
+    .longConf
+    .createWithDefault(2000000)
+
   val ADVANCED_PARTITION_PREDICATE_PUSHDOWN =
     buildConf("spark.sql.hive.advancedPartitionPredicatePushdown.enabled")
       .internal()
@@ -1769,6 +1781,10 @@ class SQLConf extends Serializable with Logging {
 
   def limitScaleUpFactor: Int = getConf(LIMIT_SCALE_UP_FACTOR)
 
+  def limitFetchFactor: Double = getConf(LIMIT_FETCHER_FACTOR)
+
+  def maxFetchRows: Long = getConf(LIMIT_MAX_FETCHER_ROWS)
+
   def advancedPartitionPredicatePushdownEnabled: Boolean =
     getConf(ADVANCED_PARTITION_PREDICATE_PUSHDOWN)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 2ad49e7b764e9..e627ae2cd5302 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -340,6 +340,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val buf = new ArrayBuffer[InternalRow]
     val totalParts = childRDD.partitions.length
+
     var partsScanned = 0
     while (buf.size < n && partsScanned < totalParts) {
       // The number of partitions to try in this iteration. It is ok for this number to be
@@ -360,7 +361,18 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         }
       }
 
-      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+      val p = sparkContext.getLocalProperty("source_scan_rows") match {
+        case x if x != null && x.toLong > 0 =>
+          val estimateRows = x.toLong * sqlContext.conf.limitFetchFactor
+          if (estimateRows < n && estimateRows < sqlContext.conf.maxFetchRows) {
+            logInfo(s"Estimated rows is $estimateRows, scanning all partitions.")
+            partsScanned.until(totalParts.toInt)
+          } else {
+            partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+          }
+        case _ =>
+          partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+      }
       val sc = sqlContext.sparkContext
       val res = sc.runJob(childRDD,
         (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty[Byte], p)
@@ -369,7 +381,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
       partsScanned += p.size
     }
-
+    logInfo(s"Result size is ${buf.size}")
     if (buf.size > n) {
       buf.take(n).toArray
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index ded8dd30dc49e..1356a1f3ee592 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -125,8 +125,8 @@ trait CodegenSupport extends SparkPlan {
       ctx.currentVars = colVars
       val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
       val code = code"""
-        |$evaluateInputs
-        |${ev.code}
+      |$evaluateInputs
+      |${ev.code}
        """.stripMargin
       ExprCode(code, FalseLiteral, ev.value)
     } else {
@@ -181,7 +181,7 @@ trait CodegenSupport extends SparkPlan {
     val requireAllOutput = output.forall(parent.usedInputs.contains(_))
     val paramLength = CodeGenerator.calculateParamLength(output) + (if (row != null) 1 else 0)
     val consumeFunc = if (confEnabled && requireAllOutput
-        && CodeGenerator.isValidParamLength(paramLength)) {
+      && CodeGenerator.isValidParamLength(paramLength)) {
       constructDoConsumeFunction(ctx, inputVars, row)
     } else {
       parent.doConsume(ctx, inputVars, rowVar)
@@ -198,9 +198,9 @@ trait CodegenSupport extends SparkPlan {
    * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call.
    */
   private def constructDoConsumeFunction(
-      ctx: CodegenContext,
-      inputVars: Seq[ExprCode],
-      row: String): String = {
+    ctx: CodegenContext,
+    inputVars: Seq[ExprCode],
+    row: String): String = {
     val (args, params, inputVarsInFunc) = constructConsumeParameters(ctx, output, inputVars, row)
     val rowVar = prepareRowVar(ctx, row, inputVarsInFunc)
 
@@ -225,10 +225,10 @@ trait CodegenSupport extends SparkPlan {
    * And also returns the list of `ExprCode` for the parameters.
    */
   private def constructConsumeParameters(
-      ctx: CodegenContext,
-      attributes: Seq[Attribute],
-      variables: Seq[ExprCode],
-      row: String): (Seq[String], Seq[String], Seq[ExprCode]) = {
+    ctx: CodegenContext,
+    attributes: Seq[Attribute],
+    variables: Seq[ExprCode],
+    row: String): (Seq[String], Seq[String], Seq[ExprCode]) = {
     val arguments = mutable.ArrayBuffer[String]()
     val parameters = mutable.ArrayBuffer[String]()
     val paramVars = mutable.ArrayBuffer[ExprCode]()
@@ -274,9 +274,9 @@ trait CodegenSupport extends SparkPlan {
   * of evaluated variables, to prevent them to be evaluated twice.
   */
  protected def evaluateRequiredVariables(
-      attributes: Seq[Attribute],
-      variables: Seq[ExprCode],
-      required: AttributeSet): String = {
+    attributes: Seq[Attribute],
+    variables: Seq[ExprCode],
+    required: AttributeSet): String = {
    val evaluateVars = new StringBuilder
    variables.zipWithIndex.foreach { case (ev, i) =>
      if (ev.code.nonEmpty && required.contains(attributes(i))) {
@@ -393,12 +393,12 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp
   }
 
   override def generateTreeString(
-      depth: Int,
-      lastChildren: Seq[Boolean],
-      builder: StringBuilder,
-      verbose: Boolean,
-      prefix: String = "",
-      addSuffix: Boolean = false): StringBuilder = {
+    depth: Int,
+    lastChildren: Seq[Boolean],
+    builder: StringBuilder,
+    verbose: Boolean,
+    prefix: String = "",
+    addSuffix: Boolean = false): StringBuilder = {
     child.generateTreeString(depth, lastChildren, builder, verbose, "")
   }
 
@@ -502,7 +502,7 @@ object WholeStageCodegenId {
  * used to generated code for [[BoundReference]].
  */
 case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
-    extends UnaryExecNode with CodegenSupport {
+  extends UnaryExecNode with CodegenSupport {
 
   override def output: Seq[Attribute] = child.output
 
@@ -545,9 +545,9 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
       }
 
       ${ctx.registerComment(
-        s"""Codegend pipeline for stage (id=$codegenStageId)
-           |${this.treeString.trim}""".stripMargin,
-         "wsc_codegenPipeline")}
+      s"""Codegend pipeline for stage (id=$codegenStageId)
+        |${this.treeString.trim}""".stripMargin,
+        "wsc_codegenPipeline")}
       ${ctx.registerComment(s"codegenStageId=$codegenStageId", "wsc_codegenStageId", true)}
 
       final class $className extends ${classOf[BufferedRowIterator].getName} {
@@ -663,18 +663,18 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
       ""
     }
     s"""
-       |${row.code}
-       |append(${row.value}$doCopy);
+      |${row.code}
+      |append(${row.value}$doCopy);
     """.stripMargin.trim
   }
 
   override def generateTreeString(
-      depth: Int,
-      lastChildren: Seq[Boolean],
-      builder: StringBuilder,
-      verbose: Boolean,
-      prefix: String = "",
-      addSuffix: Boolean = false): StringBuilder = {
+    depth: Int,
+    lastChildren: Seq[Boolean],
+    builder: StringBuilder,
+    verbose: Boolean,
+    prefix: String = "",
+    addSuffix: Boolean = false): StringBuilder = {
     child.generateTreeString(depth, lastChildren, builder, verbose, s"*($codegenStageId) ")
   }
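
Usage sketch (not part of the patch). The table name, row count, and limit below are illustrative, and the sketch assumes some scan operator elsewhere in the change publishes its row count under the "source_scan_rows" local property before executeTake runs; only the two configs and the property name come from the diff above.

  // Estimate = source_scan_rows * spark.sql.limit.fetchFactor. When the estimate is below
  // both the requested limit and spark.sql.limit.maxFetchRows, executeTake scans all
  // remaining partitions in a single job instead of growing numPartsToTry iteratively.
  spark.conf.set("spark.sql.limit.fetchFactor", "0.01")
  spark.conf.set("spark.sql.limit.maxFetchRows", "2000000")
  // Hypothetical producer of the estimate; in the patch this value is only read back here.
  spark.sparkContext.setLocalProperty("source_scan_rows", "100000")
  // 100000 * 0.01 = 1000 estimated rows < 5000 and < 2000000, so one job scans everything.
  val rows = spark.table("t").limit(5000).collect()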