Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-846][FOLLOWUP] Fix broken link caused by unknown RPC. #1794

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ enum MessageType {
REGISTER_SHUFFLE = 4;
REGISTER_SHUFFLE_RESPONSE = 5;
REQUEST_SLOTS = 6;
// RELEASE_SLOTS = 7;
// RELEASE_SLOTS_RESPONSE = 8;
// keep it for compatible with 0.3 client, will remove in 0.5
RELEASE_SLOTS = 7;
RELEASE_SLOTS_RESPONSE = 8;

REQUEST_SLOTS_RESPONSE = 9;
CHANGE_LOCATION = 10;
CHANGE_LOCATION_RESPONSE = 11;
Expand Down Expand Up @@ -192,6 +194,21 @@ message PbSlotInfo {
map<string, int32> slot = 1;
}

// keep it for compatible reason
message PbReleaseSlots {
string applicationId = 1;
int32 shuffleId = 2;
repeated string workerIds = 3;
repeated PbSlotInfo slots = 4;
string requestId = 6;
}

// keep it for compatible reason
message PbReleaseSlotsResponse {
int32 status = 1;
}


message PbRequestSlotsResponse {
int32 status = 1;
map<string, PbWorkerResource> workerResource = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,21 @@ object ControlMessages extends Logging {
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage

// Keep it for compatible reason
@deprecated
case class ReleaseSlots(
applicationId: String,
shuffleId: Int,
workerIds: util.List[String],
slots: util.List[util.Map[String, Integer]],
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage

// Keep it for compatible reason
@deprecated
case class ReleaseSlotsResponse(status: StatusCode)
extends MasterMessage

case class RequestSlotsResponse(
status: StatusCode,
workerResource: WorkerResource)
Expand Down Expand Up @@ -487,6 +502,23 @@ object ControlMessages extends Logging {
.build().toByteArray
new TransportMessage(MessageType.REQUEST_SLOTS, payload)

case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) =>
val pbSlots = slots.asScala.map(slot =>
PbSlotInfo.newBuilder().putAllSlot(slot).build()).toList
val payload = PbReleaseSlots.newBuilder()
.setApplicationId(applicationId)
.setShuffleId(shuffleId)
.setRequestId(requestId)
.addAllWorkerIds(workerIds)
.addAllSlots(pbSlots.asJava)
.build().toByteArray
new TransportMessage(MessageType.RELEASE_SLOTS, payload)

case ReleaseSlotsResponse(status) =>
val payload = PbReleaseSlotsResponse.newBuilder()
.setStatus(status.getValue).build().toByteArray
new TransportMessage(MessageType.RELEASE_SLOTS_RESPONSE, payload)

case RequestSlotsResponse(status, workerResource) =>
val builder = PbRequestSlotsResponse.newBuilder()
.setStatus(status.getValue)
Expand Down Expand Up @@ -746,6 +778,22 @@ object ControlMessages extends Logging {
logError(msg)
throw new UnsupportedOperationException(msg)

// keep it for compatible reason
case RELEASE_SLOTS_VALUE =>
val pbReleaseSlots = PbReleaseSlots.parseFrom(message.getPayload)
val slotsList = pbReleaseSlots.getSlotsList.asScala.map(pbSlot =>
new util.HashMap[String, Integer](pbSlot.getSlotMap)).toList.asJava
ReleaseSlots(
pbReleaseSlots.getApplicationId,
pbReleaseSlots.getShuffleId,
new util.ArrayList[String](pbReleaseSlots.getWorkerIdsList),
new util.ArrayList[util.Map[String, Integer]](slotsList),
pbReleaseSlots.getRequestId)

case RELEASE_SLOTS_RESPONSE_VALUE =>
val pbReleaseSlotsResponse = PbReleaseSlotsResponse.parseFrom(message.getPayload)
ReleaseSlotsResponse(Utils.toStatusCode(pbReleaseSlotsResponse.getStatus))

case REGISTER_WORKER_VALUE =>
PbRegisterWorker.parseFrom(message.getPayload)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ private[celeborn] class Master(
userResourceConsumption,
requestId))

case ReleaseSlots(_, _, _, _, _) =>
// keep it for compatible reason
context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))

case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _) =>
logTrace(s"Received RequestSlots request $requestSlots.")
executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots))
Expand Down
Loading