Include status information for failed event logs in core tool (#1187)
* Add status reports for failed event logs

Signed-off-by: Partho Sarthi <[email protected]>

* Add copyrights

Signed-off-by: Partho Sarthi <[email protected]>

---------

Signed-off-by: Partho Sarthi <[email protected]>
parthosa authored Jul 15, 2024
1 parent 8cc3fe0 commit 235f048
Showing 4 changed files with 117 additions and 52 deletions.
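The core change: getEventLogInfo now returns Map[EventLogInfo, Option[Long]] instead of Map[EventLogInfo, Long], so a log that cannot be read is kept as a FailedEventLog mapped to None rather than silently dropped. A minimal sketch of the new shape (illustrative only; the paths, timestamp, and printing are invented, not code from this commit):

val logs: Map[EventLogInfo, Option[Long]] = Map(
  ApacheSparkEventLog(new Path("/logs/app-ok")) -> Some(1721001234000L), // invented path/timestamp
  FailedEventLog(new Path("/logs/app-bad"), "Permission denied") -> None) // invented reason
logs.foreach {
  case (failed: FailedEventLog, _) => println(s"FAILED: ${failed.getReason}")
  case (info, Some(ts)) => println(s"OK: ${info.eventLog} (modified $ts)")
  case _ => // unreachable for this data: only a FailedEventLog maps to None
}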
core/src/main/scala/com/nvidia/spark/rapids/tool/EventLogPathProcessor.scala
@@ -21,6 +21,7 @@ import java.time.LocalDateTime
 import java.util.zip.ZipOutputStream
 
 import scala.collection.mutable.{LinkedHashMap, ListBuffer}
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, PathFilter}
@@ -29,6 +30,7 @@ import org.apache.spark.deploy.history.{EventLogFileReader, EventLogFileWriter}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
 import org.apache.spark.sql.rapids.tool.util.FSUtils
+import org.apache.spark.sql.rapids.tool.util.StringUtils
 
 sealed trait EventLogInfo {
   def eventLog: Path
@@ -37,6 +39,12 @@ sealed trait EventLogInfo {
 case class ApacheSparkEventLog(override val eventLog: Path) extends EventLogInfo
 case class DatabricksEventLog(override val eventLog: Path) extends EventLogInfo
 
+case class FailedEventLog(override val eventLog: Path,
+    private val reason: String) extends EventLogInfo {
+  def getReason: String = {
+    StringUtils.renderStr(reason, doEscapeMetaCharacters = true, maxLength = 0)
+  }
+}
 
 object EventLogPathProcessor extends Logging {
   // Apache Spark event log prefixes
@@ -125,7 +133,8 @@ object EventLogPathProcessor extends Logging {
     processedLogs.toList
   }
 
-  def getEventLogInfo(pathString: String, hadoopConf: Configuration): Map[EventLogInfo, Long] = {
+  def getEventLogInfo(pathString: String,
+      hadoopConf: Configuration): Map[EventLogInfo, Option[Long]] = {
     val inputPath = new Path(pathString)
     try {
       // Note that some cloud storage APIs may throw FileNotFoundException when the pathPrefix
@@ -147,15 +156,17 @@ object EventLogPathProcessor extends Logging {
             "Skipping this file."
         }
         logWarning(msg)
-        Map.empty[EventLogInfo, Long]
+        // Return an empty map as this is a skip due to unsupported file type, not an exception.
+        // Returning FailedEventLog would clutter the status report with unnecessary entries.
+        Map.empty[EventLogInfo, Option[Long]]
       } else if (fileStatus.isDirectory && isEventLogDir(fileStatus)) {
         // either event logDir v2 directory or regular event log
         val info = ApacheSparkEventLog(fileStatus.getPath).asInstanceOf[EventLogInfo]
-        Map(info -> fileStatus.getModificationTime)
+        Map(info -> Some(fileStatus.getModificationTime))
       } else if (fileStatus.isDirectory &&
           isDatabricksEventLogDir(fileStatus, fs)) {
         val dbinfo = DatabricksEventLog(fileStatus.getPath).asInstanceOf[EventLogInfo]
-        Map(dbinfo -> fileStatus.getModificationTime)
+        Map(dbinfo -> Some(fileStatus.getModificationTime))
       } else {
         // assume either single event log or directory with event logs in it, we don't
         // support nested dirs, so if event log dir within another one we skip it
@@ -179,19 +190,21 @@ object EventLogPathProcessor extends Logging {
         }
         logsSupported.map { s =>
           if (s.isFile || (s.isDirectory && isEventLogDir(s.getPath().getName()))) {
-            (ApacheSparkEventLog(s.getPath).asInstanceOf[EventLogInfo] -> s.getModificationTime)
+            (ApacheSparkEventLog(s.getPath).asInstanceOf[EventLogInfo]
+              -> Some(s.getModificationTime))
           } else {
-            (DatabricksEventLog(s.getPath).asInstanceOf[EventLogInfo] -> s.getModificationTime)
+            (DatabricksEventLog(s.getPath).asInstanceOf[EventLogInfo]
+              -> Some(s.getModificationTime))
           }
         }.toMap
       }
     } catch {
-      case _: FileNotFoundException =>
+      case fnfEx: FileNotFoundException =>
        logWarning(s"$pathString not found, skipping!")
-        Map.empty[EventLogInfo, Long]
-      case e: Exception =>
+        Map(FailedEventLog(new Path(pathString), fnfEx.getMessage) -> None)
+      case NonFatal(e) =>
        logWarning(s"Unexpected exception occurred reading $pathString, skipping!", e)
-        Map.empty[EventLogInfo, Long]
+        Map(FailedEventLog(new Path(pathString), e.getMessage) -> None)
     }
   }

@@ -226,10 +239,15 @@ object EventLogPathProcessor extends Logging {
       val filteredInfo = filterNLogs.get.split("-")
       val numberofEventLogs = filteredInfo(0).toInt
       val criteria = filteredInfo(1)
+      // Before filtering based on user criteria, remove the failed event logs
+      // (i.e. logs without timestamp) from the list.
+      val validMatchedLogs = matchedLogs.collect {
+        case (info, Some(ts)) => info -> ts
+      }
       val matched = if (criteria.equals("newest")) {
-        LinkedHashMap(matchedLogs.toSeq.sortWith(_._2 > _._2): _*)
+        LinkedHashMap(validMatchedLogs.toSeq.sortWith(_._2 > _._2): _*)
       } else if (criteria.equals("oldest")) {
-        LinkedHashMap(matchedLogs.toSeq.sortWith(_._2 < _._2): _*)
+        LinkedHashMap(validMatchedLogs.toSeq.sortWith(_._2 < _._2): _*)
       } else {
         logError("Criteria should be either newest-filesystem or oldest-filesystem")
         Map.empty[EventLogInfo, Long]
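The validMatchedLogs change above hinges on collect discarding entries without a timestamp before the newest/oldest sort. A self-contained sketch of that idea with invented values (not code from this commit):

val matchedLogs: Seq[(String, Option[Long])] =
  Seq("ok1" -> Some(3L), "bad" -> None, "ok2" -> Some(1L))
// collect keeps only the Some entries, so failed logs never reach the sort
val newestFirst = matchedLogs.collect { case (path, Some(ts)) => path -> ts }
  .sortWith(_._2 > _._2)
// newestFirst == List(("ok1", 3L), ("ok2", 1L)); "bad" is filtered out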
59 changes: 59 additions & 0 deletions core/src/main/scala/com/nvidia/spark/rapids/tool/ToolBase.scala
@@ -0,0 +1,59 @@
/*
 * Copyright (c) 2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids.tool

import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}

import com.google.common.util.concurrent.ThreadFactoryBuilder

import org.apache.spark.sql.rapids.tool.ui.ConsoleProgressBar
import org.apache.spark.sql.rapids.tool.util.{AppResult, FailureAppResult, RuntimeReporter}

/**
 * Base class for Profiling and Qualification tools.
 */
abstract class ToolBase(timeout: Option[Long]) extends RuntimeReporter {

  protected val simpleName: String
  protected val waitTimeInSec: Long = timeout.getOrElse(60 * 60 * 24L)
  protected val threadFactory: ThreadFactory = new ThreadFactoryBuilder()
    .setDaemon(true).setNameFormat(getClass.getSimpleName + "-%d").build()
  protected val threadPool: ThreadPoolExecutor =
    Executors.newFixedThreadPool(getNumThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
  protected var progressBar: Option[ConsoleProgressBar] = None
  // Store application status reports indexed by event log path.
  protected val appStatusReporter: ConcurrentHashMap[String, AppResult] =
    new ConcurrentHashMap[String, AppResult]

  def getNumThreads: Int

  logInfo(s"Threadpool size is $getNumThreads")

  /**
   * Handles a failed event log by logging the failure, adding it to appStatusReporter,
   * and updating the progress bar.
   *
   * @param failedEventLog The event log that failed to process.
   */
  final def handleFailedEventLogs(failedEventLog: FailedEventLog): Unit = {
    val pathStr = failedEventLog.eventLog.toString
    val failureAppResult = FailureAppResult(pathStr, failedEventLog.getReason)
    failureAppResult.logMessage()
    appStatusReporter.put(pathStr, failureAppResult)
    progressBar.foreach(_.reportFailedProcess())
  }
}
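To show the contract ToolBase establishes, here is a hypothetical minimal subclass (the class name, pool size, and output path are invented; it also assumes outputDir is the abstract member of RuntimeReporter that Profiler and Qualification override below):

class DemoTool extends ToolBase(timeout = None) {
  override val simpleName: String = "demoTool"
  override val outputDir: String = "/tmp/demo-output" // invented path
  override def getNumThreads: Int = 4

  // Failed logs go through the shared handler; anything else would be analyzed normally.
  def process(path: EventLogInfo): Unit = path match {
    case failed: FailedEventLog => handleFailedEventLogs(failed)
    case _ => // real event-log processing would go here
  }
}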
core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
@@ -16,14 +16,13 @@
 
 package com.nvidia.spark.rapids.tool.profiling
 
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Executors, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.util.control.NonFatal
 
-import com.nvidia.spark.rapids.ThreadFactoryBuilder
-import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, EventLogInfo, EventLogPathProcessor, PlatformFactory}
+import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, EventLogInfo, EventLogPathProcessor, FailedEventLog, PlatformFactory, ToolBase}
 import com.nvidia.spark.rapids.tool.views._
 import org.apache.hadoop.conf.Configuration
 
@@ -33,33 +32,19 @@ import org.apache.spark.sql.rapids.tool.ui.ConsoleProgressBar
 import org.apache.spark.sql.rapids.tool.util._
 
 class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean)
-  extends RuntimeReporter {
+  extends ToolBase(appArgs.timeout.toOption) {
 
-  private val nThreads = appArgs.numThreads.getOrElse(
-    Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
-  private val timeout = appArgs.timeout.toOption
-  private val waitTimeInSec = timeout.getOrElse(60 * 60 * 24L)
-
-  private val threadFactory = new ThreadFactoryBuilder()
-    .setDaemon(true).setNameFormat("profileTool" + "-%d").build()
-  private val threadPool = Executors.newFixedThreadPool(nThreads, threadFactory)
-    .asInstanceOf[ThreadPoolExecutor]
+  override val simpleName: String = "profileTool"
+  override val outputDir: String = appArgs.outputDirectory().stripSuffix("/") +
+    s"/${Profiler.SUBDIR}"
   private val numOutputRows = appArgs.numOutputRows.getOrElse(1000)
 
   private val outputCSV: Boolean = appArgs.csv()
   private val outputCombined: Boolean = appArgs.combined()
 
   private val useAutoTuner: Boolean = appArgs.autoTuner()
-  private var progressBar: Option[ConsoleProgressBar] = None
-  // Store application status reports indexed by event log path.
-  private val appStatusReporter = new ConcurrentHashMap[String, AppResult]
-
   private val outputAlignedSQLIds: Boolean = appArgs.outputSqlIdsAligned()
 
-  override val outputDir = appArgs.outputDirectory().stripSuffix("/") +
-    s"/${Profiler.SUBDIR}"
-
-  logInfo(s"Threadpool size is $nThreads")
+  override def getNumThreads: Int = appArgs.numThreads.getOrElse(
+    Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
 
   /**
    * Profiles application according to the mode requested. The main difference in processing for
@@ -166,6 +151,13 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean)
       processSuccessApp: ApplicationInfo => Unit): Unit = {
     val pathStr = path.eventLog.toString
     try {
+      // Early handling of failed event logs
+      path match {
+        case failedEventLog: FailedEventLog =>
+          handleFailedEventLogs(failedEventLog)
+          return
+        case _ => // No action needed for other cases
+      }
       val startTime = System.currentTimeMillis()
       val appOpt = createApp(path, index, hadoopConf)
       val profAppResult = appOpt match {
core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
@@ -16,12 +16,11 @@
 
 package com.nvidia.spark.rapids.tool.qualification
 
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Executors, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
 
 import scala.collection.JavaConverters._
 
-import com.nvidia.spark.rapids.ThreadFactoryBuilder
-import com.nvidia.spark.rapids.tool.EventLogInfo
+import com.nvidia.spark.rapids.tool.{EventLogInfo, FailedEventLog, ToolBase}
 import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_FREQUENCY
 import com.nvidia.spark.rapids.tool.tuning.TunerContext
 import com.nvidia.spark.rapids.tool.views.QualRawReportGenerator
@@ -38,24 +37,14 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
     printStdout: Boolean, uiEnabled: Boolean, enablePB: Boolean,
     reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean,
     penalizeTransitions: Boolean, tunerContext: Option[TunerContext],
-    clusterReport: Boolean) extends RuntimeReporter {
+    clusterReport: Boolean) extends ToolBase(timeout) {
 
+  override val simpleName: String = "qualTool"
+  override val outputDir = s"$outputPath/rapids_4_spark_qualification_output"
   private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]()
 
-  // default is 24 hours
-  private val waitTimeInSec = timeout.getOrElse(60 * 60 * 24L)
-
-  private val threadFactory = new ThreadFactoryBuilder()
-    .setDaemon(true).setNameFormat("qualTool" + "-%d").build()
-  logInfo(s"Threadpool size is $nThreads")
-  private val threadPool = Executors.newFixedThreadPool(nThreads, threadFactory)
-    .asInstanceOf[ThreadPoolExecutor]
-
-  private var progressBar: Option[ConsoleProgressBar] = None
-  // Store application status reports indexed by event log path.
-  private val appStatusReporter = new ConcurrentHashMap[String, AppResult]
+  override def getNumThreads: Int = nThreads
 
-  override val outputDir = s"$outputPath/rapids_4_spark_qualification_output"
   private class QualifyThread(path: EventLogInfo) extends Runnable {
     def run: Unit = qualifyApp(path, hadoopConf)
   }
@@ -147,6 +136,13 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
       hadoopConf: Configuration): Unit = {
     val pathStr = path.eventLog.toString
     try {
+      // Early handling of failed event logs
+      path match {
+        case failedEventLog: FailedEventLog =>
+          handleFailedEventLogs(failedEventLog)
+          return
+        case _ => // No action needed for other cases
+      }
       val startTime = System.currentTimeMillis()
       val appResult = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker,
         reportSqlLevel, mlOpsEnabled, penalizeTransitions)
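Both tools previously owned their thread-pool setup; ToolBase now builds it once from getNumThreads. For reference, a standalone sketch of the shared pattern (pool size and thread-name format are example values):

import java.util.concurrent.{Executors, ThreadPoolExecutor}
import com.google.common.util.concurrent.ThreadFactoryBuilder

// Daemon threads so the pool never blocks JVM shutdown; named for easier debugging.
val factory = new ThreadFactoryBuilder()
  .setDaemon(true).setNameFormat("qualTool-%d").build()
val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(4, factory)
  .asInstanceOf[ThreadPoolExecutor]
// submit qualifyApp/profileApp work here, then shut down once the queue drains
pool.shutdown()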
