diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index fbf10c2ab53..36e60d89aaf 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -192,9 +192,10 @@ public void updateRegisterWorkerMeta( synchronized (workers) { if (!workers.contains(workerInfo)) { workers.add(workerInfo); - shutdownWorkers.remove(workerInfo); - lostWorkers.remove(workerInfo); } + shutdownWorkers.remove(workerInfo); + lostWorkers.remove(workerInfo); + excludedWorkers.remove(workerInfo); } } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 39e030e9853..b06d4684c52 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -523,7 +523,9 @@ private[celeborn] class Master( if (workersSnapShot.contains(workerToRegister)) { logWarning(s"Receive RegisterWorker while worker" + s" ${workerToRegister.toString()} already exists, re-register.") + // TODO: remove `WorkerRemove` because we have improve register logic to cover `WorkerRemove` statusSystem.handleWorkerRemove(host, rpcPort, pushPort, fetchPort, replicatePort, requestId) + val newRequestId = MasterClient.genRequestId() statusSystem.handleRegisterWorker( host, rpcPort, @@ -532,7 +534,7 @@ private[celeborn] class Master( replicatePort, disks, userResourceConsumption, - requestId) + newRequestId) context.reply(RegisterWorkerResponse(true, "Worker in snapshot, re-register.")) } else if (statusSystem.workerLostEvents.contains(workerToRegister)) { logWarning(s"Receive RegisterWorker while worker $workerToRegister " +