diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index d30c88fcc74bf..9646decded844 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -32,6 +32,7 @@ import scala.concurrent.{ExecutionContext, Future}
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.deploy.k8s.config._
 import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.internal.config._
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
@@ -54,6 +55,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val RUNNING_EXECUTOR_PODS_LOCK = new Object
   // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
   private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Names of executor pods with a failed status. Guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val failedExecutors = new mutable.HashSet[String]
   // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK.
   private val runningPodsToExecutors = new mutable.HashMap[String, String]
   private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
@@ -114,19 +117,20 @@ private[spark] class KubernetesClusterSchedulerBackend(
     override def run(): Unit = {
       handleDisconnectedExecutors()
       RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
+        if (totalRegisteredExecutors.get() < runningExecutorSize()) {
           logDebug("Waiting for pending executors before scaling")
-        } else if (totalExpectedExecutors.get() <= runningExecutorsToPods.size) {
+        } else if (totalExpectedExecutors.get() <= runningExecutorSize()) {
           logDebug("Maximum allowed executor limit reached. Not scaling up further.")
         } else {
           val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
           for (i <- 0 until math.min(
-            totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
+            totalExpectedExecutors.get - runningExecutorSize(), podAllocationSize)) {
             val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
             runningExecutorsToPods.put(executorId, pod)
             runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
             logInfo(
-              s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
+              s"Requesting a new executor $executorId, total executors is now " +
+                s"${runningExecutorSize()} (${failedExecutors.size} failed)")
           }
         }
       }
@@ -172,9 +176,33 @@ private[spark] class KubernetesClusterSchedulerBackend(
         runningExecutorsToPods.remove(executorId).map { pod =>
           kubernetesClient.pods().delete(pod)
           runningPodsToExecutors.remove(pod.getMetadata.getName)
+          failedExecutors -= pod.getMetadata.getName
         }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
       }
     }
+
+    // Represents the number of currently created executors, excluding failed ones.
+    // To avoid creating too many failed executors, we cap the number of failures counted
+    // here at maxNumExecutorFailures, so after creating
+    // totalExpectedExecutors + maxNumExecutorFailures executors we stop creating more,
+    // even if all of them have failed.
+    def runningExecutorSize(): Int = runningExecutorsToPods.size -
+      math.min(failedExecutors.size, maxNumExecutorFailures)
+
+    // Default to twice the number of executors (twice the maximum number of executors if
+    // dynamic allocation is enabled), with a minimum of 3.
+    val maxNumExecutorFailures = {
+      val effectiveNumExecutors =
+        if (Utils.isDynamicAllocationEnabled(conf)) {
+          conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+        } else {
+          conf.get(EXECUTOR_INSTANCES).getOrElse(0)
+        }
+      // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is enabled.
+      // We need to avoid integer overflow here.
+      math.max(3,
+        if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else 2 * effectiveNumExecutors)
+    }
   }
 
   private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = {
@@ -233,6 +261,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_))
       runningExecutorsToPods.clear()
       runningPodsToExecutors.clear()
+      failedExecutors.clear()
     }
     executorPodsByIPs.clear()
     val resource = executorWatchResource.getAndSet(null)
@@ -311,6 +340,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
           kubernetesClient.pods().delete(executorPod)
           disconnectedPodsByExecutorIdPendingRemoval.put(executor, executorPod)
           runningPodsToExecutors.remove(executorPod.getMetadata.getName)
+          failedExecutors -= executorPod.getMetadata.getName
         }
         if (maybeRemovedExecutor.isEmpty) {
           logWarning(s"Unable to remove pod for unknown executor $executor")
@@ -354,6 +384,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
           logInfo(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
           handleDeletedPod(pod)
         }
+      } else if (action == Action.MODIFIED && pod.getStatus.getPhase == "Failed") {
+        logError(s"Executor pod ${pod.getMetadata.getName} failed with container status " +
+          s"${pod.getStatus.getContainerStatuses}")
+        handleFailedPod(pod)
       }
     }
@@ -407,6 +441,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
       podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
     }
 
+    def handleFailedPod(pod: Pod): Unit = {
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        failedExecutors += pod.getMetadata.getName
+      }
+      handleErroredPod(pod)
+    }
+
     def handleDeletedPod(pod: Pod): Unit = {
       val exitMessage = if (isPodAlreadyReleased(pod)) {
         s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."
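
For context, the sketch below illustrates why this patch bounds pod creation at totalExpectedExecutors + maxNumExecutorFailures. It is not part of the patch: the object name, the main method, and the failure simulation loop are invented for illustration, and the two counters merely stand in for runningExecutorsToPods.size and failedExecutors.size. Only the two arithmetic expressions are copied from the diff.

// Self-contained sketch (illustrative, not the actual Spark class).
object FailedExecutorAccountingSketch {
  def main(args: Array[String]): Unit = {
    val totalExpectedExecutors = 5          // stand-in for totalExpectedExecutors.get()
    val effectiveNumExecutors = totalExpectedExecutors

    // Same expression as the patch: double the executor count with a floor of 3,
    // saturating at Int.MaxValue so the doubling cannot overflow.
    val maxNumExecutorFailures = math.max(3,
      if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue
      else 2 * effectiveNumExecutors)

    var created = 0                         // models runningExecutorsToPods.size
    var failed = 0                          // models failedExecutors.size

    // Same expression as runningExecutorSize(): failures are subtracted back out,
    // but only up to maxNumExecutorFailures of them.
    def runningExecutorSize: Int = created - math.min(failed, maxNumExecutorFailures)

    // Worst case: every pod created fails immediately. Once the cap is hit, further
    // failures are no longer discounted, so runningExecutorSize climbs toward the
    // target and the allocation loop terminates instead of spawning pods forever.
    while (runningExecutorSize < totalExpectedExecutors) {
      created += 1
      failed += 1
    }
    println(s"stopped after creating $created pods " +
      s"(totalExpectedExecutors + maxNumExecutorFailures = " +
      s"${totalExpectedExecutors + maxNumExecutorFailures})")
  }
}

Running the sketch prints that creation stops after 15 pods (5 expected + 10 counted failures), matching the bound described in the runningExecutorSize() comment.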