Skip to content

Commit

Permalink
[CELEBORN-846][FOLLOWUP] Fix broken link caused by unknown RPC
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Keep ReleaseSlots RPC to make sure that 0.3 client can worker with 0.3.1-SNAPSHOT and 0.4.0-SNAPSHOT.
This PR will need to merged into main and branch-0.3.

### Why are the changes needed?
Ditto.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
GA and cluster.

Closes #1794 from FMX/CELEBORN-846-FOLLOWUP.

Lead-authored-by: mingji <[email protected]>
Co-authored-by: Ethan Feng <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
  • Loading branch information
FMX authored and waitinfuture committed Aug 11, 2023
1 parent e137d0e commit 7d0e257
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 2 deletions.
21 changes: 19 additions & 2 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ enum MessageType {
REGISTER_SHUFFLE = 4;
REGISTER_SHUFFLE_RESPONSE = 5;
REQUEST_SLOTS = 6;
// RELEASE_SLOTS = 7;
// RELEASE_SLOTS_RESPONSE = 8;
// keep it for compatible with 0.3 client, will remove in 0.5
RELEASE_SLOTS = 7;
RELEASE_SLOTS_RESPONSE = 8;

REQUEST_SLOTS_RESPONSE = 9;
CHANGE_LOCATION = 10;
CHANGE_LOCATION_RESPONSE = 11;
Expand Down Expand Up @@ -192,6 +194,21 @@ message PbSlotInfo {
map<string, int32> slot = 1;
}

// keep it for compatible reason
message PbReleaseSlots {
string applicationId = 1;
int32 shuffleId = 2;
repeated string workerIds = 3;
repeated PbSlotInfo slots = 4;
string requestId = 6;
}

// keep it for compatible reason
message PbReleaseSlotsResponse {
int32 status = 1;
}


message PbRequestSlotsResponse {
int32 status = 1;
map<string, PbWorkerResource> workerResource = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,21 @@ object ControlMessages extends Logging {
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage

// Keep it for compatible reason
@deprecated
case class ReleaseSlots(
applicationId: String,
shuffleId: Int,
workerIds: util.List[String],
slots: util.List[util.Map[String, Integer]],
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage

// Keep it for compatible reason
@deprecated
case class ReleaseSlotsResponse(status: StatusCode)
extends MasterMessage

case class RequestSlotsResponse(
status: StatusCode,
workerResource: WorkerResource)
Expand Down Expand Up @@ -487,6 +502,23 @@ object ControlMessages extends Logging {
.build().toByteArray
new TransportMessage(MessageType.REQUEST_SLOTS, payload)

case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) =>
val pbSlots = slots.asScala.map(slot =>
PbSlotInfo.newBuilder().putAllSlot(slot).build()).toList
val payload = PbReleaseSlots.newBuilder()
.setApplicationId(applicationId)
.setShuffleId(shuffleId)
.setRequestId(requestId)
.addAllWorkerIds(workerIds)
.addAllSlots(pbSlots.asJava)
.build().toByteArray
new TransportMessage(MessageType.RELEASE_SLOTS, payload)

case ReleaseSlotsResponse(status) =>
val payload = PbReleaseSlotsResponse.newBuilder()
.setStatus(status.getValue).build().toByteArray
new TransportMessage(MessageType.RELEASE_SLOTS_RESPONSE, payload)

case RequestSlotsResponse(status, workerResource) =>
val builder = PbRequestSlotsResponse.newBuilder()
.setStatus(status.getValue)
Expand Down Expand Up @@ -746,6 +778,22 @@ object ControlMessages extends Logging {
logError(msg)
throw new UnsupportedOperationException(msg)

// keep it for compatible reason
case RELEASE_SLOTS_VALUE =>
val pbReleaseSlots = PbReleaseSlots.parseFrom(message.getPayload)
val slotsList = pbReleaseSlots.getSlotsList.asScala.map(pbSlot =>
new util.HashMap[String, Integer](pbSlot.getSlotMap)).toList.asJava
ReleaseSlots(
pbReleaseSlots.getApplicationId,
pbReleaseSlots.getShuffleId,
new util.ArrayList[String](pbReleaseSlots.getWorkerIdsList),
new util.ArrayList[util.Map[String, Integer]](slotsList),
pbReleaseSlots.getRequestId)

case RELEASE_SLOTS_RESPONSE_VALUE =>
val pbReleaseSlotsResponse = PbReleaseSlotsResponse.parseFrom(message.getPayload)
ReleaseSlotsResponse(Utils.toStatusCode(pbReleaseSlotsResponse.getStatus))

case REGISTER_WORKER_VALUE =>
PbRegisterWorker.parseFrom(message.getPayload)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ private[celeborn] class Master(
userResourceConsumption,
requestId))

case ReleaseSlots(_, _, _, _, _) =>
// keep it for compatible reason
context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))

case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _) =>
logTrace(s"Received RequestSlots request $requestSlots.")
executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots))
Expand Down

0 comments on commit 7d0e257

Please sign in to comment.