Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-726][FOLLOWUP] Amend method names #1776

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
// First, request to get allocated slots from Primary
val ids = new util.ArrayList[Integer](numPartitions)
(0 until numPartitions).foreach(idx => ids.add(Integer.valueOf(idx)))
val res = requestPrimaryRequestSlotsWithRetry(shuffleId, ids)
val res = requestMasterRequestSlotsWithRetry(shuffleId, ids)

res.status match {
case StatusCode.REQUEST_FAILED =>
Expand Down Expand Up @@ -993,7 +993,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
latestPartitionLocation.remove(shuffleId)
commitManager.removeExpiredShuffle(shuffleId)
changePartitionManager.removeExpiredShuffle(shuffleId)
val unregisterShuffleResponse = requestPrimaryUnregisterShuffle(
val unregisterShuffleResponse = requestMasterUnregisterShuffle(
UnregisterShuffle(appUniqueId, shuffleId, MasterClient.genRequestId()))
// if unregister shuffle not success, wait next turn
if (StatusCode.SUCCESS == Utils.toStatusCode(unregisterShuffleResponse.getStatus)) {
Expand All @@ -1003,7 +1003,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}
}

private def requestPrimaryRequestSlotsWithRetry(
private def requestMasterRequestSlotsWithRetry(
shuffleId: Int,
ids: util.ArrayList[Integer]): RequestSlotsResponse = {
val req =
Expand All @@ -1015,15 +1015,15 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
pushReplicateEnabled,
pushRackAwareEnabled,
userIdentifier)
val res = requestPrimaryRequestSlots(req)
val res = requestMasterRequestSlots(req)
if (res.status != StatusCode.SUCCESS) {
requestPrimaryRequestSlots(req)
requestMasterRequestSlots(req)
} else {
res
}
}

private def requestPrimaryRequestSlots(message: RequestSlots): RequestSlotsResponse = {
private def requestMasterRequestSlots(message: RequestSlots): RequestSlotsResponse = {
val shuffleKey = Utils.makeShuffleKey(message.applicationId, message.shuffleId)
try {
masterClient.askSync[RequestSlotsResponse](message, classOf[RequestSlotsResponse])
Expand Down Expand Up @@ -1064,7 +1064,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}
}

private def requestPrimaryUnregisterShuffle(message: PbUnregisterShuffle)
private def requestMasterUnregisterShuffle(message: PbUnregisterShuffle)
: PbUnregisterShuffleResponse = {
try {
masterClient.askSync[PbUnregisterShuffleResponse](
Expand Down
Loading