Skip to content

Commit

Permalink
adding failedTasks value (#363)
Browse files Browse the repository at this point in the history
  • Loading branch information
skakker authored and akshayrai committed Apr 6, 2018
1 parent 638eb77 commit 4ebd4b9
Showing 1 changed file with 25 additions and 8 deletions.
33 changes: 25 additions & 8 deletions app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,37 @@ class SparkRestClient(sparkConf: SparkConf) {
val futureExecutorSummaries = Future {
getExecutorSummaries(attemptTarget)
}

val futureFailedTaskData = Future {
getStagesWithFailedTasks(attemptTarget)
}
val futureLogData = if (fetchLogs) {
Future {
getLogData(attemptTarget)
}
} else Future.successful(None)

SparkRestDerivedData(
applicationInfo,
Await.result(futureJobDatas, DEFAULT_TIMEOUT),
Await.result(futureStageDatas, DEFAULT_TIMEOUT),
Await.result(futureExecutorSummaries, Duration(5, SECONDS)),
Await.result(futureLogData, Duration(5, SECONDS))
)

if (fetchFailedTasks) {
val futureFailedTasksDatas = Future {
getStagesWithFailedTasks(attemptTarget)
}
SparkRestDerivedData(
applicationInfo,
Await.result(futureJobDatas, DEFAULT_TIMEOUT),
Await.result(futureStageDatas, DEFAULT_TIMEOUT),
Await.result(futureExecutorSummaries, DEFAULT_TIMEOUT),
Await.result(futureFailedTasksDatas, DEFAULT_TIMEOUT),
Await.result(futureLogData, DEFAULT_TIMEOUT))
} else {
SparkRestDerivedData(
applicationInfo,
Await.result(futureJobDatas, DEFAULT_TIMEOUT),
Await.result(futureStageDatas, DEFAULT_TIMEOUT),
Await.result(futureExecutorSummaries, DEFAULT_TIMEOUT),
Seq.empty,
Await.result(futureLogData, DEFAULT_TIMEOUT)
)
}
}
}

Expand Down

0 comments on commit 4ebd4b9

Please sign in to comment.