From bf69e8503d845b6144d7ba7f42a0f63e590dd4b2 Mon Sep 17 00:00:00 2001 From: mingji Date: Mon, 7 Aug 2023 16:27:14 +0800 Subject: [PATCH] [CELEBORN-846][FOLLOWUP] Fix broken link caused by unknown RPC. --- common/src/main/proto/TransportMessages.proto | 12 ++++++- .../protocol/message/ControlMessages.scala | 34 +++++++++++++++++++ .../service/deploy/master/Master.scala | 3 ++ 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index b59ab2c2a92..9855d64aae3 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -25,7 +25,8 @@ enum MessageType { REGISTER_SHUFFLE = 4; REGISTER_SHUFFLE_RESPONSE = 5; REQUEST_SLOTS = 6; - // RELEASE_SLOTS = 7; + // keep it for compatibility reasons + RELEASE_SLOTS = 7; // RELEASE_SLOTS_RESPONSE = 8; REQUEST_SLOTS_RESPONSE = 9; CHANGE_LOCATION = 10; @@ -192,6 +193,15 @@ message PbSlotInfo { map<string, int32> slot = 1; } +// keep it for compatibility reasons +message PbReleaseSlots { + string applicationId = 1; + int32 shuffleId = 2; + repeated string workerIds = 3; + repeated PbSlotInfo slots = 4; + string requestId = 6; +} + message PbRequestSlotsResponse { int32 status = 1; map workerResource = 2; diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index c9c6a2f49d3..763160131af 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -170,6 +170,16 @@ object ControlMessages extends Logging { override var requestId: String = ZERO_UUID) extends MasterRequestMessage + @Deprecated + // Keep it for compatibility reasons + case class ReleaseSlots( + applicationId: String, + shuffleId: Int, + workerIds: util.List[String], + slots: 
util.List[util.Map[String, Integer]], + override var requestId: String = ZERO_UUID) + extends MasterRequestMessage + case class RequestSlotsResponse( status: StatusCode, workerResource: WorkerResource) @@ -487,6 +497,18 @@ object ControlMessages extends Logging { .build().toByteArray new TransportMessage(MessageType.REQUEST_SLOTS, payload) + case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) => + val pbSlots = slots.asScala.map(slot => + PbSlotInfo.newBuilder().putAllSlot(slot).build()).toList + val payload = PbReleaseSlots.newBuilder() + .setApplicationId(applicationId) + .setShuffleId(shuffleId) + .setRequestId(requestId) + .addAllWorkerIds(workerIds) + .addAllSlots(pbSlots.asJava) + .build().toByteArray + new TransportMessage(MessageType.RELEASE_SLOTS, payload) + case RequestSlotsResponse(status, workerResource) => val builder = PbRequestSlotsResponse.newBuilder() .setStatus(status.getValue) @@ -746,6 +768,18 @@ object ControlMessages extends Logging { logError(msg) throw new UnsupportedOperationException(msg) + // keep it for compatibility reasons + case RELEASE_SLOTS_VALUE => + val pbReleaseSlots = PbReleaseSlots.parseFrom(message.getPayload) + val slotsList = pbReleaseSlots.getSlotsList.asScala.map(pbSlot => + new util.HashMap[String, Integer](pbSlot.getSlotMap)).toList.asJava + ReleaseSlots( + pbReleaseSlots.getApplicationId, + pbReleaseSlots.getShuffleId, + new util.ArrayList[String](pbReleaseSlots.getWorkerIdsList), + new util.ArrayList[util.Map[String, Integer]](slotsList), + pbReleaseSlots.getRequestId) + case REGISTER_WORKER_VALUE => PbRegisterWorker.parseFrom(message.getPayload) diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index d5ce628275b..bd16f89c4ac 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -303,6 +303,9 @@ private[celeborn] class Master( userResourceConsumption, requestId)) + case ReleaseSlots(_, _, _, _, _) => + // keep it for compatibility reasons + case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _) => logTrace(s"Received RequestSlots request $requestSlots.") executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots))