This repository was archived by the owner on Jan 9, 2020. It is now read-only.

handle failed executor event #602

Open · wants to merge 1 commit into base: branch-2.2-kubernetes

@@ -32,6 +32,7 @@ import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.internal.config._
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
@@ -54,6 +55,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val RUNNING_EXECUTOR_PODS_LOCK = new Object
// Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
// Names of executor pods with a failed status. Guarded by RUNNING_EXECUTOR_PODS_LOCK.
private val failedExecutors = new mutable.HashSet[String]
// Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK.
private val runningPodsToExecutors = new mutable.HashMap[String, String]
private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
@@ -114,19 +117,20 @@
override def run(): Unit = {
handleDisconnectedExecutors()
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
if (totalRegisteredExecutors.get() < runningExecutorSize()) {
logDebug("Waiting for pending executors before scaling")
} else if (totalExpectedExecutors.get() <= runningExecutorsToPods.size) {
} else if (totalExpectedExecutors.get() <= runningExecutorSize()) {
logDebug("Maximum allowed executor limit reached. Not scaling up further.")
} else {
val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
for (i <- 0 until math.min(
totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
totalExpectedExecutors.get - runningExecutorSize(), podAllocationSize)) {
val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
runningExecutorsToPods.put(executorId, pod)
runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
logInfo(
s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
s"Requesting a new executor $executorId, total executors is now " +
s"${runningExecutorSize()} (${failedExecutors.size} failed)")
}
}
}
@@ -172,9 +176,33 @@
runningExecutorsToPods.remove(executorId).map { pod =>
kubernetesClient.pods().delete(pod)
runningPodsToExecutors.remove(pod.getMetadata.getName)
failedExecutors -= pod.getMetadata.getName
}.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
}
}

// Returns the number of executors created so far, excluding failed ones.
// To avoid creating too many failed executors, the number of failures we discount
// is capped at maxNumExecutorFailures. So after creating
// totalExpectedExecutors + maxNumExecutorFailures executors, we stop creating more,
// even if all of them have failed.
def runningExecutorSize(): Int = runningExecutorsToPods.size -
math.min(failedExecutors.size, maxNumExecutorFailures)

// Default to twice the number of executors (twice the maximum number of executors if dynamic
// allocation is enabled), with a minimum of 3.
val maxNumExecutorFailures = {
val effectiveNumExecutors =
if (Utils.isDynamicAllocationEnabled(conf)) {
conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
} else {
conf.get(EXECUTOR_INSTANCES).getOrElse(0)
}
// By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is enabled.
// We need to avoid integer overflow here.
math.max(3,
if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else 2 * effectiveNumExecutors)
}
}

private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = {
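
As a worked example of the cap described in the comment above: with spark.executor.instances = 5 and dynamic allocation disabled, maxNumExecutorFailures is max(3, 2 * 5) = 10, so the allocator keeps requesting replacement pods until 5 + 10 = 15 have been created, even if every one of them fails. Below is a minimal, self-contained sketch of that arithmetic (illustrative names; a deliberately simplified simulation in which failed pods are never cleaned up, which is not how the backend behaves once executors actually disconnect):

// Illustrative sketch only: mirrors the failure-capped accounting from this patch
// with plain collections in place of the scheduler backend's state.
object FailureCapSketch {
  def main(args: Array[String]): Unit = {
    val totalExpectedExecutors = 5
    // Same formula as maxNumExecutorFailures above, assuming 5 executor instances
    // and dynamic allocation disabled.
    val effectiveNumExecutors = 5
    val maxNumExecutorFailures = math.max(3,
      if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else 2 * effectiveNumExecutors)

    val runningExecutorsToPods = scala.collection.mutable.HashMap.empty[String, String]
    val failedExecutors = scala.collection.mutable.HashSet.empty[String]

    // runningExecutorSize() as in the patch: created pods minus failed ones,
    // but never discounting more than maxNumExecutorFailures failures.
    def runningExecutorSize(): Int =
      runningExecutorsToPods.size - math.min(failedExecutors.size, maxNumExecutorFailures)

    // Worst case: every allocated pod fails immediately and is never removed.
    var nextId = 0
    while (totalExpectedExecutors > runningExecutorSize()) {
      nextId += 1
      val podName = s"exec-pod-$nextId"
      runningExecutorsToPods.put(nextId.toString, podName)
      failedExecutors += podName
    }

    // Prints 15: totalExpectedExecutors (5) + maxNumExecutorFailures (10).
    println(s"pods created before allocation stops: ${runningExecutorsToPods.size}")
  }
}
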
@@ -233,6 +261,7 @@
runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_))
runningExecutorsToPods.clear()
runningPodsToExecutors.clear()
failedExecutors.clear()
}
executorPodsByIPs.clear()
val resource = executorWatchResource.getAndSet(null)
@@ -311,6 +340,7 @@
kubernetesClient.pods().delete(executorPod)
disconnectedPodsByExecutorIdPendingRemoval.put(executor, executorPod)
runningPodsToExecutors.remove(executorPod.getMetadata.getName)
failedExecutors -= executorPod.getMetadata.getName
}
if (maybeRemovedExecutor.isEmpty) {
logWarning(s"Unable to remove pod for unknown executor $executor")
@@ -354,6 +384,10 @@
logInfo(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
handleDeletedPod(pod)
}
} else if (action == Action.MODIFIED && pod.getStatus.getPhase == "Failed") {
logError(s"Executor pod ${pod.getMetadata.getName} failed with container status " +
s"${pod.getStatus.getContainerStatuses}")
handleFailedPod(pod)
}
}

@@ -407,6 +441,13 @@
podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
}

def handleFailedPod(pod: Pod): Unit = {
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
failedExecutors += pod.getMetadata.getName
}
handleErroredPod(pod)
}

def handleDeletedPod(pod: Pod): Unit = {
val exitMessage = if (isPodAlreadyReleased(pod)) {
s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."
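
A note on the new Action.MODIFIED branch and handleFailedPod above: executor pods are not restarted by Kubernetes (restartPolicy Never), so a container that exits non-zero leaves the pod in place with phase "Failed", and the watch reports that transition as a MODIFIED event rather than DELETED, which the existing error/delete handling never sees. A rough sketch of that dispatch, using simplified stand-ins for the fabric8 Pod and Watcher.Action types (illustrative only, not the actual watcher code):

// Simplified stand-ins for the fabric8 watch types used by the real pod watcher.
sealed trait WatchAction
case object Modified extends WatchAction
case object Deleted extends WatchAction
final case class PodStub(name: String, phase: String)

object FailedPodEventSketch {
  private val failedExecutors = scala.collection.mutable.HashSet.empty[String]

  // Mirrors handleFailedPod in the patch: remember the failed pod name, then fall
  // through to the existing errored-pod handling (elided here).
  private def handleFailedPod(pod: PodStub): Unit = failedExecutors += pod.name

  def eventReceived(action: WatchAction, pod: PodStub): Unit = action match {
    case Modified if pod.phase == "Failed" => handleFailedPod(pod)
    case Deleted => () // existing handleDeletedPod path
    case _ => ()
  }

  def main(args: Array[String]): Unit = {
    eventReceived(Modified, PodStub("spark-exec-7", "Failed"))
    println(failedExecutors) // Set(spark-exec-7)
  }
}
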