Skip to content

Commit

Permalink
[CELEBORN-846][FOLLOWUP] Fix broken link cause by unknown RPC.
Browse files Browse the repository at this point in the history
  • Loading branch information
FMX committed Aug 7, 2023
1 parent 6ea1ee2 commit bf69e85
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 1 deletion.
12 changes: 11 additions & 1 deletion common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ enum MessageType {
REGISTER_SHUFFLE = 4;
REGISTER_SHUFFLE_RESPONSE = 5;
REQUEST_SLOTS = 6;
// RELEASE_SLOTS = 7;
// keep it for compatible reason
RELEASE_SLOTS = 7;
// RELEASE_SLOTS_RESPONSE = 8;
REQUEST_SLOTS_RESPONSE = 9;
CHANGE_LOCATION = 10;
Expand Down Expand Up @@ -192,6 +193,15 @@ message PbSlotInfo {
map<string, int32> slot = 1;
}

// keep it for compatible reason
message PbReleaseSlots {
string applicationId = 1;
int32 shuffleId = 2;
repeated string workerIds = 3;
repeated PbSlotInfo slots = 4;
string requestId = 6;
}

message PbRequestSlotsResponse {
int32 status = 1;
map<string, PbWorkerResource> workerResource = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ object ControlMessages extends Logging {
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage

@Deprecated
// Keep it for compatible reason
case class ReleaseSlots(
applicationId: String,
shuffleId: Int,
workerIds: util.List[String],
slots: util.List[util.Map[String, Integer]],
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage

case class RequestSlotsResponse(
status: StatusCode,
workerResource: WorkerResource)
Expand Down Expand Up @@ -487,6 +497,18 @@ object ControlMessages extends Logging {
.build().toByteArray
new TransportMessage(MessageType.REQUEST_SLOTS, payload)

case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) =>
val pbSlots = slots.asScala.map(slot =>
PbSlotInfo.newBuilder().putAllSlot(slot).build()).toList
val payload = PbReleaseSlots.newBuilder()
.setApplicationId(applicationId)
.setShuffleId(shuffleId)
.setRequestId(requestId)
.addAllWorkerIds(workerIds)
.addAllSlots(pbSlots.asJava)
.build().toByteArray
new TransportMessage(MessageType.RELEASE_SLOTS, payload)

case RequestSlotsResponse(status, workerResource) =>
val builder = PbRequestSlotsResponse.newBuilder()
.setStatus(status.getValue)
Expand Down Expand Up @@ -746,6 +768,18 @@ object ControlMessages extends Logging {
logError(msg)
throw new UnsupportedOperationException(msg)

// keep it for compatible reason
case RELEASE_SLOTS_VALUE =>
val pbReleaseSlots = PbReleaseSlots.parseFrom(message.getPayload)
val slotsList = pbReleaseSlots.getSlotsList.asScala.map(pbSlot =>
new util.HashMap[String, Integer](pbSlot.getSlotMap)).toList.asJava
ReleaseSlots(
pbReleaseSlots.getApplicationId,
pbReleaseSlots.getShuffleId,
new util.ArrayList[String](pbReleaseSlots.getWorkerIdsList),
new util.ArrayList[util.Map[String, Integer]](slotsList),
pbReleaseSlots.getRequestId)

case REGISTER_WORKER_VALUE =>
PbRegisterWorker.parseFrom(message.getPayload)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ private[celeborn] class Master(
userResourceConsumption,
requestId))

case ReleaseSlots(_, _, _, _, _) =>
// keep it for compatible reason

case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _) =>
logTrace(s"Received RequestSlots request $requestSlots.")
executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots))
Expand Down

0 comments on commit bf69e85

Please sign in to comment.