diff --git a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
index 2e243e4..7557f4a 100644
--- a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
+++ b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
@@ -223,8 +223,9 @@
 
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
     if (!stageMap.get(stageSubmitted.stageInfo.stageId).isDefined) {
+      val isSql = stageSubmitted.stageInfo.details.contains("org.apache.spark.sql")
       val stageTimeSpan = new StageTimeSpan(stageSubmitted.stageInfo.stageId,
-        stageSubmitted.stageInfo.numTasks)
+        stageSubmitted.stageInfo.numTasks, isSql)
       stageTimeSpan.setParentStageIDs(stageSubmitted.stageInfo.parentIds)
       if (stageSubmitted.stageInfo.submissionTime.isDefined) {
         stageTimeSpan.setStartTime(stageSubmitted.stageInfo.submissionTime.get)
diff --git a/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala b/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala
index 3b29893..0dd0d6e 100644
--- a/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala
+++ b/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala
@@ -81,7 +81,6 @@ class QuboleNotebookListener(sparkConf: SparkConf) extends QuboleJobListener(spa
    * @param applicationEnd
    */
   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-    stageMap.map(x => x._2).foreach(x => x.tempTaskTimes.clear())
     appInfo.endTime = applicationEnd.time
   }
 
diff --git a/src/main/scala/com/qubole/sparklens/analyzer/StageSkewAnalyzer.scala b/src/main/scala/com/qubole/sparklens/analyzer/StageSkewAnalyzer.scala
index cadba0b..0cff55c 100644
--- a/src/main/scala/com/qubole/sparklens/analyzer/StageSkewAnalyzer.scala
+++ b/src/main/scala/com/qubole/sparklens/analyzer/StageSkewAnalyzer.scala
@@ -34,6 +34,17 @@ class StageSkewAnalyzer extends AppAnalyzer {
     out.toString()
   }
 
+  override def analyzeAndSuggest(appContext: AppContext, startTime: Long, endTime: Long):
+      (String, Map[String, String]) = {
+    // Only make suggestions on a "full" window
+    if (appContext.appInfo.startTime == startTime &&
+      appContext.appInfo.endTime == endTime) {
+      (analyze(appContext, startTime, endTime), computeSuggestions(appContext))
+    } else {
+      (analyze(appContext, startTime, endTime), Map.empty[String, String])
+    }
+  }
+
   def bytesToString(size: Long): String = {
 
     val TB = 1L << 40
@@ -55,7 +66,70 @@
     "%.1f %s".formatLocal(Locale.US, value, unit)
   }
 
+  def computeSuggestions(ac: AppContext): Map[String, String] = {
+    val conf = ac.initialSparkProperties.getOrElse {
+      Map.empty[String, String]
+    }
+    val suggested = new mutable.HashMap[String, String]
+    val shufflePartitions = conf.getOrElse("spark.sql.shuffle.partitions", "200").toInt
+    val aqeCoalesce = conf.getOrElse("spark.sql.adaptive.coalescePartitions.enabled", "true")
+    // In theory, if the compute time for stages is generally "large" and skew is low,
+    // we can suggest increasing the parallelism, especially if it's below max execs.
+    // For now we want to filter for stages which are
+    // SQL shuffle reads (where the shuffle.partitions config is the one we change).
+    val newScaleFactor = ac.stageMap.values.map (
+      v => {
+        val numTasks = v.taskExecutionTimes.length
+        val minTime = v.taskExecutionTimes.min
+        val maxTime = v.taskExecutionTimes.max
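+        // taskExecutionTimes holds each task's executorRunTime in milliseconds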
+        // If there is less than a 10% diff between min and max (i.e. limited skew) and it's "slow",
+        // consider increasing the number of partitions.
+        // We also check that numTasks is ~= shuffle partitions,
+        // otherwise it's probably being configured through AQE target size / coalesce.
+        if (maxTime - minTime < 0.1 * maxTime &&
+          v.sqlTask &&
+          Math.abs(numTasks - shufflePartitions) < 10) {
+          // Scale the partition count by the shortest task's runtime in minutes, capped at 4x
+          Math.max(Math.min((minTime / 60000), 4.0), 1.0)
+        } else {
+          1.0
+        }
+      }
+    ).max
+    if (newScaleFactor > 1.0) {
+      suggested += (("spark.sql.shuffle.partitions",
+        (newScaleFactor * shufflePartitions).toInt.toString))
+    }
+
+    // Future: We might want to suggest increasing both max execs & parallelism if they're equal.
+
+    // We may also want to suggest turning off coalesce in AQE if we see "slow" stages
+    // with small data.
+    // TODO: Verify that AQE ran on that particular stage; right now we assume that if we've got
+    // a shuffle read and the task count is less than the default shuffle partitions, then
+    // it's AQE's doing.
+    val problematicCoalesces = ac.stageMap.values.find (
+      v => {
+        val minTaskLength = v.taskExecutionTimes.min
+        val numTasks = v.taskExecutionTimes.length
+        v.sqlTask && v.shuffleRead && numTasks < shufflePartitions &&
+          minTaskLength > 60000
+      }
+    )
+    if (problematicCoalesces.isDefined) {
+      suggested += (("spark.sql.adaptive.coalescePartitions.enabled", "false"))
+    }
+    suggested.toMap
+  }
+
   def computePerStageEfficiencyStatistics(ac: AppContext, out: mutable.StringBuilder): Unit = {
+    val conf = ac.initialSparkProperties.getOrElse {
+      out.println("WARNING: No config found, using empty config.")
+      Map.empty[String, String]
+    }
 
     val totalTasks = ac.stageMap.map(x => x._2.taskExecutionTimes.length).sum
 
diff --git a/src/main/scala/com/qubole/sparklens/timespan/StageTimeSpan.scala b/src/main/scala/com/qubole/sparklens/timespan/StageTimeSpan.scala
index b2bc2d4..09baf41 100644
--- a/src/main/scala/com/qubole/sparklens/timespan/StageTimeSpan.scala
+++ b/src/main/scala/com/qubole/sparklens/timespan/StageTimeSpan.scala
@@ -29,9 +29,12 @@ import scala.collection.mutable
 
 This keeps track of data per stage
 */
-class StageTimeSpan(val stageID: Int, numberOfTasks: Long) extends TimeSpan {
+class StageTimeSpan(val stageID: Int, numberOfTasks: Long, val sqlTask: Boolean) extends TimeSpan {
   var stageMetrics = new AggregateMetrics()
-  var tempTaskTimes = new mutable.ListBuffer[( Long, Long, Long)]
+  // We actually want to keep the start times so we can suggest increasing
+  // max execs if we see very different start times, since that can
+  // indicate a stall.
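+  // Note: each taskTimes entry is currently (taskId, executorRunTime, peakExecutionMemory);
+  // per-task launch times are not stored yet, only minTaskLaunchTime / maxTaskFinishTime.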
+  var taskTimes = new mutable.ListBuffer[( Long, Long, Long)]
   var minTaskLaunchTime = Long.MaxValue
   var maxTaskFinishTime = 0L
   var parentStageIDs:Seq[Int] = null
@@ -40,6 +43,9 @@ class StageTimeSpan(val stageID: Int, numberOfTasks: Long) extends TimeSpan {
   var taskExecutionTimes = Array.emptyIntArray
   var taskPeakMemoryUsage = Array.emptyLongArray
 
+  // Whether any task in this stage performed a shuffle read
+  var shuffleRead = false
+
   def updateAggregateTaskMetrics (taskMetrics: TaskMetrics, taskInfo: TaskInfo): Unit = {
     stageMetrics.update(taskMetrics, taskInfo)
   }
@@ -49,14 +55,18 @@
   }
 
   def updateTasks(taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit = {
-    if (taskInfo != null && taskMetrics != null) {
-      tempTaskTimes += ((taskInfo.taskId, taskMetrics.executorRunTime, taskMetrics.peakExecutionMemory))
+    // Ignore speculative tasks
+    if (taskInfo != null && taskMetrics != null && !taskInfo.speculative) {
+      taskTimes += ((taskInfo.taskId, taskMetrics.executorRunTime, taskMetrics.peakExecutionMemory))
       if (taskInfo.launchTime < minTaskLaunchTime) {
         minTaskLaunchTime = taskInfo.launchTime
       }
       if (taskInfo.finishTime > maxTaskFinishTime) {
         maxTaskFinishTime = taskInfo.finishTime
       }
+      if (taskMetrics.shuffleReadMetrics != null &&
+        taskMetrics.shuffleReadMetrics.totalBytesRead > 0) {
+        shuffleRead = true
+      }
     }
   }
 
@@ -65,33 +75,18 @@
     setStartTime(minTaskLaunchTime)
     setEndTime(maxTaskFinishTime)
 
-    taskExecutionTimes = new Array[Int](tempTaskTimes.size)
+    taskExecutionTimes = new Array[Int](taskTimes.size)
 
     var currentIndex = 0
-    tempTaskTimes.sortWith(( left, right) => left._1 < right._1)
+    taskTimes.sortWith(( left, right) => left._1 < right._1)
       .foreach( x => {
         taskExecutionTimes( currentIndex) = x._2.toInt
         currentIndex += 1
       })
 
-    val countPeakMemoryUsage = {
-      if (tempTaskTimes.size > 64) {
-        64
-      }else {
-        tempTaskTimes.size
-      }
-    }
-
-    taskPeakMemoryUsage = tempTaskTimes
+    // Keep peak memory for every task; previously only the top 64 values were retained.
+    taskPeakMemoryUsage = taskTimes
       .map( x => x._3)
-      .sortWith( (a, b) => a > b)
-      .take(countPeakMemoryUsage).toArray
-
-    /*
-      Clean the tempTaskTimes. We don't want to keep all this objects hanging around for
-      long time
-     */
-    tempTaskTimes.clear()
+      .sortWith( (a, b) => a > b).toArray
   }
 
   override def getMap(): Map[String, _ <: Any] = {
@@ -109,38 +104,3 @@
     ) ++ super.getStartEndTime()
   }
 }
-
-object StageTimeSpan {
-
-  def getTimeSpan(json: Map[String, JValue]): mutable.HashMap[Int, StageTimeSpan] = {
-    implicit val formats = DefaultFormats
-
-    val map = new mutable.HashMap[Int, StageTimeSpan]
-
-    json.keys.map(key => {
-      val value = json.get(key).get
-      val timeSpan = new StageTimeSpan(
-        (value \ "stageID").extract[Int],
-        (value \ "numberOfTasks").extract[Long]
-      )
-      timeSpan.stageMetrics = AggregateMetrics.getAggregateMetrics((value \ "stageMetrics")
-        .extract[JValue])
-      timeSpan.minTaskLaunchTime = (value \ "minTaskLaunchTime").extract[Long]
-      timeSpan.maxTaskFinishTime = (value \ "maxTaskFinishTime").extract[Long]
-
-
-      timeSpan.parentStageIDs = Json4sWrapper.parse((value \ "parentStageIDs").extract[String]).extract[List[Int]]
-      timeSpan.taskExecutionTimes = Json4sWrapper.parse((value \ "taskExecutionTimes").extract[String])
-        .extract[List[Int]].toArray
-
-      timeSpan.taskPeakMemoryUsage = Json4sWrapper.parse((value \ "taskPeakMemoryUsage").extract[String])
-        .extract[List[Long]].toArray
-
-      timeSpan.addStartEnd(value)
-
-      map.put(key.toInt, timeSpan)
-    })
-    map
-  }
-
-}
diff --git a/src/test/scala/com/qubole/sparklens/analyzer/StageSkewAnalyzerSuite.scala b/src/test/scala/com/qubole/sparklens/analyzer/StageSkewAnalyzerSuite.scala
new file mode 100644
index 0000000..a5b048e
--- /dev/null
+++ b/src/test/scala/com/qubole/sparklens/analyzer/StageSkewAnalyzerSuite.scala
@@ -0,0 +1,107 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.qubole.sparklens.analyzer
+
+import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo}
+import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan}
+import com.qubole.sparklens.helper.JobOverlapHelper
+
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.spark.SparkConf
+
+import scala.collection.mutable
+
+class StageSkewAnalyzerSuite extends AnyFunSuite {
+
+  val startTime = 0
+  val endTime = 60000000000L
+  // A non-SQL stage
+  val stage1 = new StageTimeSpan(1, 1, false)
+  // SQL stages:
+  // stage 2 should impact the target number of partitions
+  val stage2 = new StageTimeSpan(2, 2, true)
+  // stage 3 should have us turn off AQE if present
+  val stage3 = new StageTimeSpan(3, 1, true)
+  stage1.shuffleRead = true
+  stage1.taskExecutionTimes = Array[Int](6000000)
+  stage2.shuffleRead = true
+  stage2.taskExecutionTimes = 0.to(200).map(x => 60000 * 2).toArray
+  stage3.shuffleRead = true
+  // One really long task
+  stage3.taskExecutionTimes = Array[Int](60000000)
+
+  def createDummyAppContext(stageTimeSpans: mutable.HashMap[Int, StageTimeSpan]): AppContext = {
+
+    val jobMap = new mutable.HashMap[Long, JobTimeSpan]
+
+    val jobSQLExecIDMap = new mutable.HashMap[Long, Long]
+
+    val execStartTimes = new mutable.HashMap[String, ExecutorTimeSpan]()
+
+    val appInfo = new ApplicationInfo()
+    appInfo.startTime = startTime
+    appInfo.endTime = endTime
+
+
+    val conf = new SparkConf()
+
+    new AppContext(appInfo,
+      new AggregateMetrics(),
+      mutable.HashMap[String, HostTimeSpan](),
+      mutable.HashMap[String, ExecutorTimeSpan](),
+      jobMap,
+      jobSQLExecIDMap,
+      stageTimeSpans,
+      mutable.HashMap[Int, Long](),
+      Some(conf.getAll.toMap))
+  }
+
+  test("Change number of partitions when not skewed but long") {
+    val stageTimeSpans = new mutable.HashMap[Int, StageTimeSpan]
+    stageTimeSpans(1) = stage1
+    stageTimeSpans(2) = stage2
+    val ac = createDummyAppContext(stageTimeSpans)
+    val sska = new StageSkewAnalyzer()
+    val suggestions = sska.computeSuggestions(ac)
+    assert(suggestions.get("spark.sql.adaptive.coalescePartitions.enabled") == None,
+      "Leave AQE on if we 'just' have long partitions")
+    assert(suggestions.get("spark.sql.shuffle.partitions") == Some("400"),
+      "We aim for lots of partitions")
+  }
+
+  test("Turn off AQE if stage 3 is present") {
+    val stageTimeSpans = new mutable.HashMap[Int, StageTimeSpan]
+    stageTimeSpans(1) = stage1
+    stageTimeSpans(3) = stage3
+    val ac = createDummyAppContext(stageTimeSpans)
+    val sska = new StageSkewAnalyzer()
+    val suggestions = sska.computeSuggestions(ac)
+    assert(suggestions.get("spark.sql.adaptive.coalescePartitions.enabled") == Some("false"),
+      "Turn off AQE when bad coalesce occurs")
+  }
+
+  test("stage 1 should do nothing") {
+    val stageTimeSpans = new mutable.HashMap[Int, StageTimeSpan]
+    stageTimeSpans(1) = stage1
+    val ac = createDummyAppContext(stageTimeSpans)
+    val sska = new StageSkewAnalyzer()
+    val suggestions = sska.computeSuggestions(ac)
+    assert(suggestions == Map.empty[String, String], "Don't suggest on non-SQL stages")
+  }
+}
diff --git a/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala b/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala
index fd109ef..94516d0 100644
--- a/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala
+++ b/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala
@@ -28,7 +28,7 @@ class PQParallelStageSchedulerSuite extends AnyFunSuite {
   def createStageTimeSpan(stageID: Int, taskCount: Int, minTaskLaunchTime: Long,
                           maxTaskFinishTime: Long, parentStages: Seq[Int]): StageTimeSpan = {
-    val stageTimeSpan = new StageTimeSpan(stageID, taskCount)
+    val stageTimeSpan = new StageTimeSpan(stageID, taskCount, false)
     stageTimeSpan.minTaskLaunchTime = minTaskLaunchTime
     stageTimeSpan.maxTaskFinishTime = maxTaskFinishTime