Skip to content

Commit

Permalink
Make it so that we can work with our forked NFLX and upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed Apr 22, 2024
1 parent e89b903 commit cd818c5
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class InternalEventHistoryReplayer(sparkConf: SparkConf, appId: String, attemptI
val replayListenerBus = new ReplayListenerBus()
replayListenerBus.addListener(new QuboleJobListener(sparkConf))

def allCodecOptions: Seq[Option[String]] = {
def possibleCodecOptions: Seq[Option[String]] = {
// Because CompressionCodec has codec list as private
val codecs: Seq[Option[String]] = List(
None, Some("lz4"), Some("lzf"), Some("snappy"), Some("zstd"))
Expand All @@ -53,7 +53,7 @@ class InternalEventHistoryReplayer(sparkConf: SparkConf, appId: String, attemptI
}
}

private def getLogPath(
protected def getLogPath(
logBaseDir: URI,
appId: String,
appAttemptId: Option[String],
Expand All @@ -67,13 +67,13 @@ class InternalEventHistoryReplayer(sparkConf: SparkConf, appId: String, attemptI
}
}

def getLogPaths(appId: String, attemptId: Option[String]) = {
def possibleLogPaths(appId: String, attemptId: Option[String]) = {
val logBaseDir = Utils.resolveURI(sparkConf.get(EVENT_LOG_DIR))
val rollingPath = RollingEventLogFilesWriter.getAppEventLogDirPath(
logBaseDir, appId, attemptId).toString()
val singlePath = SingleEventLogFileWriter.getLogPath(
logBaseDir, appId, attemptId)
val paths: Seq[String] = allCodecOptions.flatMap { codec =>
val paths: Seq[String] = possibleCodecOptions.flatMap { codec =>
val logPath = getLogPath(Utils.resolveURI(logDir), appId, attemptId, codec)
val inProgressPath = logPath + EventLogFileWriter.IN_PROGRESS
Seq(logPath, inProgressPath)
Expand All @@ -82,7 +82,7 @@ class InternalEventHistoryReplayer(sparkConf: SparkConf, appId: String, attemptI
paths
}

getLogPaths(appId, attemptId)
possibleLogPaths(appId, attemptId)
.map(new Path(_))
.flatMap(tryGetFileStatus)
.headOption // find the first valid log path amongst all possible log paths
Expand Down

0 comments on commit cd818c5

Please sign in to comment.