From 0f2a38a8530960cda21be682b66b7798f94ececb Mon Sep 17 00:00:00 2001 From: Shekhar Gupta Date: Wed, 19 Apr 2017 22:24:49 -0700 Subject: [PATCH] Updates Spark configuration heuristic severity calculations (#229) --- .../heuristics/ConfigurationHeuristic.scala | 51 ++++++-- .../ConfigurationHeuristicTest.scala | 118 +++++++++++++++--- 2 files changed, 143 insertions(+), 26 deletions(-) diff --git a/app/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristic.scala index 6c8f7a351..c4aab51df 100644 --- a/app/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristic.scala +++ b/app/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristic.scala @@ -16,6 +16,8 @@ package com.linkedin.drelephant.spark.heuristics +import java.util.ArrayList + import com.linkedin.drelephant.math.Statistics import scala.collection.JavaConverters @@ -42,9 +44,6 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo Option(heuristicConfigurationData.getParamMap.get(SERIALIZER_IF_NON_NULL_RECOMMENDATION_KEY)) .getOrElse(DEFAULT_SERIALIZER_IF_NON_NULL_RECOMMENDATION) - val serializerIfNonNullSeverityIfRecommendationUnmet: Severity = - DEFAULT_SERIALIZER_IF_NON_NULL_SEVERITY_IF_RECOMMENDATION_UNMET - override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData override def apply(data: SparkApplicationData): HeuristicResult = { @@ -75,17 +74,27 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo evaluator.applicationDuration.toString + " Seconds" ), new HeuristicResultDetails( - SPARK_SERIALIZER_KEY, - formatProperty(evaluator.serializer) + SPARK_DYNAMIC_ALLOCATION_ENABLED, + formatProperty(evaluator.isDynamicAllocationEnabled.map(_.toString)) ) ) + // Copy resultDetails into a mutable ArrayList; otherwise the addResultDetail method of HeuristicResult cannot be used. 
+ val mutableResultDetailsArrayList = new ArrayList(resultDetails.asJava) val result = new HeuristicResult( heuristicConfigurationData.getClassName, heuristicConfigurationData.getHeuristicName, evaluator.severity, 0, - resultDetails.asJava + mutableResultDetailsArrayList ) + if (evaluator.serializerSeverity != Severity.NONE) { + result.addResultDetail(SPARK_SERIALIZER_KEY, formatProperty(evaluator.serializer), + "KryoSerializer is Not Enabled.") + } + if (evaluator.shuffleAndDynamicAllocationSeverity != Severity.NONE) { + result.addResultDetail(SPARK_SHUFFLE_SERVICE_ENABLED, formatProperty(evaluator.isShuffleServiceEnabled.map(_.toString)), + "Spark shuffle service is not enabled.") + } result } } @@ -102,6 +111,8 @@ object ConfigurationHeuristic { val SPARK_EXECUTOR_CORES_KEY = "spark.executor.cores" val SPARK_SERIALIZER_KEY = "spark.serializer" val SPARK_APPLICATION_DURATION = "spark.application.duration" + val SPARK_SHUFFLE_SERVICE_ENABLED = "spark.shuffle.service.enabled" + val SPARK_DYNAMIC_ALLOCATION_ENABLED = "spark.dynamicAllocation.enabled" class Evaluator(configurationHeuristic: ConfigurationHeuristic, data: SparkApplicationData) { lazy val appConfigurationProperties: Map[String, String] = @@ -127,18 +138,34 @@ object ConfigurationHeuristic { lazy val serializer: Option[String] = getProperty(SPARK_SERIALIZER_KEY) + /** + * If the serializer is either not configured or not equal to KryoSerializer, then the severity will be moderate. + */ + lazy val serializerSeverity: Severity = serializer match { - case None => Severity.NONE + case None => Severity.MODERATE case Some(`serializerIfNonNullRecommendation`) => Severity.NONE - case Some(_) => serializerIfNonNullSeverityIfRecommendationUnmet + case Some(_) => DEFAULT_SERIALIZER_IF_NON_NULL_SEVERITY_IF_RECOMMENDATION_UNMET } - lazy val severity: Severity = serializerSeverity + /** + * The following logic computes severity based on shuffle service and dynamic allocation flags. 
+ * If dynamic allocation is disabled, then the severity will be MODERATE if shuffle service is disabled or not specified. + * If dynamic allocation is enabled, then the severity will be SEVERE if shuffle service is disabled or not specified. + */ - private val serializerIfNonNullRecommendation: String = configurationHeuristic.serializerIfNonNullRecommendation + lazy val isDynamicAllocationEnabled: Option[Boolean] = Some(getProperty(SPARK_DYNAMIC_ALLOCATION_ENABLED).exists(_.toBoolean)) + lazy val isShuffleServiceEnabled: Option[Boolean] = Some(getProperty(SPARK_SHUFFLE_SERVICE_ENABLED).exists(_.toBoolean)) + + lazy val shuffleAndDynamicAllocationSeverity: Severity = (isDynamicAllocationEnabled, isShuffleServiceEnabled) match { + case (_, Some(true)) => Severity.NONE // shuffle service enabled: nothing to report + case (Some(true), _) => Severity.SEVERE // dynamic allocation enabled, shuffle service disabled or not specified + case _ => Severity.MODERATE // dynamic allocation disabled or not specified, shuffle service disabled or not specified + } - private val serializerIfNonNullSeverityIfRecommendationUnmet: Severity = - configurationHeuristic.serializerIfNonNullSeverityIfRecommendationUnmet + lazy val severity: Severity = Severity.max(serializerSeverity, shuffleAndDynamicAllocationSeverity) + + private val serializerIfNonNullRecommendation: String = configurationHeuristic.serializerIfNonNullRecommendation private def getProperty(key: String): Option[String] = appConfigurationProperties.get(key) } diff --git a/test/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristicTest.scala index e10870883..8d189a89e 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristicTest.scala @@ -41,20 +41,26 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers { val 
configurationHeuristic = new ConfigurationHeuristic(heuristicConfigurationData) - describe(".apply") { + describe("apply with NO Severity") { val configurationProperties = Map( 
"spark.serializer" -> "org.apache.spark.serializer.KryoSerializer", "spark.storage.memoryFraction" -> "0.3", "spark.driver.memory" -> "2G", "spark.executor.instances" -> "900", "spark.executor.memory" -> "1g", - "spark.shuffle.memoryFraction" -> "0.5" + "spark.shuffle.memoryFraction" -> "0.5", + "spark.shuffle.service.enabled" -> "true", + "spark.dynamicAllocation.enabled" -> "true" ) val data = newFakeSparkApplicationData(configurationProperties) val heuristicResult = configurationHeuristic.apply(data) val heuristicResultDetails = heuristicResult.getHeuristicResultDetails + it("returns the size of result details") { + heuristicResultDetails.size() should be(6) + } + it("returns the severity") { heuristicResult.getSeverity should be(Severity.NONE) } @@ -89,10 +95,50 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers { details.getValue should include("10") } - it("returns the serializer") { + it("returns the dynamic allocation flag") { + val details = heuristicResultDetails.get(5) + details.getName should include("spark.dynamicAllocation.enabled") + details.getValue should be("true") + } + } + + describe("apply with Severity") { + val configurationProperties = Map( + "spark.serializer" -> "dummySerializer", + "spark.shuffle.service.enabled" -> "false", + "spark.dynamicAllocation.enabled" -> "true" + ) + + val data = newFakeSparkApplicationData(configurationProperties) + val heuristicResult = configurationHeuristic.apply(data) + val heuristicResultDetails = heuristicResult.getHeuristicResultDetails + + it("returns the size of result details") { + heuristicResultDetails.size() should be(8) + } + + it("returns the severity") { + heuristicResult.getSeverity should be(Severity.SEVERE) + } + + it("returns the dynamic allocation flag") { val details = heuristicResultDetails.get(5) + details.getName should include("spark.dynamicAllocation.enabled") + details.getValue should be("true") + } + + it("returns the serializer") { + val details = 
heuristicResultDetails.get(6) details.getName should include("spark.serializer") - details.getValue should be("org.apache.spark.serializer.KryoSerializer") + details.getValue should be("dummySerializer") + details.getDetails should be("KryoSerializer is Not Enabled.") + } + + it("returns the shuffle service flag") { + val details = heuristicResultDetails.get(7) + details.getName should include("spark.shuffle.service.enabled") + details.getValue should be("false") + details.getDetails should be("Spark shuffle service is not enabled.") } } @@ -148,34 +194,78 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers { evaluator.serializer should be(Some("org.apache.spark.serializer.KryoSerializer")) } - it("has no serializer when it's absent") { + it("has no serializer, dynamic allocation flag, and shuffle flag when they are absent") { val evaluator = newEvaluatorWithConfigurationProperties(Map.empty) evaluator.serializer should be(None) + evaluator.isDynamicAllocationEnabled should be(Some(false)) + evaluator.isShuffleServiceEnabled should be(Some(false)) + evaluator.serializerSeverity should be(Severity.MODERATE) + evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.MODERATE) + evaluator.severity should be(Severity.MODERATE) } - it("has the severity of the serializer setting when it matches our recommendation") { + it("has no dynamic allocation flag and shuffle flag, and serializer setting matches our recommendation") { val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.KryoSerializer")) + evaluator.serializer should be(Some("org.apache.spark.serializer.KryoSerializer")) + evaluator.isDynamicAllocationEnabled should be(Some(false)) + evaluator.isShuffleServiceEnabled should be(Some(false)) evaluator.serializerSeverity should be(Severity.NONE) + evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.MODERATE) + evaluator.severity should be(Severity.MODERATE) } - it("has the 
severity of the serializer setting when it doesn't match our recommendation and is non-null") { + it("has no dynamic allocation flag and shuffle flag, and serializer setting doesn't match our recommendation and is non-null") { val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.FooSerializer")) + evaluator.serializer should be(Some("org.apache.spark.serializer.FooSerializer")) + evaluator.isDynamicAllocationEnabled should be(Some(false)) + evaluator.isShuffleServiceEnabled should be(Some(false)) evaluator.serializerSeverity should be(Severity.MODERATE) + evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.MODERATE) + evaluator.severity should be(Severity.MODERATE) } - it("has the severity of the serializer setting when it is null") { - val evaluator = newEvaluatorWithConfigurationProperties(Map.empty) + it("true dynamic allocation flag and shuffle flag, and serializer setting matches our recommendation") { + val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.KryoSerializer", + "spark.shuffle.service.enabled" -> "true", "spark.dynamicAllocation.enabled" -> "true")) + evaluator.serializer should be(Some("org.apache.spark.serializer.KryoSerializer")) + evaluator.isDynamicAllocationEnabled should be(Some(true)) + evaluator.isShuffleServiceEnabled should be(Some(true)) evaluator.serializerSeverity should be(Severity.NONE) + evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.NONE) + evaluator.severity should be(Severity.NONE) } - it("computes the overall severity when there are some issues") { - val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.FooSerializer")) + it("true dynamic allocation flag and shuffle flag, and serializer setting is absent") { + val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.shuffle.service.enabled" -> "true", + 
"spark.dynamicAllocation.enabled" -> "true")) + evaluator.serializer should be(None) + evaluator.isDynamicAllocationEnabled should be(Some(true)) + evaluator.isShuffleServiceEnabled should be(Some(true)) + evaluator.serializerSeverity should be(Severity.MODERATE) + evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.NONE) evaluator.severity should be(Severity.MODERATE) } - it("computes the overall severity when there are no issues") { - val evaluator = newEvaluatorWithConfigurationProperties(Map.empty) - evaluator.severity should be(Severity.NONE) + it("true dynamic allocation flag and false shuffle flag, and serializer setting matches our recommendation") { + val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.KryoSerializer", + "spark.shuffle.service.enabled" -> "false", "spark.dynamicAllocation.enabled" -> "true")) + evaluator.serializer should be(Some("org.apache.spark.serializer.KryoSerializer")) + evaluator.isDynamicAllocationEnabled should be(Some(true)) + evaluator.isShuffleServiceEnabled should be(Some(false)) + evaluator.serializerSeverity should be(Severity.NONE) + evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.SEVERE) + evaluator.severity should be(Severity.SEVERE) + } + + it("false dynamic allocation flag and shuffle flag, and serializer setting matches our recommendation") { + val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.KryoSerializer", + "spark.shuffle.service.enabled" -> "false", "spark.dynamicAllocation.enabled" -> "false")) + evaluator.serializer should be(Some("org.apache.spark.serializer.KryoSerializer")) + evaluator.isDynamicAllocationEnabled should be(Some(false)) + evaluator.isShuffleServiceEnabled should be(Some(false)) + evaluator.serializerSeverity should be(Severity.NONE) + evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.MODERATE) + evaluator.severity should 
be(Severity.MODERATE) } } }