Skip to content

Commit

Permalink
Updates Spark configuration heuristic severity calculations (#229)
Browse files Browse the repository at this point in the history
  • Loading branch information
shkhrgpt authored and akshayrai committed Apr 20, 2017
1 parent fa166d3 commit 0f2a38a
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.linkedin.drelephant.spark.heuristics

import java.util.ArrayList

import com.linkedin.drelephant.math.Statistics

import scala.collection.JavaConverters
Expand All @@ -42,9 +44,6 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
Option(heuristicConfigurationData.getParamMap.get(SERIALIZER_IF_NON_NULL_RECOMMENDATION_KEY))
.getOrElse(DEFAULT_SERIALIZER_IF_NON_NULL_RECOMMENDATION)

val serializerIfNonNullSeverityIfRecommendationUnmet: Severity =
DEFAULT_SERIALIZER_IF_NON_NULL_SEVERITY_IF_RECOMMENDATION_UNMET

override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData

override def apply(data: SparkApplicationData): HeuristicResult = {
Expand Down Expand Up @@ -75,17 +74,27 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
evaluator.applicationDuration.toString + " Seconds"
),
new HeuristicResultDetails(
SPARK_SERIALIZER_KEY,
formatProperty(evaluator.serializer)
SPARK_DYNAMIC_ALLOCATION_ENABLED,
formatProperty(evaluator.isDynamicAllocationEnabled.map(_.toString))
)
)
// Constructing a mutable ArrayList for resultDetails, otherwise addResultDetail method HeuristicResult cannot be used.
val mutableResultDetailsArrayList = new ArrayList(resultDetails.asJava)
val result = new HeuristicResult(
heuristicConfigurationData.getClassName,
heuristicConfigurationData.getHeuristicName,
evaluator.severity,
0,
resultDetails.asJava
mutableResultDetailsArrayList
)
if (evaluator.serializerSeverity != Severity.NONE) {
result.addResultDetail(SPARK_SERIALIZER_KEY, formatProperty(evaluator.serializer),
"KyroSerializer is Not Enabled.")
}
if (evaluator.shuffleAndDynamicAllocationSeverity != Severity.NONE) {
result.addResultDetail(SPARK_SHUFFLE_SERVICE_ENABLED, formatProperty(evaluator.isShuffleServiceEnabled.map(_.toString)),
"Spark shuffle service is not enabled.")
}
result
}
}
Expand All @@ -102,6 +111,8 @@ object ConfigurationHeuristic {
val SPARK_EXECUTOR_CORES_KEY = "spark.executor.cores"
val SPARK_SERIALIZER_KEY = "spark.serializer"
val SPARK_APPLICATION_DURATION = "spark.application.duration"
val SPARK_SHUFFLE_SERVICE_ENABLED = "spark.shuffle.service.enabled"
val SPARK_DYNAMIC_ALLOCATION_ENABLED = "spark.dynamicAllocation.enabled"

class Evaluator(configurationHeuristic: ConfigurationHeuristic, data: SparkApplicationData) {
lazy val appConfigurationProperties: Map[String, String] =
Expand All @@ -127,18 +138,34 @@ object ConfigurationHeuristic {

lazy val serializer: Option[String] = getProperty(SPARK_SERIALIZER_KEY)

/**
* If the serializer is either not configured or not equal to KryoSerializer, then the severity will be moderate.
*/

lazy val serializerSeverity: Severity = serializer match {
case None => Severity.NONE
case None => Severity.MODERATE
case Some(`serializerIfNonNullRecommendation`) => Severity.NONE
case Some(_) => serializerIfNonNullSeverityIfRecommendationUnmet
case Some(_) => DEFAULT_SERIALIZER_IF_NON_NULL_SEVERITY_IF_RECOMMENDATION_UNMET
}

lazy val severity: Severity = serializerSeverity
/**
* The following logic computes severity based on shuffle service and dynamic allocation flags.
* If dynamic allocation is disabled, then the severity will be MODERATE if shuffle service is disabled or not specified.
* If dynamic allocation is enabled, then the severity will be SEVERE if shuffle service is disabled or not specified.
*/

private val serializerIfNonNullRecommendation: String = configurationHeuristic.serializerIfNonNullRecommendation
lazy val isDynamicAllocationEnabled: Option[Boolean] = Some(getProperty(SPARK_DYNAMIC_ALLOCATION_ENABLED).exists(_.toBoolean == true))
lazy val isShuffleServiceEnabled: Option[Boolean] = Some(getProperty(SPARK_SHUFFLE_SERVICE_ENABLED).exists(_.toBoolean == true))

lazy val shuffleAndDynamicAllocationSeverity = (isDynamicAllocationEnabled, isShuffleServiceEnabled) match {
case (_, Some(true)) => Severity.NONE
case (Some(false), Some(false)) => Severity.MODERATE
case (Some(true), Some(false)) => Severity.SEVERE
}

private val serializerIfNonNullSeverityIfRecommendationUnmet: Severity =
configurationHeuristic.serializerIfNonNullSeverityIfRecommendationUnmet
lazy val severity: Severity = Severity.max(serializerSeverity, shuffleAndDynamicAllocationSeverity)

private val serializerIfNonNullRecommendation: String = configurationHeuristic.serializerIfNonNullRecommendation

private def getProperty(key: String): Option[String] = appConfigurationProperties.get(key)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,26 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {

val configurationHeuristic = new ConfigurationHeuristic(heuristicConfigurationData)

describe(".apply") {
describe("apply with NO Severity") {
val configurationProperties = Map(
"spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
"spark.storage.memoryFraction" -> "0.3",
"spark.driver.memory" -> "2G",
"spark.executor.instances" -> "900",
"spark.executor.memory" -> "1g",
"spark.shuffle.memoryFraction" -> "0.5"
"spark.shuffle.memoryFraction" -> "0.5",
"spark.shuffle.service.enabled" -> "true",
"spark.dynamicAllocation.enabled" -> "true"
)

val data = newFakeSparkApplicationData(configurationProperties)
val heuristicResult = configurationHeuristic.apply(data)
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails

it("returns the size of result details") {
heuristicResultDetails.size() should be(6)
}

it("returns the severity") {
heuristicResult.getSeverity should be(Severity.NONE)
}
Expand Down Expand Up @@ -89,10 +95,50 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {
details.getValue should include("10")
}

it("returns the serializer") {
it("returns the dynamic allocation flag") {
val details = heuristicResultDetails.get(5)
details.getName should include("spark.dynamicAllocation.enabled")
details.getValue should be("true")
}
}

describe("apply with Severity") {
val configurationProperties = Map(
"spark.serializer" -> "dummySerializer",
"spark.shuffle.service.enabled" -> "false",
"spark.dynamicAllocation.enabled" -> "true"
)

val data = newFakeSparkApplicationData(configurationProperties)
val heuristicResult = configurationHeuristic.apply(data)
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails

it("returns the size of result details") {
heuristicResultDetails.size() should be(8)
}

it("returns the severity") {
heuristicResult.getSeverity should be(Severity.SEVERE)
}

it("returns the dynamic allocation flag") {
val details = heuristicResultDetails.get(5)
details.getName should include("spark.dynamicAllocation.enabled")
details.getValue should be("true")
}

it("returns the serializer") {
val details = heuristicResultDetails.get(6)
details.getName should include("spark.serializer")
details.getValue should be("org.apache.spark.serializer.KryoSerializer")
details.getValue should be("dummySerializer")
details.getDetails should be("KyroSerializer is Not Enabled.")
}

it("returns the shuffle service flag") {
val details = heuristicResultDetails.get(7)
details.getName should include("spark.shuffle.service.enabled")
details.getValue should be("false")
details.getDetails should be("Spark shuffle service is not enabled.")
}
}

Expand Down Expand Up @@ -148,34 +194,78 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {
evaluator.serializer should be(Some("org.apache.spark.serializer.KryoSerializer"))
}

it("has no serializer when it's absent") {
it("has no serializer, dynamic allocation flag, and shuffle flag when they are absent") {
val evaluator = newEvaluatorWithConfigurationProperties(Map.empty)
evaluator.serializer should be(None)
evaluator.isDynamicAllocationEnabled should be(Some(false))
evaluator.isShuffleServiceEnabled should be(Some(false))
evaluator.serializerSeverity should be(Severity.MODERATE)
evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.MODERATE)
evaluator.severity should be(Severity.MODERATE)
}

it("has the severity of the serializer setting when it matches our recommendation") {
it("has no dynamic allocation flag and shuffle flag, and serializer setting matches our recommendation") {
val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.KryoSerializer"))
evaluator.serializer should be(Some("org.apache.spark.serializer.KryoSerializer"))
evaluator.isDynamicAllocationEnabled should be(Some(false))
evaluator.isShuffleServiceEnabled should be(Some(false))
evaluator.serializerSeverity should be(Severity.NONE)
evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.MODERATE)
evaluator.severity should be(Severity.MODERATE)
}

it("has the severity of the serializer setting when it doesn't match our recommendation and is non-null") {
it("has no dynamic allocation flag and shuffle flag, and serializer setting doesn't match our recommendation and is non-null") {
val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.FooSerializer"))
evaluator.serializer should be(Some("org.apache.spark.serializer.FooSerializer"))
evaluator.isDynamicAllocationEnabled should be(Some(false))
evaluator.isShuffleServiceEnabled should be(Some(false))
evaluator.serializerSeverity should be(Severity.MODERATE)
evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.MODERATE)
evaluator.severity should be(Severity.MODERATE)
}

it("has the severity of the serializer setting when it is null") {
val evaluator = newEvaluatorWithConfigurationProperties(Map.empty)
it("true dynamic allocation flag and shuffle flag, and serializer setting matches our recommendation") {
val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
"spark.shuffle.service.enabled" -> "true", "spark.dynamicAllocation.enabled" -> "true"))
evaluator.serializer should be(Some("org.apache.spark.serializer.KryoSerializer"))
evaluator.isDynamicAllocationEnabled should be(Some(true))
evaluator.isShuffleServiceEnabled should be(Some(true))
evaluator.serializerSeverity should be(Severity.NONE)
evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.NONE)
evaluator.severity should be(Severity.NONE)
}

it("computes the overall severity when there are some issues") {
val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.FooSerializer"))
it("true dynamic allocation flag and shuffle flag, and serializer setting is absent") {
val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.shuffle.service.enabled" -> "true",
"spark.dynamicAllocation.enabled" -> "true"))
evaluator.serializer should be(None)
evaluator.isDynamicAllocationEnabled should be(Some(true))
evaluator.isShuffleServiceEnabled should be(Some(true))
evaluator.serializerSeverity should be(Severity.MODERATE)
evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.NONE)
evaluator.severity should be(Severity.MODERATE)
}

it("computes the overall severity when there are no issues") {
val evaluator = newEvaluatorWithConfigurationProperties(Map.empty)
evaluator.severity should be(Severity.NONE)
it("true dynamic allocation flag and false shuffle flag, and serializer setting matches our recommendation") {
val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
"spark.shuffle.service.enabled" -> "false", "spark.dynamicAllocation.enabled" -> "true"))
evaluator.serializer should be(Some("org.apache.spark.serializer.KryoSerializer"))
evaluator.isDynamicAllocationEnabled should be(Some(true))
evaluator.isShuffleServiceEnabled should be(Some(false))
evaluator.serializerSeverity should be(Severity.NONE)
evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.SEVERE)
evaluator.severity should be(Severity.SEVERE)
}

it("false dynamic allocation flag and shuffle flag, and serializer setting matches our recommendation") {
val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
"spark.shuffle.service.enabled" -> "false", "spark.dynamicAllocation.enabled" -> "false"))
evaluator.serializer should be(Some("org.apache.spark.serializer.KryoSerializer"))
evaluator.isDynamicAllocationEnabled should be(Some(false))
evaluator.isShuffleServiceEnabled should be(Some(false))
evaluator.serializerSeverity should be(Severity.NONE)
evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.MODERATE)
evaluator.severity should be(Severity.MODERATE)
}
}
}
Expand Down

0 comments on commit 0f2a38a

Please sign in to comment.