address review feedback
Signed-off-by: cindyyuanjiang <[email protected]>
cindyyuanjiang committed Oct 30, 2024
1 parent 6931086 commit 2dcdb9b
Showing 9 changed files with 93 additions and 69 deletions.
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.analysis

import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult}

/**
* The result of the aggregation of the raw metrics. It contains the aggregated metrics for an
@@ -42,4 +42,4 @@ case class AggRawMetricsResult(
ioAggs: Seq[IOAnalysisProfileResult],
sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult],
maxTaskInputSizes: Seq[SQLMaxTaskInputSizes],
stageDiagnostics: Seq[StageDiagnosticMetricsProfileResult])
stageDiagnostics: Seq[StageDiagnosticResult])
@@ -18,6 +18,7 @@ package com.nvidia.spark.rapids.tool.analysis

import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}

import com.nvidia.spark.rapids.tool.analysis.AnalysisUtils._
import com.nvidia.spark.rapids.tool.planparser.SQLPlanParser
import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer
@@ -44,7 +45,6 @@ import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph
* @param app the Application info objects that contains the SQL plans to be processed
*/
class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(app) {
val GPU_SEMAPHORE_WAIT_METRIC_NAME = "gpuSemaphoreWait"
// A map between (SQL ID, Node ID) and the set of stage IDs
// TODO: The Qualification should use this map instead of building a new set for each exec.
private val sqlPlanNodeIdToStageIds: HashMap[(Long, Long), Set[Int]] =
@@ -59,8 +59,31 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
var allSQLMetrics: ArrayBuffer[SQLMetricInfoCase] = ArrayBuffer[SQLMetricInfoCase]()
// A map between stage ID and the set of node names
val stageToNodeNames: HashMap[Long, Seq[String]] = HashMap.empty[Long, Seq[String]]
// A map between stage ID and total GPU semaphore wait time
val stageToGpuSemaphoreWaitTime: HashMap[Long, Long] = HashMap.empty[Long, Long]
// A map between stage ID and diagnostic metric values
val stageToDiagnosticMetrics: HashMap[Long, HashMap[String, StatisticsMetrics]] =
HashMap.empty[Long, HashMap[String, StatisticsMetrics]]

/**
* Check if the input is one of the diagnostic metric names and update the mapping between
* stage ID and diagnostic metrics.
* @param stageId stage ID of the metric
* @param metricName name of the metric
* @param statistics contains min, median, max and total of the input metric
*/
private def updateStageDiagnosticMetrics(stageId: Long, metricName: String,
statistics: StatisticsMetrics): Unit = {
metricName match {
case MEMORY_SPILLED_METRIC | DISK_SPILLED_METRIC | INPUT_BYTES_READ_METRIC |
OUTPUT_BYTES_WRITTEN_METRIC | SW_TOTAL_BYTES_METRIC | SR_FETCH_WAIT_TIME_METRIC |
SW_WRITE_TIME_METRIC | GPU_SEMAPHORE_WAIT_METRIC => {
if (!stageToDiagnosticMetrics.contains(stageId)) {
stageToDiagnosticMetrics(stageId) = HashMap.empty[String, StatisticsMetrics]
}
stageToDiagnosticMetrics(stageId)(metricName) = statistics
}
case _ => ()
}
}
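
As an aside, the new stageToDiagnosticMetrics field and updateStageDiagnosticMetrics method boil down to a name-filtered, two-level mutable map: stage ID -> metric name -> statistics. Below is a minimal, self-contained Scala sketch of that pattern; StatsSketch and the tracked metric names are simplified stand-ins for the tool's StatisticsMetrics and AnalysisUtils constants, not the actual definitions.

import scala.collection.mutable

// Stand-in for the tool's StatisticsMetrics: min, median, max and total of a metric.
case class StatsSketch(min: Long, med: Long, max: Long, total: Long)

object DiagnosticMapSketch {
  // Hypothetical subset of tracked diagnostic metric names.
  private val trackedMetrics = Set("gpuSemaphoreWait", "diskBytesSpilled")

  // Stage ID -> (metric name -> statistics), mirroring stageToDiagnosticMetrics.
  val stageToDiagnostics: mutable.HashMap[Long, mutable.HashMap[String, StatsSketch]] =
    mutable.HashMap.empty

  // Record statistics only for metric names we track; everything else is ignored.
  def update(stageId: Long, metricName: String, stats: StatsSketch): Unit = {
    if (trackedMetrics.contains(metricName)) {
      stageToDiagnostics.getOrElseUpdate(stageId, mutable.HashMap.empty)(metricName) = stats
    }
  }

  def main(args: Array[String]): Unit = {
    update(2L, "gpuSemaphoreWait", StatsSketch(0L, 5L, 9L, 42L))
    update(2L, "someOtherMetric", StatsSketch(1L, 1L, 1L, 1L)) // dropped: not tracked
    println(stageToDiagnostics(2L)("gpuSemaphoreWait").total) // prints 42
  }
}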

/**
* Connects Operators to Stages using AccumulatorIDs.
@@ -345,9 +368,8 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
} else {
taskUpatesSubset(taskUpatesSubset.size / 2)
}
if (accumInfo.infoRef.getName.contains(GPU_SEMAPHORE_WAIT_METRIC_NAME)) {
stageToGpuSemaphoreWaitTime(stageId) = sum
}
updateStageDiagnosticMetrics(stageId, accumInfo.infoRef.getName,
StatisticsMetrics(min, median, max, sum))
Some(AccumProfileResults(
appIndex,
stageId,
@@ -20,8 +20,9 @@ import java.util.concurrent.TimeUnit

import scala.collection.mutable

import com.nvidia.spark.rapids.tool.analysis.AnalysisUtils._
import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult}

import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
@@ -328,78 +329,77 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
*/
def aggregateDiagnosticSparkMetricsByStage(index: Int,
analyzerInput: Option[AppSQLPlanAnalyzer] = None):
Seq[StageDiagnosticMetricsProfileResult] = {
Seq[StageDiagnosticResult] = {
def bytesToMB(numBytes: Long): Long = numBytes / (1024 * 1024)
def nanoToMilliSec(numNano: Long): Long = numNano / 1000000

val sqlAnalyzer = analyzerInput match {
case Some(res) => res
case None => app.asInstanceOf[ApplicationInfo].planMetricProcessor
}

// TODO: this has stage attempts. we should handle different attempts
app.stageManager.getAllStages.map { sm =>
// TODO: Should we only consider successful tasks?
val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId,
sm.stageInfo.attemptNumber())
// count duplicate task attempts
val numAttempts = tasksInStage.size
val (diskSpilledMin, diskSpilledMed, diskSpilledMax, diskSpilledSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.diskBytesSpilled))
val (memSpilledMin, memSpilledMed, memSpilledMax, memSpilledSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.memoryBytesSpilled))
val (inputBytesMin, inputBytesMed, inputBytesMax, inputBytesSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.input_bytesRead))
val (ouputBytesMin, ouputBytesMed, ouputBytesMax, ouputBytesSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.output_bytesWritten))
val (srBytesMin, srBytesMed, srBytesMax, srBytesSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead))
val (swBytesMin, swBytesMed, swBytesMax, swBytesSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sw_bytesWritten))
val (srFetchWaitTimeMin, srFetchWaitTimeMed, srFetchWaitTimeMax, srFetchWaitTimeSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_fetchWaitTime))
val (swWriteTimeMin, swWriteTimeMed, swWriteTimeMax, swWriteTimeSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sw_writeTime))
val nodeNames = sqlAnalyzer.stageToNodeNames.
getOrElse(sm.stageInfo.stageId, Seq.empty[String])
val gpuSemaphoreWaitSum = sqlAnalyzer.stageToGpuSemaphoreWaitTime.
getOrElse(sm.stageInfo.stageId, 0L)
StageDiagnosticMetricsProfileResult(index,
val diagnosticMetrics = sqlAnalyzer.stageToDiagnosticMetrics.
getOrElse(sm.stageInfo.stageId, mutable.HashMap.empty[String, StatisticsMetrics])
val zeroStats = StatisticsMetrics.ZERO_RECORD
val memorySpilledMetrics = diagnosticMetrics.getOrElse(MEMORY_SPILLED_METRIC, zeroStats)
val diskSpilledMetrics = diagnosticMetrics.getOrElse(DISK_SPILLED_METRIC, zeroStats)
val inputBytesReadMetrics = diagnosticMetrics.getOrElse(INPUT_BYTES_READ_METRIC, zeroStats)
val outputBytesMetrics = diagnosticMetrics.getOrElse(OUTPUT_BYTES_WRITTEN_METRIC, zeroStats)
val swTotalBytesMetrics = diagnosticMetrics.getOrElse(SW_TOTAL_BYTES_METRIC, zeroStats)
val srFetchWaitTimeMetrics = diagnosticMetrics.getOrElse(SR_FETCH_WAIT_TIME_METRIC, zeroStats)
val swWriteTimeMetrics = diagnosticMetrics.getOrElse(SW_WRITE_TIME_METRIC, zeroStats)
val gpuSemaphoreMetrics = diagnosticMetrics.getOrElse(GPU_SEMAPHORE_WAIT_METRIC, zeroStats)
val (srBytesMin, srBytesMed, srBytesMax, srBytesSum) =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead))

StageDiagnosticResult(index,
app.getAppName,
app.appId,
sm.stageInfo.stageId,
sm.duration,
numAttempts, // TODO: why is this numAttempts and not numTasks?
bytesToMB(memSpilledMin),
bytesToMB(memSpilledMed),
bytesToMB(memSpilledMax),
bytesToMB(memSpilledSum),
bytesToMB(diskSpilledMin),
bytesToMB(diskSpilledMed),
bytesToMB(diskSpilledMax),
bytesToMB(diskSpilledSum),
inputBytesMin,
inputBytesMed,
inputBytesMax,
inputBytesSum,
ouputBytesMin,
ouputBytesMed,
ouputBytesMax,
ouputBytesSum,
bytesToMB(memorySpilledMetrics.min),
bytesToMB(memorySpilledMetrics.med),
bytesToMB(memorySpilledMetrics.max),
bytesToMB(memorySpilledMetrics.total),
bytesToMB(diskSpilledMetrics.min),
bytesToMB(diskSpilledMetrics.med),
bytesToMB(diskSpilledMetrics.max),
bytesToMB(diskSpilledMetrics.total),
inputBytesReadMetrics.min,
inputBytesReadMetrics.med,
inputBytesReadMetrics.max,
inputBytesReadMetrics.total,
outputBytesMetrics.min,
outputBytesMetrics.med,
outputBytesMetrics.max,
outputBytesMetrics.total,
srBytesMin,
srBytesMed,
srBytesMax,
srBytesSum,
swBytesMin,
swBytesMed,
swBytesMax,
swBytesSum,
srFetchWaitTimeMin,
srFetchWaitTimeMed,
srFetchWaitTimeMax,
srFetchWaitTimeSum,
swWriteTimeMin,
swWriteTimeMed,
swWriteTimeMax,
swWriteTimeSum,
gpuSemaphoreWaitSum,
swTotalBytesMetrics.min,
swTotalBytesMetrics.med,
swTotalBytesMetrics.max,
swTotalBytesMetrics.total,
nanoToMilliSec(srFetchWaitTimeMetrics.min),
nanoToMilliSec(srFetchWaitTimeMetrics.med),
nanoToMilliSec(srFetchWaitTimeMetrics.max),
nanoToMilliSec(srFetchWaitTimeMetrics.total),
nanoToMilliSec(swWriteTimeMetrics.min),
nanoToMilliSec(swWriteTimeMetrics.med),
nanoToMilliSec(swWriteTimeMetrics.max),
nanoToMilliSec(swWriteTimeMetrics.total),
gpuSemaphoreMetrics.total,
nodeNames)
}.toSeq
}
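
As an aside, aggregateDiagnosticSparkMetricsByStage reads each diagnostic metric back with a zero-record fallback and converts units before building the per-stage result row. A rough, self-contained sketch of that lookup-and-convert step follows; the StageSummary fields and metric-name strings are invented for the example, and only the getOrElse-with-zero-default plus the bytes-to-MB and nanoseconds-to-milliseconds conversions mirror the code above.

import scala.collection.mutable

// Stand-in for the tool's StatisticsMetrics: min, median, max and total of a metric.
case class StatsSketch(min: Long, med: Long, max: Long, total: Long)

object StageSummarySketch {
  // Analogous to StatisticsMetrics.ZERO_RECORD: used when a stage never saw the metric.
  private val Zero = StatsSketch(0L, 0L, 0L, 0L)

  private def bytesToMB(numBytes: Long): Long = numBytes / (1024L * 1024L)
  private def nanoToMilliSec(numNano: Long): Long = numNano / 1000000L

  // Hypothetical, trimmed-down per-stage summary row.
  case class StageSummary(stageId: Long, spilledMBTotal: Long, shuffleWriteTimeMsTotal: Long)

  def summarize(stageId: Long, metrics: mutable.HashMap[String, StatsSketch]): StageSummary = {
    // A missing metric falls back to all-zero statistics instead of failing.
    val spilled = metrics.getOrElse("memoryBytesSpilled", Zero)
    val writeTime = metrics.getOrElse("shuffleWriteTime", Zero)
    StageSummary(stageId, bytesToMB(spilled.total), nanoToMilliSec(writeTime.total))
  }

  def main(args: Array[String]): Unit = {
    val m = mutable.HashMap("memoryBytesSpilled" -> StatsSketch(0L, 1L << 20, 5L << 20, 10L << 20))
    println(summarize(3L, m)) // StageSummary(3,10,0): 10 MB spilled, no shuffle write time recorded
  }
}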
@@ -48,7 +48,7 @@ case class ApplicationSummaryInfo(
sysProps: Seq[RapidsPropertyProfileResult],
sqlCleanedAlignedIds: Seq[SQLCleanAndAlignIdsProfileResult],
sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent],
stageDiagnostics: Seq[StageDiagnosticMetricsProfileResult])
stageDiagnostics: Seq[StageDiagnosticResult])

trait AppInfoPropertyGetter {
// returns all the properties (i.e., spark)
@@ -572,7 +572,7 @@ case class StageAggTaskMetricsProfileResult(
override def idHeader = "stageId"
}

case class StageDiagnosticMetricsProfileResult(
case class StageDiagnosticResult(
appIndex: Int,
appName: String,
appId: String,
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.views

import com.nvidia.spark.rapids.tool.profiling.{BaseJobStageAggTaskMetricsProfileResult, IOAnalysisProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLTaskAggMetricsProfileResult, StageDiagnosticMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{BaseJobStageAggTaskMetricsProfileResult, IOAnalysisProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLTaskAggMetricsProfileResult, StageDiagnosticResult}

/**
* Contains the sort logic for the aggregated Spark RawMetrics.
@@ -93,8 +93,8 @@ object AggMetricsResultSorter {
}

def sortStageDiagnostics(
rows: Seq[StageDiagnosticMetricsProfileResult]):
Seq[StageDiagnosticMetricsProfileResult] = {
rows: Seq[StageDiagnosticResult]):
Seq[StageDiagnosticResult] = {
if (rows.isEmpty) {
Seq.empty
} else {
@@ -17,7 +17,7 @@
package com.nvidia.spark.rapids.tool.views

import com.nvidia.spark.rapids.tool.analysis.ProfSparkMetricsAnalyzer
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticMetricsProfileResult}
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult}

import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo

@@ -31,7 +31,7 @@ case class ProfilerAggregatedView(
ioAggs: Seq[IOAnalysisProfileResult],
sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult],
maxTaskInputSizes: Seq[SQLMaxTaskInputSizes],
stageDiagnostics: Seq[StageDiagnosticMetricsProfileResult])
stageDiagnostics: Seq[StageDiagnosticResult])

object RawMetricProfilerView {
def getAggMetrics(apps: Seq[ApplicationInfo]): ProfilerAggregatedView = {
@@ -1,5 +1,5 @@
appIndex,appName,appId,stageId,stageDurationMs,numTasks,memoryBytesSpilledMBMin,memoryBytesSpilledMBMedian,memoryBytesSpilledMBMax,memoryBytesSpilledMBTotal,diskBytesSpilledMBMin,diskBytesSpilledMBMedian,diskBytesSpilledMBMax,diskBytesSpilledMBTotal,inputBytesReadMin,inputBytesReadMedian,inputBytesReadMax,inputBytesReadTotal,outputBytesWrittenMin,outputBytesWrittenMedian,outputBytesWrittenMax,outputBytesWrittenTotal,shuffleReadBytesMin,shuffleReadBytesMedian,shuffleReadBytesMax,shuffleReadBytesTotal,shuffleWriteBytesMin,shuffleWriteBytesMedian,shuffleWriteBytesMax,shuffleWriteBytesTotal,shuffleReadFetchWaitTimeMin,shuffleReadFetchWaitTimeMedian,shuffleReadFetchWaitTimeMax,shuffleReadFetchWaitTimeTotal,shuffleWriteWriteTimeMin,shuffleWriteWriteTimeMedian,shuffleWriteWriteTimeMax,shuffleWriteWriteTimeTotal,gpuSemaphoreWaitTimeTotal,SQL Nodes(IDs)
1,Spark shell,local-1622814619968,0,1743,6,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,6688608,6688702,6688825,40132250,0,0,0,0,41,60,100,397,0,"GpuColumnarExchange(16),GpuProject(17),GpuRowToColumnar(18),WholeStageCodegen (2)(19),Scan(21)"
1,Spark shell,local-1622814619968,1,1631,6,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,6688602,6688708,6688833,40132258,0,0,0,0,37,91,108,505,0,"GpuColumnarExchange(8),GpuProject(9),GpuRowToColumnar(10),WholeStageCodegen (1)(11),Scan(13)"
1,Spark shell,local-1622814619968,2,688,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,397220,401479,405854,80264508,77,77,77,15400,0,0,0,0,0,0,9,42,0,"GpuColumnarExchange(3),GpuHashAggregate(4),GpuProject(5),GpuShuffledHashJoin(6),GpuShuffleCoalesce(7),GpuColumnarExchange(8),GpuCoalesceBatches(14),GpuShuffleCoalesce(15),GpuColumnarExchange(16)"
1,Spark shell,local-1622814619968,0,1743,6,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,6688608,6688702,6688825,40132250,0,0,0,0,41,60,100,400,0,"GpuColumnarExchange(16),GpuProject(17),GpuRowToColumnar(18),WholeStageCodegen (2)(19),Scan(21)"
1,Spark shell,local-1622814619968,1,1631,6,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,6688602,6688708,6688833,40132258,0,0,0,0,37,92,108,508,0,"GpuColumnarExchange(8),GpuProject(9),GpuRowToColumnar(10),WholeStageCodegen (1)(11),Scan(13)"
1,Spark shell,local-1622814619968,2,688,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,397220,401479,405854,80264508,77,77,77,15400,0,0,0,0,0,0,9,93,0,"GpuColumnarExchange(3),GpuHashAggregate(4),GpuProject(5),GpuShuffledHashJoin(6),GpuShuffleCoalesce(7),GpuColumnarExchange(8),GpuCoalesceBatches(14),GpuShuffleCoalesce(15),GpuColumnarExchange(16)"
1,Spark shell,local-1622814619968,3,83,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,15400,15400,15400,15400,0,0,0,0,0,0,0,0,0,0,0,0,0,"GpuColumnarToRow(0),GpuHashAggregate(1),GpuShuffleCoalesce(2),GpuColumnarExchange(3)"
@@ -81,9 +81,11 @@ class AnalysisSuite extends FunSuite {
val apps = ToolTestUtils.processProfileApps(logs, sparkSession)
assert(apps.size == logs.size)

// This step is to compute stage to node names mapping
// This step is to compute stage to node names and diagnostic metrics mappings,
// which are used in collecting diagnostic metrics.
val collect = new CollectInformation(apps)
collect.getSQLToStage
collect.getStageLevelMetrics

val aggResults = RawMetricProfilerView.getAggMetrics(apps)
import org.apache.spark.sql.functions._
