From 4ebd4b958cbdc6848694d893367c18d847b87278 Mon Sep 17 00:00:00 2001 From: skakker Date: Fri, 6 Apr 2018 17:59:22 +0530 Subject: [PATCH] adding failedTasks value (#363) --- .../spark/fetchers/SparkRestClient.scala | 33 ++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala index 21c4fb1a3..4b4ff8a9d 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala @@ -90,20 +90,37 @@ class SparkRestClient(sparkConf: SparkConf) { val futureExecutorSummaries = Future { getExecutorSummaries(attemptTarget) } + + val futureFailedTaskData = Future { + getStagesWithFailedTasks(attemptTarget) + } val futureLogData = if (fetchLogs) { Future { getLogData(attemptTarget) } } else Future.successful(None) - SparkRestDerivedData( - applicationInfo, - Await.result(futureJobDatas, DEFAULT_TIMEOUT), - Await.result(futureStageDatas, DEFAULT_TIMEOUT), - Await.result(futureExecutorSummaries, Duration(5, SECONDS)), - Await.result(futureLogData, Duration(5, SECONDS)) - ) - + if (fetchFailedTasks) { + val futureFailedTasksDatas = Future { + getStagesWithFailedTasks(attemptTarget) + } + SparkRestDerivedData( + applicationInfo, + Await.result(futureJobDatas, DEFAULT_TIMEOUT), + Await.result(futureStageDatas, DEFAULT_TIMEOUT), + Await.result(futureExecutorSummaries, DEFAULT_TIMEOUT), + Await.result(futureFailedTasksDatas, DEFAULT_TIMEOUT), + Await.result(futureLogData, DEFAULT_TIMEOUT)) + } else { + SparkRestDerivedData( + applicationInfo, + Await.result(futureJobDatas, DEFAULT_TIMEOUT), + Await.result(futureStageDatas, DEFAULT_TIMEOUT), + Await.result(futureExecutorSummaries, DEFAULT_TIMEOUT), + Seq.empty, + Await.result(futureLogData, DEFAULT_TIMEOUT) + ) + } } }