Skip to content

Commit

Permalink
[CELEBORN-726][FOLLOWUP] Amend method names
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
As title

### Why are the changes needed?
As title

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Passes GA

Closes apache#1776 from waitinfuture/method.

Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
  • Loading branch information
waitinfuture authored and pan3793 committed Aug 7, 2023
1 parent 1a76e78 commit cae0552
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
// First, request to get allocated slots from Primary
val ids = new util.ArrayList[Integer](numPartitions)
(0 until numPartitions).foreach(idx => ids.add(Integer.valueOf(idx)))
val res = requestPrimaryRequestSlotsWithRetry(shuffleId, ids)
val res = requestMasterRequestSlotsWithRetry(shuffleId, ids)

res.status match {
case StatusCode.REQUEST_FAILED =>
Expand Down Expand Up @@ -993,7 +993,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
latestPartitionLocation.remove(shuffleId)
commitManager.removeExpiredShuffle(shuffleId)
changePartitionManager.removeExpiredShuffle(shuffleId)
val unregisterShuffleResponse = requestPrimaryUnregisterShuffle(
val unregisterShuffleResponse = requestMasterUnregisterShuffle(
UnregisterShuffle(appUniqueId, shuffleId, MasterClient.genRequestId()))
// if unregister shuffle not success, wait next turn
if (StatusCode.SUCCESS == Utils.toStatusCode(unregisterShuffleResponse.getStatus)) {
Expand All @@ -1003,7 +1003,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}
}

private def requestPrimaryRequestSlotsWithRetry(
private def requestMasterRequestSlotsWithRetry(
shuffleId: Int,
ids: util.ArrayList[Integer]): RequestSlotsResponse = {
val req =
Expand All @@ -1015,15 +1015,15 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
pushReplicateEnabled,
pushRackAwareEnabled,
userIdentifier)
val res = requestPrimaryRequestSlots(req)
val res = requestMasterRequestSlots(req)
if (res.status != StatusCode.SUCCESS) {
requestPrimaryRequestSlots(req)
requestMasterRequestSlots(req)
} else {
res
}
}

private def requestPrimaryRequestSlots(message: RequestSlots): RequestSlotsResponse = {
private def requestMasterRequestSlots(message: RequestSlots): RequestSlotsResponse = {
val shuffleKey = Utils.makeShuffleKey(message.applicationId, message.shuffleId)
try {
masterClient.askSync[RequestSlotsResponse](message, classOf[RequestSlotsResponse])
Expand Down Expand Up @@ -1064,7 +1064,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}
}

private def requestPrimaryUnregisterShuffle(message: PbUnregisterShuffle)
private def requestMasterUnregisterShuffle(message: PbUnregisterShuffle)
: PbUnregisterShuffleResponse = {
try {
masterClient.askSync[PbUnregisterShuffleResponse](
Expand Down

0 comments on commit cae0552

Please sign in to comment.