From ac162f071e4f7099ee13bc262eb0fad157cd5c34 Mon Sep 17 00:00:00 2001
From: cindyyuanjiang
Date: Wed, 30 Oct 2024 15:26:12 -0700
Subject: [PATCH] address review feedback

Signed-off-by: cindyyuanjiang
---
 .../analysis/AppSparkMetricsAggTrait.scala    |  3 +++
 .../analysis/AppSparkMetricsAnalyzer.scala    | 23 +++++++++++--------
 .../spark/rapids/tool/views/package.scala     |  2 +-
 3 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala
index bf06839ff..1da9637c5 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala
@@ -26,6 +26,9 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
    * object to aggregate the Raw metrics and returns the result
    * @param app the AppBase to be analyzed
    * @param index the application index
+   * @param sqlAnalyzer optional AppSQLPlanAnalyzer used to aggregate diagnostic metrics;
+   *                    it is already present in ApplicationInfo for the Profiler, but
+   *                    Qualification needs to provide this argument
    * @return a single record of AggRawMetricsResult containing all the raw aggregated Spark
    *         metrics
    */
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala
index aa33c17c1..7204eb3e3 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala
@@ -325,6 +325,9 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
   /**
    * Aggregates the diagnostic SparkMetrics by stage.
    * @param index the App-index (used by the profiler tool)
+   * @param analyzerInput optional AppSQLPlanAnalyzer used to pull stage-level information
+   *                      such as node names and diagnostic metrics results; only
+   *                      Qualification needs to provide this argument
    * @return sequence of StageDiagnosticAggTaskMetricsProfileResult
    */
   def aggregateDiagnosticSparkMetricsByStage(index: Int,
@@ -335,7 +338,9 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
 
     val sqlAnalyzer = analyzerInput match {
       case Some(res) => res
-      case None => app.asInstanceOf[ApplicationInfo].planMetricProcessor
+      case None =>
+        // for the Profiler, this is already present in ApplicationInfo
+        app.asInstanceOf[ApplicationInfo].planMetricProcessor
     }
 
     // TODO: this has stage attempts. we should handle different attempts
@@ -358,7 +363,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
     val srFetchWaitTimeMetrics = disgnosticMetrics.getOrElse(SR_FETCH_WAIT_TIME_METRIC, zeroStats)
     val swWriteTimeMetrics = disgnosticMetrics.getOrElse(SW_WRITE_TIME_METRIC, zeroStats)
     val gpuSemaphoreMetrics = disgnosticMetrics.getOrElse(GPU_SEMAPHORE_WAIT_METRIC, zeroStats)
-    val (srBytesMin, srBytesMed, srBytesMax, srBytesSum) =
+    val srTotalBytesMetrics =
       AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead))
 
     StageDiagnosticResult(index,
@@ -383,10 +388,10 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
       outputBytesMetrics.med,
       outputBytesMetrics.max,
       outputBytesMetrics.total,
-      srBytesMin,
-      srBytesMed,
-      srBytesMax,
-      srBytesSum,
+      srTotalBytesMetrics.min,
+      srTotalBytesMetrics.med,
+      srTotalBytesMetrics.max,
+      srTotalBytesMetrics.total,
       swTotalBytesMetrics.min,
       swTotalBytesMetrics.med,
       swTotalBytesMetrics.max,
@@ -521,9 +526,9 @@ object AppSparkMetricsAnalyzer {
   /**
    * Given an input iterable, returns its min, median, max and sum.
    */
-  def getStatistics(arr: Iterable[Long]): (Long, Long, Long, Long) = {
+  def getStatistics(arr: Iterable[Long]): StatisticsMetrics = {
     if (arr.isEmpty) {
-      (0L, 0L, 0L, 0L)
+      StatisticsMetrics(0L, 0L, 0L, 0L)
     }
     val sortedArr = arr.toSeq.sorted
     val len = sortedArr.size
@@ -532,7 +537,7 @@ object AppSparkMetricsAnalyzer {
     } else {
       sortedArr(len / 2)
     }
-    (sortedArr.head, med, sortedArr(len - 1), sortedArr.sum)
+    StatisticsMetrics(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum)
   }
 
   def maxWithEmptyHandling(arr: Iterable[Long]): Long = {
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala
index c3796ec93..a509ae3f7 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala
@@ -28,7 +28,7 @@ package object views {
   val IO_LABEL = "IO Metrics"
   val SQL_DUR_LABEL = "SQL Duration and Executor CPU Time Percent"
   val SQL_MAX_INPUT_SIZE = "SQL Max Task Input Size"
-  val STAGE_DIAGNOSTICS_LABEL = "Stage level diagnostic metrics"
+  val STAGE_DIAGNOSTICS_LABEL = "Stage Level Diagnostic Metrics"
   val AGG_DESCRIPTION = Map(
     STAGE_AGG_LABEL -> "Stage metrics",
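
Note (outside the patch body): for readers unfamiliar with the codebase, below is a minimal, self-contained sketch of the fallback pattern the AppSparkMetricsAnalyzer change introduces. The analyzer comes from the optional argument when the caller (Qualification) supplies one, and otherwise from the app object itself (the Profiler path). SqlAnalyzer, AppInfo, and resolveAnalyzer are hypothetical stand-ins for the repo's AppSQLPlanAnalyzer, ApplicationInfo, and the patched match block; only the shape of the logic is taken from the patch.

// Hypothetical stand-ins for the repo's AppSQLPlanAnalyzer and ApplicationInfo.
case class SqlAnalyzer(label: String)
case class AppInfo(planMetricProcessor: SqlAnalyzer)

object AnalyzerFallbackSketch {
  // Prefer the analyzer passed in by the caller (Qualification);
  // fall back to the one the app object already holds (Profiler).
  def resolveAnalyzer(app: AppInfo, analyzerInput: Option[SqlAnalyzer]): SqlAnalyzer =
    analyzerInput match {
      case Some(res) => res
      case None =>
        // Profiler path: the analyzer is already attached to the app object
        app.planMetricProcessor
    }

  def main(args: Array[String]): Unit = {
    val app = AppInfo(SqlAnalyzer("from-ApplicationInfo"))
    println(resolveAnalyzer(app, None))                             // Profiler path
    println(resolveAnalyzer(app, Some(SqlAnalyzer("from-caller")))) // Qualification path
  }
}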
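A similar sketch for the getStatistics refactor in the same file, where the positional (min, med, max, sum) tuple becomes a named StatisticsMetrics result. Assumptions: StatisticsMetrics is defined elsewhere in the repo, so the case class below is an inferred minimal definition; the even-length median branch falls between hunks and is reconstructed as a standard midpoint average; and the explicit else guarding the empty case belongs to this sketch, not to the patch.

// Inferred minimal shape of the repo's StatisticsMetrics result type.
case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)

object GetStatisticsSketch {
  // Min, median, max and sum of the input, mirroring the patched getStatistics.
  def getStatistics(arr: Iterable[Long]): StatisticsMetrics = {
    if (arr.isEmpty) {
      StatisticsMetrics(0L, 0L, 0L, 0L)
    } else {
      val sortedArr = arr.toSeq.sorted
      val len = sortedArr.size
      val med = if (len % 2 == 0) {
        (sortedArr(len / 2) + sortedArr(len / 2 - 1)) / 2
      } else {
        sortedArr(len / 2)
      }
      StatisticsMetrics(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum)
    }
  }

  def main(args: Array[String]): Unit = {
    // Callers now read named fields instead of destructuring a 4-tuple,
    // which is what the StageDiagnosticResult call sites above switch to.
    val stats = getStatistics(Seq(3L, 1L, 7L, 5L))
    println(s"min=${stats.min} med=${stats.med} max=${stats.max} total=${stats.total}")
  }
}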