From 2026c68540318536ac1f47f9b77770a44b6ca3e8 Mon Sep 17 00:00:00 2001
From: forrestchen
Date: Thu, 11 Jan 2018 15:17:45 +0800
Subject: [PATCH] emit shuffle pod watch exception

Signed-off-by: forrestchen
---
 .../k8s/KubernetesExternalShuffleManager.scala | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala
index 388e2b17f4fdd..7b6d7d345bc7a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala
@@ -106,18 +106,14 @@ private[spark] class KubernetesExternalShuffleManagerImpl(
   }
 
   private def addShufflePodToCache(pod: Pod): Unit = shufflePodCache.synchronized {
-    if (shufflePodCache.contains(pod.getSpec.getNodeName)) {
-      val registeredPodName = shufflePodCache.get(pod.getSpec.getNodeName).get
-      logError(s"Ambiguous specification of shuffle service pod. " +
-        s"Found multiple matching pods: ${pod.getMetadata.getName}, " +
-        s"${registeredPodName} on ${pod.getSpec.getNodeName}")
-
-      throw new SparkException(s"Ambiguous specification of shuffle service pod. " +
-        s"Found multiple matching pods: ${pod.getMetadata.getName}, " +
-        s"${registeredPodName} on ${pod.getSpec.getNodeName}")
-    } else {
-      shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP
+    if (shufflePodCache.exists(kv => kv._1 == pod.getSpec.getNodeName
+      && kv._2 != pod.getStatus.getPodIP)) {
+      val registeredPodIP = shufflePodCache(pod.getSpec.getNodeName)
+      logWarning(s"Ambiguous specification of shuffle service pod. " +
+        s"Found multiple matching pods: ${pod.getMetadata.getName}(${pod.getStatus.getPodIP}), " +
+        s"$registeredPodIP on ${pod.getSpec.getNodeName}, will update")
     }
+    shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP
   }
 
   override def stop(): Unit = {