Skip to content

Commit

Permalink
address review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: cindyyuanjiang <[email protected]>
  • Loading branch information
cindyyuanjiang committed Oct 30, 2024
1 parent 2dcdb9b commit ac162f0
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
* object to aggregate the Raw metrics and returns the result
* @param app the AppBase to be analyzed
* @param index the application index
* @param sqlAnalyzer optional AppSQLPlanAnalyzer used to aggregate diagnostic metrics,
* this is already present in ApplicationInfo for Profiler, but for
* Qualification this argument needs to be provided.
* @return a single record of AggRawMetricsResult containing all the raw aggregated Spark
* metrics
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
/**
* Aggregates the diagnostic SparkMetrics by stage.
* @param index the App-index (used by the profiler tool)
* @param analyzerInput optional AppSQLPlanAnalyzer which is used to pull stage level
* information like node names and diagnostic metrics results, only
* Qualification needs to provide this argument.
* @return sequence of StageDiagnosticAggTaskMetricsProfileResult
*/
def aggregateDiagnosticSparkMetricsByStage(index: Int,
Expand All @@ -335,7 +338,9 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {

val sqlAnalyzer = analyzerInput match {
case Some(res) => res
case None => app.asInstanceOf[ApplicationInfo].planMetricProcessor
case None =>
// for Profiler this is present in ApplicationInfo
app.asInstanceOf[ApplicationInfo].planMetricProcessor
}

// TODO: this has stage attempts. we should handle different attempts
Expand All @@ -358,7 +363,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
val srFetchWaitTimeMetrics = disgnosticMetrics.getOrElse(SR_FETCH_WAIT_TIME_METRIC, zeroStats)
val swWriteTimeMetrics = disgnosticMetrics.getOrElse(SW_WRITE_TIME_METRIC, zeroStats)
val gpuSemaphoreMetrics = disgnosticMetrics.getOrElse(GPU_SEMAPHORE_WAIT_METRIC, zeroStats)
val (srBytesMin, srBytesMed, srBytesMax, srBytesSum) =
val srTotalBytesMetrics =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead))

StageDiagnosticResult(index,
Expand All @@ -383,10 +388,10 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
outputBytesMetrics.med,
outputBytesMetrics.max,
outputBytesMetrics.total,
srBytesMin,
srBytesMed,
srBytesMax,
srBytesSum,
srTotalBytesMetrics.min,
srTotalBytesMetrics.med,
srTotalBytesMetrics.max,
srTotalBytesMetrics.total,
swTotalBytesMetrics.min,
swTotalBytesMetrics.med,
swTotalBytesMetrics.max,
Expand Down Expand Up @@ -521,9 +526,9 @@ object AppSparkMetricsAnalyzer {
/**
* Given an input iterable, returns its min, median, max and sum.
*/
def getStatistics(arr: Iterable[Long]): (Long, Long, Long, Long) = {
def getStatistics(arr: Iterable[Long]): StatisticsMetrics = {
if (arr.isEmpty) {
(0L, 0L, 0L, 0L)
StatisticsMetrics(0L, 0L, 0L, 0L)
}
val sortedArr = arr.toSeq.sorted
val len = sortedArr.size
Expand All @@ -532,7 +537,7 @@ object AppSparkMetricsAnalyzer {
} else {
sortedArr(len / 2)
}
(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum)
StatisticsMetrics(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum)
}

def maxWithEmptyHandling(arr: Iterable[Long]): Long = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ package object views {
val IO_LABEL = "IO Metrics"
val SQL_DUR_LABEL = "SQL Duration and Executor CPU Time Percent"
val SQL_MAX_INPUT_SIZE = "SQL Max Task Input Size"
val STAGE_DIAGNOSTICS_LABEL = "Stage level diagnostic metrics"
val STAGE_DIAGNOSTICS_LABEL = "Stage Level Diagnostic Metrics"

val AGG_DESCRIPTION = Map(
STAGE_AGG_LABEL -> "Stage metrics",
Expand Down

0 comments on commit ac162f0

Please sign in to comment.