Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEA] Skip Databricks Photon jobs at app level in Qualification tool #886

Merged
merged 5 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,12 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
val appResult = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker,
reportSqlLevel, mlOpsEnabled, penalizeTransitions)
val qualAppResult = appResult match {
case Left(errorMessage: String) =>
// Case when an error occurred during QualificationAppInfo creation
case Left(FailureApp("skipped", errorMessage)) =>
// Case to be skipped, e.g. encountered Databricks Photon event log
progressBar.foreach(_.reportSkippedProcess())
SkippedQualAppResult(pathStr, errorMessage)
case Left(FailureApp(_, errorMessage)) =>
// Case when other error occurred during QualificationAppInfo creation
progressBar.foreach(_.reportUnkownStatusProcess())
UnknownQualAppResult(pathStr, "", errorMessage)
case Right(app: QualificationAppInfo) =>
Expand Down Expand Up @@ -213,6 +217,8 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
StatusSummaryInfo(path, "SUCCESS", appId, message)
case FailureQualAppResult(path, message) =>
StatusSummaryInfo(path, "FAILURE", "", message)
case SkippedQualAppResult(path, message) =>
StatusSummaryInfo(path, "SKIPPED", "", message)
case UnknownQualAppResult(path, appId, message) =>
StatusSummaryInfo(path, "UNKNOWN", appId, message)
case qualAppResult: QualAppResult =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ object SupportedMLFuncsName {

case class GpuEventLogException(message: String) extends Exception(message)

case class PhotonEventLogException(message: String) extends Exception(message)

// Class used a container to hold the information of the Tuple<sqlID, PlanInfo, SparkGraph>
// to simplify arguments of methods and caching.
case class SqlPlanInfoGraphEntry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate, SparkListenerEvent, SparkListenerJobStart}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.rapids.tool.{AppBase, ClusterSummary, GpuEventLogException, SqlPlanInfoGraphBuffer, SupportedMLFuncsName, ToolUtils}
import org.apache.spark.sql.rapids.tool.{AppBase, ClusterSummary, GpuEventLogException, PhotonEventLogException, SqlPlanInfoGraphBuffer, SupportedMLFuncsName, ToolUtils}


class QualificationAppInfo(
Expand Down Expand Up @@ -795,6 +795,10 @@ class QualificationAppInfo(
val sqlPlanInfoGraphEntry = SqlPlanInfoGraphBuffer.createEntry(sqlID, planInfo)
checkMetadataForReadSchema(sqlPlanInfoGraphEntry)
for (node <- sqlPlanInfoGraphEntry.sparkPlanGraph.allNodes) {
if (node.name.startsWith("Photon")) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that we could detect the Photon in an earlier phase as we process the eventlogs.
For example, doSparkListenerSQLExecutionStart() could check for certain conditions to disqualify the entire app.
The pros of having the code in doSparkListenerSQLExecutionStart():

  • For Qualification tool, the bottleneck of performance is actually in reading and processing the evntlogs. Therefore, if we fail during that stage, then we are saving significant amount of useless processing and computation.
  • If we need to propagate that logic to Profiling, it becomes much easier as we need only to move the implementation to the EvenProcessorBase.doSparkListenerSQLExecutionStart instead of QualificationEventProcessor.doSparkListenerSQLExecutionStart. Qualification if we need

throw PhotonEventLogException(
"Encountered Databricks Photon event log: skipping this file!")
}
checkGraphNodeForReads(sqlID, node)
val issues = findPotentialIssues(node.desc)
if (issues.nonEmpty) {
Expand Down Expand Up @@ -949,6 +953,12 @@ case class StageQualSummaryInfo(
stageWallclockDuration: Long = 0,
unsupportedExecs: Seq[ExecInfo] = Seq.empty)

// Case class to represent a failed QualificationAppInfo creation
case class FailureApp(
status: String,
message: String
)

object QualificationAppInfo extends Logging {
// define recommendation constants
val RECOMMENDED = "Recommended"
Expand All @@ -963,20 +973,22 @@ object QualificationAppInfo extends Logging {
// based on the testing on few candidate eventlogs.
val CPU_GPU_TRANSFER_RATE = 1000000000L

private def handleException(e: Exception, path: EventLogInfo): String = {
val message: String = e match {
private def handleException(e: Exception, path: EventLogInfo): FailureApp = {
val (status, message): (String, String) = e match {
case photonLog: PhotonEventLogException =>
("skipped", photonLog.message)
case gpuLog: GpuEventLogException =>
gpuLog.message
("unknown", gpuLog.message)
case _: com.fasterxml.jackson.core.JsonParseException =>
s"Error parsing JSON: ${path.eventLog.toString}"
("unknown", s"Error parsing JSON: ${path.eventLog.toString}")
case _: IllegalArgumentException =>
s"Error parsing file: ${path.eventLog.toString}"
("unknown", s"Error parsing file: ${path.eventLog.toString}")
case _: Exception =>
// catch all exceptions and skip that file
s"Got unexpected exception processing file: ${path.eventLog.toString}"
("unknown", s"Got unexpected exception processing file: ${path.eventLog.toString}")
}

s"${e.getClass.getSimpleName}: $message"
FailureApp(status, s"${e.getClass.getSimpleName}: $message")
}

def getRecommendation(totalSpeedup: Double,
Expand Down Expand Up @@ -1055,15 +1067,15 @@ object QualificationAppInfo extends Logging {
pluginTypeChecker: PluginTypeChecker,
reportSqlLevel: Boolean,
mlOpsEnabled: Boolean,
penalizeTransitions: Boolean): Either[String, QualificationAppInfo] = {
penalizeTransitions: Boolean): Either[FailureApp, QualificationAppInfo] = {
try {
val app = new QualificationAppInfo(Some(path), Some(hadoopConf), pluginTypeChecker,
reportSqlLevel, false, mlOpsEnabled, penalizeTransitions)
logInfo(s"${path.eventLog.toString} has App: ${app.appId}")
Right(app)
} catch {
case e: Exception =>
Left(handleException(e, path))
}
val app = new QualificationAppInfo(Some(path), Some(hadoopConf), pluginTypeChecker,
reportSqlLevel, false, mlOpsEnabled, penalizeTransitions)
logInfo(s"${path.eventLog.toString} has App: ${app.appId}")
Right(app)
} catch {
case e: Exception =>
parthosa marked this conversation as resolved.
Show resolved Hide resolved
Left(handleException(e, path))
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,14 +27,14 @@ import org.apache.spark.internal.Logging
* ConsoleProgressBar shows the progress of the tool (Qualification/Profiler) in the console.
* This class is inspired from org.apache.spark.ui.ConsoleProgressBar.
*
* The implementation defines three counters: succeeded, failed, and N/A.
* The implementation defines four counters: succeeded, failed, skipped, and N/A.
* The list can be extended to add new counters during runtime which gives flexibility to have
* custom statistics for different contexts.
*
* By default, the progress bar will be shown in the stdout.
* Sample generated line:
* (.)+ Progress (\\d+)% [==> ] ((\\d+) succeeded + (\\d+) failed + (\\d+) N/A) / [(\\d+)]
* toolName Progress 71% [=======> ] (83 succeeded + 2 failed + 0 N/A) / 119
* .+ Progress (\d+)% [==> ] ((\d+) succeeded + (\d+) failed + (\d+) skipped + (\d+) N/A) / (\d+)
* toolName Progress 71% [=======> ] (81 succeeded + 3 failed + 1 skipped + 0 N/A) / 119
*
* At the end of execution, it dumps all the defined the counters.
*
Expand All @@ -50,12 +50,14 @@ class ConsoleProgressBar(
import ConsoleProgressBar._
private val successCounter = new AtomicLong(0)
private val failureCounter = new AtomicLong(0)
private val skippedCounter = new AtomicLong(0)
private val statusNotAvailableCounter = new AtomicLong(0)
private val totalCounter = new AtomicLong(0)

private val metrics = mutable.LinkedHashMap[String, AtomicLong](
PROCESS_SUCCESS_COUNT -> successCounter,
PROCESS_FAILURE_COUNT -> failureCounter,
PROCESS_SKIPPED_COUNT -> skippedCounter,
PROCESS_NOT_AVAILABLE_COUNT -> statusNotAvailableCounter,
EXECUTION_TOTAL_COUNTER -> totalCounter)

Expand Down Expand Up @@ -127,6 +129,11 @@ class ConsoleProgressBar(
totalCounter.incrementAndGet()
}

def reportSkippedProcess(): Unit = {
skippedCounter.incrementAndGet()
totalCounter.incrementAndGet()
}

def reportUnkownStatusProcess(): Unit = {
statusNotAvailableCounter.incrementAndGet()
totalCounter.incrementAndGet()
Expand All @@ -140,6 +147,10 @@ class ConsoleProgressBar(
(1 to n).foreach(_ => reportFailedProcess())
}

def reportSkippedProcesses(n: Int): Unit = {
(1 to n).foreach(_ => reportSkippedProcess())
}

def reportUnknownStatusProcesses(n: Int): Unit = {
(1 to n).foreach(_ => reportUnkownStatusProcess())
}
Expand All @@ -158,6 +169,7 @@ class ConsoleProgressBar(
val tailer =
s"] (${successCounter.longValue()} succeeded + " +
s"${failureCounter.longValue()} failed + " +
s"${skippedCounter.longValue()} skipped + " +
s"${statusNotAvailableCounter.longValue()} N/A) / $totalCount"
val w = TerminalWidth - header.length - tailer.length
val bar = if (w > 0) {
Expand Down Expand Up @@ -231,6 +243,7 @@ object ConsoleProgressBar {

val PROCESS_SUCCESS_COUNT = "process.success.count"
val PROCESS_FAILURE_COUNT = "process.failure.count"
val PROCESS_SKIPPED_COUNT = "process.skipped.count"
val PROCESS_NOT_AVAILABLE_COUNT = "process.NA.count"
val EXECUTION_TOTAL_COUNTER = "execution.total.count"
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,3 +49,6 @@ case class FailureQualAppResult(path: String, message: String)

case class UnknownQualAppResult(path: String, appId: String, message: String)
extends QualAppResult(path, message) {}

case class SkippedQualAppResult(path: String, message: String)
extends QualAppResult(path, message) {}