Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1644] Optimize handle merged data on stage end #2806

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

FMX
Copy link
Contributor

@FMX FMX commented Oct 12, 2024

What changes were proposed in this pull request?

  1. Add worker readable address to commit files response logs, to avoid redundant logs
  2. Pass stage end status to the worker to avoid unnecessary retry

Why are the changes needed?

Committing files will take a period of time if a speculation task is pushing merged data to the worker. This will cause the client to retry until the committing files is complete or client timeout. This PR intend to optimize the logic to avoid unnecessary retry.

Does this PR introduce any user-facing change?

NO.

How was this patch tested?

GA.

.add(mapId);
if (response.remaining() > 0) {
int retCode = response.get();
if (retCode == StatusCode.MAP_ENDED.getValue()) {
Copy link
Member

@SteNicholas SteNicholas Oct 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any compatibility problem between old client version and new server version, which situation is that server returns STAGE_ENDED but client only handles MAP_ENDED?

Copy link
Contributor Author

@FMX FMX Oct 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

STAGE_ENDED will be returned if the field stageEnd is true. So old servers won't return STAGE_ENDED.

@@ -490,7 +492,14 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
}
} else {
if (storageManager.shuffleKeySet().contains(shuffleKey)) {
// This means that this stage is ended and invoked commit files by stage end
if (presumptiveEndedShuffles.contains(shuffleKey)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we can check stage_end before checking map_end?

.computeIfAbsent(shuffleId, (id) -> ConcurrentHashMap.newKeySet())
.add(mapId);
}
if (retCode == StatusCode.STAGE_ENDED.getValue()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

elseif maybe better

@@ -490,6 +498,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
}
} else {
// This means that this stage is ended and invoked commit files by stage end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should remove this line~

@@ -439,7 +442,7 @@ abstract class CommitHandler(
replicaIds)
}

doParallelCommitFiles(shuffleId, shuffleCommittedInfo, params, commitFilesFailedWorkers)
doParallelCommitFiles(shuffleId, shuffleCommittedInfo, params, commitFilesFailedWorkers, true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the ReducePartitionCommitHandler can set stageEnd to true when call handleFinalCommitFiles method.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants