Skip to content

Commit

Permalink
[CELEBORN-1432] ShuffleClientImpl should invoke loadFileGroupInternal…
Browse files Browse the repository at this point in the history
… only once when using the reduce partition mode

### What changes were proposed in this pull request?

`ShuffleClientImpl` invokes `loadFileGroupInternal` only once when using the reduce partition mode.

### Why are the changes needed?

`ShuffleClientImpl` may call `loadFileGroupInternal` multiple times when using reduce partition mode, which is not as expected. This bug was introduced in #2219.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA.

Closes #2531 from SteNicholas/CELEBORN-1432.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
  • Loading branch information
SteNicholas authored and FMX committed May 28, 2024
1 parent 493e0f1 commit 043a20e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ public CelebornBufferStream readBufferedPartition(
public ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
throws CelebornIOException {
ReduceFileGroups reduceFileGroups =
reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> new ReduceFileGroups());
reduceFileGroupsMap.computeIfAbsent(
shuffleId, (id) -> Tuple2.apply(new ReduceFileGroups(), null))
._1;
if (reduceFileGroups.partitionIds != null
&& reduceFileGroups.partitionIds.contains(partitionId)) {
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void update(ReduceFileGroups fileGroups) {
}

// key: shuffleId
protected final Map<Integer, ReduceFileGroups> reduceFileGroupsMap =
protected final Map<Integer, Tuple2<ReduceFileGroups, String>> reduceFileGroupsMap =
JavaUtils.newConcurrentHashMap();

public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier userIdentifier) {
Expand Down Expand Up @@ -1647,17 +1647,13 @@ protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(int shuffleId)

public ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
throws CelebornIOException {
if (reduceFileGroupsMap.containsKey(shuffleId)) {
return reduceFileGroupsMap.get(shuffleId);
Tuple2<ReduceFileGroups, String> fileGroupTuple =
reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> loadFileGroupInternal(shuffleId));
if (fileGroupTuple._1 == null) {
throw new CelebornIOException(
loadFileGroupException(shuffleId, partitionId, (fileGroupTuple._2)));
} else {
Tuple2<ReduceFileGroups, String> fileGroups = loadFileGroupInternal(shuffleId);
ReduceFileGroups newGroups = fileGroups._1;
if (newGroups == null) {
throw new CelebornIOException(
loadFileGroupException(shuffleId, partitionId, fileGroups._2));
}
reduceFileGroupsMap.put(shuffleId, newGroups);
return newGroups;
return fileGroupTuple._1;
}
}

Expand Down Expand Up @@ -1726,7 +1722,7 @@ public CelebornInputStream readPartition(
}

@VisibleForTesting
public Map<Integer, ReduceFileGroups> getReduceFileGroupsMap() {
public Map<Integer, Tuple2<ReduceFileGroups, String>> getReduceFileGroupsMap() {
return reduceFileGroupsMap;
}

Expand Down

0 comments on commit 043a20e

Please sign in to comment.