KE-42300 optimize split strategy
Yu Gan committed Oct 11, 2023
1 parent 2f0808b commit 011c30d
Showing 11 changed files with 83 additions and 272 deletions.
12 changes: 2 additions & 10 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.rdd

import java.util.Random

-import scala.collection.{mutable, Iterator, Map}
+import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
import scala.io.Codec
import scala.language.implicitConversions
@@ -45,7 +45,7 @@ import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
-import org.apache.spark.util.{BoundedPriorityQueue, LongSliceIterator, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap, Utils => collectionUtils}
import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler, SamplingUtils}

@@ -336,14 +336,6 @@ abstract class RDD[T: ClassTag](
}
}

-  def sliceIterator(from: Long,
-      until: Long,
-      split: Partition,
-      context: TaskContext): Iterator[T] = {
-    val iter = iterator(split, context)
-    new LongSliceIterator(iter, from, until)
-  }

/**
* Return the ancestors of the given RDD that are related to it only through a sequence of
* narrow dependencies. This traverses the given RDD's dependency tree using DFS, but maintains
60 changes: 0 additions & 60 deletions core/src/main/scala/org/apache/spark/util/LongSliceIterator.scala

This file was deleted.
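The contents of the deleted LongSliceIterator are not shown in this diff; judging from the removed RDD.sliceIterator above, it wrapped a partition's iterator and kept only the rows whose index falls in [from, until). A minimal hypothetical sketch of that idea (an assumption based on the call site, not the deleted implementation):

// Hypothetical sketch: keep only elements with index in [from, until),
// advancing lazily so rows before `from` are still read but discarded.
class LongSliceIterator[T](underlying: Iterator[T], from: Long, until: Long)
  extends Iterator[T] {

  private var pos: Long = 0L

  // Consume rows until the index reaches `from` or the input is exhausted.
  private def skipToFrom(): Unit = {
    while (pos < from && underlying.hasNext) {
      underlying.next()
      pos += 1
    }
  }

  override def hasNext: Boolean = {
    skipToFrom()
    pos < until && underlying.hasNext
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("next on empty iterator")
    pos += 1
    underlying.next()
  }
}

// Example: slicing rows [2, 4) out of a 6-row iterator yields rows 3 and 4:
// new LongSliceIterator(Iterator(1, 2, 3, 4, 5, 6), 2, 4).toList == List(3, 4)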

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -557,34 +557,27 @@ object SQLConf {
.checkValue(_ > 0, "advisoryPartitionSizeInBytes must be positive")
.createWithDefaultString("64MB")

-  val SPLIT_SOURCE_PARTITION_ENABLED = buildConf("spark.sql.splitSourcePartition.enabled")
-    .doc("When true, split source partition.")
-    .version("3.2.0")
-    .booleanConf
-    .createWithDefault(false)
+  val SPLIT_SOURCE_PARTITION_ENABLED =
+    buildConf("spark.sql.splitSourcePartition.enabled")
+      .doc("When true, split source partition.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)

-  val SPLIT_SOURCE_PARTITION_PREFERSHUFFLE =
-    buildConf("spark.sql.splitSourcePartition.preferShuffle")
-      .doc("Rebalance partitions preferring shuffle, with internal row skipping as the alternative.")
-      .version("3.2.0")
-      .booleanConf
-      .createWithDefault(true)
+  val SPLIT_SOURCE_PARTITION_THRESHOLD =
+    buildConf("spark.sql.splitSourcePartition.thresholdInBytes")
+      .doc("A partition is considered to be split.")
+      .version("3.0.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("1MB")

-  val SPLIT_SOURCE_PARTITION_ROWCOUNT =
-    buildConf("spark.sql.splitSourcePartition.rowCount")
-      .internal()
-      .doc("The suggestion row count of split partition.")
-      .version("3.2.0")
-      .longConf
-      .checkValue(_ > 0, "The row count must be positive")
-      .createWithDefault(20000L)
-
-  val SPLIT_SOURCE_PARTITION_NUM = buildConf("spark.sql.splitSourcePartition.num")
-    .doc("The suggestion partition num to split.")
-    .version("3.2.0")
-    .intConf
-    .checkValue(v => v > 0, "The partition number must be a positive integer.")
-    .createWithDefault(10)
+  val SPLIT_SOURCE_PARTITION_EXPANDNUM =
+    buildConf("spark.sql.splitSourcePartition.expandNum")
+      .doc("The suggestion partition num to split.")
+      .version("3.2.0")
+      .intConf
+      .checkValue(v => v > 0, "The partition number must be a positive integer.")
+      .createWithDefault(10)

val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
.doc("When true, enable adaptive query execution, which re-optimizes the query plan in the " +
@@ -3898,11 +3891,9 @@ class SQLConf extends Serializable with Logging {

def splitSourcePartitionEnabled: Boolean = getConf(SPLIT_SOURCE_PARTITION_ENABLED)

-  def splitSourcePartitionPreferShuffle: Boolean = getConf(SPLIT_SOURCE_PARTITION_PREFERSHUFFLE)
-
-  def splitSourcePartitionRowCount: Long = getConf(SPLIT_SOURCE_PARTITION_ROWCOUNT)
+  def splitSourcePartitionThreshold: Long = getConf(SPLIT_SOURCE_PARTITION_THRESHOLD)

-  def splitSourcePartitionNum: Int = getConf(SPLIT_SOURCE_PARTITION_NUM)
+  def splitSourcePartitionExpandNum: Int = getConf(SPLIT_SOURCE_PARTITION_EXPANDNUM)

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

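For reference, the switches that survive this change are the enable flag plus the two new knobs above; as the SplitExec changes further below show, the threshold is compared against the scan's total file size, and expandNum is only a suggestion for the target partition count. A small usage sketch (the application name and the values are illustrative, not recommendations):

import org.apache.spark.sql.SparkSession

// Illustrative values only: splitting is considered once the scan's total
// file bytes reach the threshold, and expandNum suggests how far to expand
// (subject to the parallelism-based cap applied in SplitExec).
val spark = SparkSession.builder()
  .appName("split-source-partition-demo")
  .config("spark.sql.splitSourcePartition.enabled", "true")
  .config("spark.sql.splitSourcePartition.thresholdInBytes", "128MB")
  .config("spark.sql.splitSourcePartition.expandNum", "16")
  .getOrCreate()

// The same keys can also be toggled at runtime:
spark.conf.set("spark.sql.splitSourcePartition.enabled", "false")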
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -52,8 +52,6 @@ trait DataSourceScanExec extends LeafExecNode {
s"Scan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
}

-  override def isSplittableScan: Boolean = true
-
// Metadata that describes more details of this scan.
protected def metadata: Map[String, String]

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -74,9 +74,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
*/
def supportsColumnar: Boolean = false


-  def isSplittableScan: Boolean = false
-
/**
* The exact java types of the columns that are output in columnar processing mode. This
* is a performance optimization for code generation and is optional.
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -265,4 +265,9 @@ class FileScanRDD(
expectedTargets
}
}

+  def partitionFilesTotalLength: Long = {
+    filePartitions.map(_.files.map(_.length).sum).sum
+  }

}
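The new helper simply sums the file lengths of every FilePartition held by the RDD. A standalone sketch of the same aggregation over hypothetical stub types (not Spark's PartitionedFile/FilePartition):

// Hypothetical stand-ins reduced to the one field the helper reads.
case class PartitionedFileStub(length: Long)
case class FilePartitionStub(files: Seq[PartitionedFileStub])

def partitionFilesTotalLength(filePartitions: Seq[FilePartitionStub]): Long =
  filePartitions.map(_.files.map(_.length).sum).sum

// Two partitions holding 64MB + 32MB and 128MB of files give 224MB in total;
// SplitExec compares this number against splitSourcePartition.thresholdInBytes.
val mb = 1024L * 1024L
val total = partitionFilesTotalLength(Seq(
  FilePartitionStub(Seq(PartitionedFileStub(64 * mb), PartitionedFileStub(32 * mb))),
  FilePartitionStub(Seq(PartitionedFileStub(128 * mb)))))
assert(total == 224 * mb)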

This file was deleted.

org/apache/spark/sql/execution/split/SplitExec.scala
@@ -17,15 +17,30 @@

package org.apache.spark.sql.execution.split

+import scala.annotation.tailrec
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode}
+import org.apache.spark.sql.execution.datasources.FileScanRDD
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

-case class SplitExec(rowCount: Long, partitionNum: Int, preferShuffle: Boolean, child: SparkPlan)
-  extends UnaryExecNode {
+case class SplitExec(expandNum: Int, threshSize: Long, child: SparkPlan) extends UnaryExecNode {
+
+  /**
+   * @return All metrics containing metrics of this SparkPlan.
+   */
+  override lazy val metrics: Map[String, SQLMetric] = Map(
+    "originPartNum" -> SQLMetrics.createMetric(sparkContext, "origin partition num"),
+    "expandPartNum" -> SQLMetrics.createMetric(sparkContext, "expand partition num"))
+
+  /**
+   * Returns the name of this type of TreeNode. Defaults to the class name.
+   * Note that we remove the "Exec" suffix for physical operators here.
+   */
+  override def nodeName: String = "SplitSourcePartition"

override def output: Seq[Attribute] = child.output

@@ -42,20 +57,38 @@ case class SplitExec(rowCount: Long, partitionNum: Int, preferShuffle: Boolean,
}

private def doSplit[U: ClassTag](prev: RDD[U]): RDD[U] = {
-    val parallelism = sparkContext.defaultParallelism
-    val partNum = (sparkContext.defaultMinPartitions max
-      (parallelism / partitionNum)) max (parallelism min partitionNum)
-    if (prev.getNumPartitions < partNum) {
-      if (preferShuffle) {
-        // Currently not supported: supportsColumnar, because `ColumnarBatch` is not serializable.
-        // But maybe we can fix this in the future.
-        prev.coalesce(partNum, shuffle = true)
-      } else {
-        new SplitRDD[U](prev, rowCount, partNum)
-      }
-    } else {
-      prev
-    }
+    val sourceSize = getSourceSize(prev)
+    sourceSize
+      .map { size =>
+        if (threshSize <= size) {
+          val prevPartNum = prev.getNumPartitions
+          val parallelism = sparkContext.defaultParallelism
+          val expandPartNum = (prevPartNum << 1) max expandNum
+          val partNum = (parallelism / expandPartNum) max (parallelism min expandPartNum)
+          if ((prevPartNum << 1) <= partNum) {
+            metrics("originPartNum").add(prevPartNum)
+            metrics("expandPartNum").add(partNum)
+            val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+            SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
+            prev.coalesce(partNum, shuffle = true)
+          } else {
+            // If expansion is small scale, split will not be profitable.
+            prev
+          }
+        } else {
+          // If source size is small, split will not be profitable.
+          prev
+        }
+      }
+      .getOrElse(prev)
}

+  @tailrec
+  private def getSourceSize[U: ClassTag](prev: RDD[U]): Option[Long] =
+    prev match {
+      case f: FileScanRDD => Some(f.partitionFilesTotalLength)
+      case r if r.dependencies.isEmpty => None
+      case other => getSourceSize(other.firstParent)
+    }

}
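To make the new sizing rule concrete, here is the partition arithmetic from doSplit lifted into a standalone function with worked examples; the function name and sample numbers are illustrative, and the threshold check that precedes this step in SplitExec is omitted:

// Mirrors the arithmetic in the new SplitExec.doSplit: double the current
// partition count (or take expandNum if larger), then bound it against the
// cluster's default parallelism. Returns None when splitting would not at
// least double the partition count, i.e. when SplitExec keeps `prev` as is.
def plannedSplitPartitions(prevPartNum: Int, parallelism: Int, expandNum: Int): Option[Int] = {
  val expandPartNum = (prevPartNum << 1) max expandNum
  val partNum = (parallelism / expandPartNum) max (parallelism min expandPartNum)
  if ((prevPartNum << 1) <= partNum) Some(partNum) else None
}

// Example: a 3-partition scan on a cluster with defaultParallelism = 200
// and spark.sql.splitSourcePartition.expandNum = 10:
//   expandPartNum = max(3 * 2, 10)               = 10
//   partNum       = max(200 / 10, min(200, 10))  = 20
//   3 * 2 <= 20, so the scan is coalesced (with shuffle) to 20 partitions.
assert(plannedSplitPartitions(prevPartNum = 3, parallelism = 200, expandNum = 10) == Some(20))

// Counter-example: with 120 existing partitions and the same settings:
//   expandPartNum = max(240, 10) = 240, partNum = max(200 / 240, min(200, 240)) = 200,
//   and 240 > 200, so splitting is skipped and the original RDD is kept.
assert(plannedSplitPartitions(prevPartNum = 120, parallelism = 200, expandNum = 10) == None)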
Diffs for the remaining changed files were not loaded.
