From 043a20e85cb23213d04143e5ce1c1343a383a355 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Tue, 28 May 2024 16:46:36 +0800 Subject: [PATCH] [CELEBORN-1432] ShuffleClientImpl should invoke loadFileGroupInternal only once when using the reduce partition mode ### What changes were proposed in this pull request? `ShuffleClientImpl` invokes `loadFileGroupInternal` only once when using the reduce partition mode. ### Why are the changes needed? `ShuffleClientImpl` may call `loadFileGroupInternal` multiple times for the same shuffle when using the reduce partition mode, which is unintended: the previous `containsKey`/`get`/`put` sequence in `updateFileGroup` was not atomic, so concurrent callers could each miss the cached entry and trigger their own load. This bug was introduced in #2219. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GA. Closes #2531 from SteNicholas/CELEBORN-1432. Authored-by: SteNicholas Signed-off-by: mingji --- .../readclient/FlinkShuffleClientImpl.java | 4 +++- .../celeborn/client/ShuffleClientImpl.java | 20 ++++++++----------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java index 3d69613f8ef..79c659db538 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java @@ -174,7 +174,9 @@ public CelebornBufferStream readBufferedPartition( public ReduceFileGroups updateFileGroup(int shuffleId, int partitionId) throws CelebornIOException { ReduceFileGroups reduceFileGroups = - reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> new ReduceFileGroups()); + reduceFileGroupsMap.computeIfAbsent( + shuffleId, (id) -> Tuple2.apply(new ReduceFileGroups(), null)) + ._1; if (reduceFileGroups.partitionIds != null && reduceFileGroups.partitionIds.contains(partitionId)) { logger.debug( diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index e44b8ba856a..345caecdbe0 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -164,7 +164,7 @@ public void update(ReduceFileGroups fileGroups) { } // key: shuffleId - protected final Map reduceFileGroupsMap = + protected final Map> reduceFileGroupsMap = JavaUtils.newConcurrentHashMap(); public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier userIdentifier) { @@ -1647,17 +1647,13 @@ protected Tuple2 loadFileGroupInternal(int shuffleId) public ReduceFileGroups updateFileGroup(int shuffleId, int partitionId) throws CelebornIOException { - if (reduceFileGroupsMap.containsKey(shuffleId)) { - return reduceFileGroupsMap.get(shuffleId); + Tuple2 fileGroupTuple = + reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> loadFileGroupInternal(shuffleId)); + if (fileGroupTuple._1 == null) { + throw new CelebornIOException( + loadFileGroupException(shuffleId, partitionId, (fileGroupTuple._2))); } else { - Tuple2 fileGroups = loadFileGroupInternal(shuffleId); - ReduceFileGroups newGroups = fileGroups._1; - if (newGroups == null) { - throw new CelebornIOException( - loadFileGroupException(shuffleId, partitionId, fileGroups._2)); - } - reduceFileGroupsMap.put(shuffleId, newGroups); - return newGroups; + return fileGroupTuple._1; } } @@ -1726,7 +1722,7 @@ public CelebornInputStream readPartition( } @VisibleForTesting - public Map getReduceFileGroupsMap() { + public Map> getReduceFileGroupsMap() { return reduceFileGroupsMap; }