Skip to content

Commit

Permalink
Spark Heuristic Fixes for Dr. Elephant (#324)
Browse files Browse the repository at this point in the history
Separated out Driver checks into a separate Driver Metrics heuristic: Checks driver configurations, driver GC time and JVM used memory.
  • Loading branch information
skakker authored and akshayrai committed Apr 6, 2018
1 parent 019a9f4 commit 25c07bb
Show file tree
Hide file tree
Showing 29 changed files with 674 additions and 227 deletions.
5 changes: 2 additions & 3 deletions app-conf/FetcherConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,12 @@
<applicationtype>mapreduce</applicationtype>
<classname>com.linkedin.drelephant.mapreduce.fetchers.MapReduceFSFetcherHadoop2</classname>
<params>
<sampling_enabled>false</sampling_enabled>
<history_log_size_limit_in_mb>500</history_log_size_limit_in_mb>
<history_server_time_zone>PST</history_server_time_zone>
<sampling_enabled>false</sampling_enabled>
<history_server_time_zone>UTC</history_server_time_zone>
</params>
</fetcher>


<!--
FSFetcher for Spark. Loads the eventlog from HDFS and replays to get the metrics and application properties
Expand Down
46 changes: 42 additions & 4 deletions app-conf/HeuristicConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -292,53 +292,91 @@
<classname>com.linkedin.drelephant.spark.heuristics.ConfigurationHeuristic</classname>
<viewname>views.html.help.spark.helpConfigurationHeuristic</viewname>
</heuristic>
<heuristic>

<!--<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Executor Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ExecutorsHeuristic</classname>
<viewname>views.html.help.spark.helpExecutorsHeuristic</viewname>
</heuristic>
</heuristic>-->

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Job Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.JobsHeuristic</classname>
<viewname>views.html.help.spark.helpJobsHeuristic</viewname>
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Stage Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Peak Unified Memory</heuristicname>
<heuristicname>Executor Peak Unified Memory</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.UnifiedMemoryHeuristic</classname>
<viewname>views.html.help.spark.helpUnifiedMemoryHeuristic</viewname>
<!--<params>
<peak_unified_memory_threshold>0.7,0.6,0.4,0.2</peak_unified_memory_threshold>
<spark_executor_memory_threshold_key>2G</spark_executor_memory_threshold_key>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>JVM Used Memory</heuristicname>
<heuristicname>Executor JVM Used Memory</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.JvmUsedMemoryHeuristic</classname>
<viewname>views.html.help.spark.helpJvmUsedMemoryHeuristic</viewname>
<!--<params>
<executor_peak_jvm_memory_threshold>1.25,1.5,2,3</executor_peak_jvm_memory_threshold>
<spark_executor_memory_threshold_key>2G</spark_executor_memory_threshold_key>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Stages with failed tasks</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.StagesWithFailedTasksHeuristic</classname>
<viewname>views.html.help.spark.helpStagesWithFailedTasks</viewname>
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Executor GC</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ExecutorGcHeuristic</classname>
<viewname>views.html.help.spark.helpExecutorGcHeuristic</viewname>
<!--<params>
<gc_severity_A_threshold>0.08,0.09,0.1,0.15</gc_severity_A_threshold>
<gc_severity_D_threshold>0.05,0.04,0.03,0.01</gc_severity_D_threshold>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Executor spill</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ExecutorStorageSpillHeuristic</classname>
<viewname>views.html.help.spark.helpExecutorStorageSpillHeuristic</viewname>
<!--<params>
<spill_fraction_of_executors_threshold>0.2</spill_fraction_of_executors_threshold>
<spill_max_memory_threshold>0.05</spill_max_memory_threshold>
<spark_executor_cores_threshold>4</spark_executor_cores_threshold>
<spark_executor_memory_threshold>10GB</spark_executor_memory_threshold>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Driver Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.DriverHeuristic</classname>
<viewname>views.html.help.spark.helpDriverHeuristic</viewname>
<!--<params>
<driver_peak_jvm_memory_threshold>1.25,1.5,2,3</driver_peak_jvm_memory_threshold>
<gc_severity_threshold>0.08,0.09,0.1,0.15</gc_severity_threshold>
<spark_executor_memory_threshold_key>2G</spark_executor_memory_threshold_key>
</params>-->
</heuristic>

</heuristics>
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ class SparkRestClient(sparkConf: SparkConf) {
}

private def getExecutorSummaries(attemptTarget: WebTarget): Seq[ExecutorSummaryImpl] = {
val target = attemptTarget.path("executors")
val target = attemptTarget.path("allexecutors")
try {
get(target, SparkRestObjectMapper.readValue[Seq[ExecutorSummaryImpl]])
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import com.linkedin.drelephant.math.Statistics
* A heuristic based on an app's known configuration.
*
* The results from this heuristic primarily inform users about key app configuration settings, including
* driver memory, driver cores, executor cores, executor instances, executor memory, and the serializer.
* executor cores, executor instances, executor memory, and the serializer.
*
* It also checks whether the values specified are within threshold.
*/
Expand All @@ -55,10 +55,6 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
property.getOrElse("Not presented. Using default.")

val resultDetails = Seq(
new HeuristicResultDetails(
SPARK_DRIVER_MEMORY_KEY,
formatProperty(evaluator.driverMemoryBytes.map(MemoryFormatUtils.bytesToString))
),
new HeuristicResultDetails(
SPARK_EXECUTOR_MEMORY_KEY,
formatProperty(evaluator.executorMemoryBytes.map(MemoryFormatUtils.bytesToString))
Expand All @@ -80,16 +76,16 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
formatProperty(evaluator.isDynamicAllocationEnabled.map(_.toString))
),
new HeuristicResultDetails(
SPARK_DRIVER_CORES_KEY,
formatProperty(evaluator.driverCores.map(_.toString))
SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD,
evaluator.sparkYarnExecutorMemoryOverhead
),
new HeuristicResultDetails(
SPARK_YARN_DRIVER_MEMORY_OVERHEAD,
evaluator.sparkYarnDriverMemoryOverhead
SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS,
evaluator.dynamicMinExecutors.getOrElse(0).toString
),
new HeuristicResultDetails(
SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD,
evaluator.sparkYarnExecutorMemoryOverhead
SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS,
evaluator.dynamicMaxExecutors.getOrElse(0).toString
)
)
// Constructing a mutable ArrayList for resultDetails, otherwise addResultDetail method HeuristicResult cannot be used.
Expand All @@ -109,21 +105,24 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
result.addResultDetail(SPARK_SHUFFLE_SERVICE_ENABLED, formatProperty(evaluator.isShuffleServiceEnabled.map(_.toString)),
"Spark shuffle service is not enabled.")
}
if (evaluator.severityMinExecutors == Severity.CRITICAL) {
result.addResultDetail("Minimum Executors", "The minimum executors for Dynamic Allocation should be <=1. Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS + " field.")
if (evaluator.severityMinExecutors != Severity.NONE) {
result.addResultDetail("Minimum Executors", "The minimum executors for Dynamic Allocation should be "+ THRESHOLD_MIN_EXECUTORS + ". Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS + " field.")
}
if (evaluator.severityMaxExecutors == Severity.CRITICAL) {
result.addResultDetail("Maximum Executors", "The maximum executors for Dynamic Allocation should be <=900. Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS + " field.")
if (evaluator.severityMaxExecutors != Severity.NONE) {
result.addResultDetail("Maximum Executors", "The maximum executors for Dynamic Allocation should be <=" + THRESHOLD_MAX_EXECUTORS + ". Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS + " field.")
}
if (evaluator.jarsSeverity == Severity.CRITICAL) {
if (evaluator.jarsSeverity != Severity.NONE) {
result.addResultDetail("Jars notation", "It is recommended to not use * notation while specifying jars in the field " + SPARK_YARN_JARS)
}
if(evaluator.severityDriverMemoryOverhead.getValue >= Severity.SEVERE.getValue) {
result.addResultDetail("Driver Overhead Memory", "Please do not specify excessive amount of overhead memory for Driver. Change it in the field " + SPARK_YARN_DRIVER_MEMORY_OVERHEAD)
}
if(evaluator.severityExecutorMemoryOverhead.getValue >= Severity.SEVERE.getValue) {
if(evaluator.severityExecutorMemoryOverhead != Severity.NONE) {
result.addResultDetail("Executor Overhead Memory", "Please do not specify excessive amount of overhead memory for Executors. Change it in the field " + SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD)
}
if(evaluator.severityExecutorCores != Severity.NONE) {
result.addResultDetail("Executor cores", "The number of executor cores should be <=" + evaluator.DEFAULT_SPARK_CORES_THRESHOLDS.low + ". Please change it in the field " + SPARK_EXECUTOR_CORES_KEY)
}
if(evaluator.severityExecutorMemory != Severity.NONE) {
result.addResultDetail("Executor memory", "Please do not specify excessive amount of executor memory. Change it in the field " + SPARK_EXECUTOR_MEMORY_KEY)
}
result
}
}
Expand All @@ -134,20 +133,17 @@ object ConfigurationHeuristic {

val SERIALIZER_IF_NON_NULL_RECOMMENDATION_KEY = "serializer_if_non_null_recommendation"

val SPARK_DRIVER_MEMORY_KEY = "spark.driver.memory"
val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
val SPARK_EXECUTOR_INSTANCES_KEY = "spark.executor.instances"
val SPARK_EXECUTOR_CORES_KEY = "spark.executor.cores"
val SPARK_SERIALIZER_KEY = "spark.serializer"
val SPARK_APPLICATION_DURATION = "spark.application.duration"
val SPARK_SHUFFLE_SERVICE_ENABLED = "spark.shuffle.service.enabled"
val SPARK_DYNAMIC_ALLOCATION_ENABLED = "spark.dynamicAllocation.enabled"
val SPARK_DRIVER_CORES_KEY = "spark.driver.cores"
val SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS = "spark.dynamicAllocation.minExecutors"
val SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS = "spark.dynamicAllocation.maxExecutors"
val SPARK_YARN_JARS = "spark.yarn.secondary.jars"
val SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD = "spark.yarn.executor.memoryOverhead"
val SPARK_YARN_DRIVER_MEMORY_OVERHEAD = "spark.yarn.driver.memoryOverhead"
val THRESHOLD_MIN_EXECUTORS: Int = 1
val THRESHOLD_MAX_EXECUTORS: Int = 900
val SPARK_OVERHEAD_MEMORY_THRESHOLD_KEY = "spark.overheadMemory.thresholds.key"
Expand All @@ -159,9 +155,6 @@ object ConfigurationHeuristic {
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties

lazy val driverMemoryBytes: Option[Long] =
Try(getProperty(SPARK_DRIVER_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes)).getOrElse(None)

lazy val executorMemoryBytes: Option[Long] =
Try(getProperty(SPARK_EXECUTOR_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes)).getOrElse(None)

Expand All @@ -171,9 +164,6 @@ object ConfigurationHeuristic {
lazy val executorCores: Option[Int] =
Try(getProperty(SPARK_EXECUTOR_CORES_KEY).map(_.toInt)).getOrElse(None)

lazy val driverCores: Option[Int] =
Try(getProperty(SPARK_DRIVER_CORES_KEY).map(_.toInt)).getOrElse(None)

lazy val dynamicMinExecutors: Option[Int] =
Try(getProperty(SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS).map(_.toInt)).getOrElse(None)

Expand All @@ -196,8 +186,6 @@ object ConfigurationHeuristic {

lazy val sparkYarnExecutorMemoryOverhead: String = if (getProperty(SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse("0").matches("(.*)[0-9]"))
MemoryFormatUtils.bytesToString(MemoryFormatUtils.stringToBytes(getProperty(SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse("0") + "MB")) else (getProperty(SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse("0"))
lazy val sparkYarnDriverMemoryOverhead: String = if (getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0").matches("(.*)[0-9]"))
MemoryFormatUtils.bytesToString(MemoryFormatUtils.stringToBytes(getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0") + "MB")) else getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0")

lazy val serializer: Option[String] = getProperty(SPARK_SERIALIZER_KEY)

Expand All @@ -211,16 +199,14 @@ object ConfigurationHeuristic {
case Some(_) => DEFAULT_SERIALIZER_IF_NON_NULL_SEVERITY_IF_RECOMMENDATION_UNMET
}

//The following thresholds are for checking if the memory and cores values (executor and driver) are above normal. These thresholds are experimental, and may change in the future.
//The following thresholds are for checking if the memory and cores values of executors are above normal. These thresholds are experimental, and may change in the future.
val DEFAULT_SPARK_MEMORY_THRESHOLDS =
SeverityThresholds(low = MemoryFormatUtils.stringToBytes("10G"), MemoryFormatUtils.stringToBytes("15G"),
severe = MemoryFormatUtils.stringToBytes("20G"), critical = MemoryFormatUtils.stringToBytes("25G"), ascending = true)
val DEFAULT_SPARK_CORES_THRESHOLDS =
SeverityThresholds(low = 4, moderate = 6, severe = 8, critical = 10, ascending = true)

val severityExecutorMemory = DEFAULT_SPARK_MEMORY_THRESHOLDS.severityOf(executorMemoryBytes.getOrElse(0).asInstanceOf[Number].longValue)
val severityDriverMemory = DEFAULT_SPARK_MEMORY_THRESHOLDS.severityOf(driverMemoryBytes.getOrElse(0).asInstanceOf[Number].longValue)
val severityDriverCores = DEFAULT_SPARK_CORES_THRESHOLDS.severityOf(driverCores.getOrElse(0).asInstanceOf[Number].intValue)
val severityExecutorCores = DEFAULT_SPARK_CORES_THRESHOLDS.severityOf(executorCores.getOrElse(0).asInstanceOf[Number].intValue)
val severityMinExecutors = if (dynamicMinExecutors.getOrElse(0).asInstanceOf[Number].intValue > THRESHOLD_MIN_EXECUTORS) {
Severity.CRITICAL
Expand All @@ -233,12 +219,10 @@ object ConfigurationHeuristic {
Severity.NONE
}
val severityExecutorMemoryOverhead = configurationHeuristic.sparkOverheadMemoryThreshold.severityOf(MemoryFormatUtils.stringToBytes(sparkYarnExecutorMemoryOverhead))
val severityDriverMemoryOverhead = configurationHeuristic.sparkOverheadMemoryThreshold.severityOf(MemoryFormatUtils.stringToBytes(sparkYarnDriverMemoryOverhead))


//Severity for the configuration thresholds
val severityConfThresholds: Severity = Severity.max(severityDriverCores, severityDriverMemory, severityExecutorCores, severityExecutorMemory,
severityMinExecutors, severityMaxExecutors, jarsSeverity, severityExecutorMemoryOverhead, severityDriverMemoryOverhead)
val severityConfThresholds: Severity = Severity.max(severityExecutorCores, severityExecutorMemory,
severityMinExecutors, severityMaxExecutors, jarsSeverity, severityExecutorMemoryOverhead)

/**
* The following logic computes severity based on shuffle service and dynamic allocation flags.
Expand Down
Loading

0 comments on commit 25c07bb

Please sign in to comment.