From dec0f31dede527a31f4b8b5b7ca1bafec7c50fb1 Mon Sep 17 00:00:00 2001 From: mingji Date: Tue, 10 Dec 2024 18:03:00 +0800 Subject: [PATCH] [CELEBORN-1769] Fix packed partition location cause GetReducerFileGroupResponse lose location ### What changes were proposed in this pull request? Fix the issue of losing the primary location when parsing `GetReducerFileGroupResponse` from `LifecycleManager`. ### Why are the changes needed? In previous optimizations, I introduced packed partition locations to reduce the size of RPC calls, based on the assumption that primary partition locations would always be available. However, in some test scenarios where data replication is enabled and workers are randomly terminated, the primary location may be lost while the replica location remains. This causes the replica locations to be ignored which will cause data loss. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? GA and cluster. Closes #2986 from FMX/b1769. Authored-by: mingji Signed-off-by: Shuang (cherry picked from commit 069e5b6c18e11d53c1ade6e4a85484856f22fba9) Signed-off-by: Shuang --- .../protocol/message/ControlMessages.scala | 15 ++- .../common/util/PbSerDeUtilsTest.scala | 99 ++++++++++++++++++- 2 files changed, 110 insertions(+), 4 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 6870ba1ca04..f5cd7ff74b8 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -1063,10 +1063,21 @@ object ControlMessages extends Logging { .parseFrom(message.getPayload) val fileGroup = pbGetReducerFileGroupResponse.getFileGroupsMap.asScala.map { case (partitionId, fileGroup) => + val locationsSet: java.util.Set[PartitionLocation] = + new util.LinkedHashSet[PartitionLocation]() + + // In PbGetReducerFileGroupResponse, location with same + // uniqueId will not be put into the location set + // check out the logic @org.apache.celeborn.client.commit.CommitHandler.parallelCommitFiles + // This is why we should join the primary location list and replica location list + + val (pris, reps) = PbSerDeUtils.fromPbPackedPartitionLocationsPair( + fileGroup.getPartitionLocationsPair) + locationsSet.addAll(pris) + locationsSet.addAll(reps) ( partitionId, - PbSerDeUtils.fromPbPackedPartitionLocationsPair( - fileGroup.getPartitionLocationsPair)._1.asScala.toSet.asJava) + locationsSet) }.asJava val attempts = pbGetReducerFileGroupResponse.getAttemptsList.asScala.map(_.toInt).toArray diff --git a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala index 485662cdaf5..94c73204a2b 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala @@ -21,15 +21,18 @@ import java.io.File import java.util import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.util.Random import org.apache.hadoop.shaded.org.apache.commons.lang3.RandomStringUtils import org.apache.celeborn.CelebornFunSuite import org.apache.celeborn.common.identity.UserIdentifier -import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus} +import org.apache.celeborn.common.meta._ import org.apache.celeborn.common.protocol.{PartitionLocation, PbPackedWorkerResource, PbWorkerResource, StorageInfo} -import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource +import org.apache.celeborn.common.protocol.PartitionLocation.Mode +import org.apache.celeborn.common.protocol.message.{ControlMessages, StatusCode} +import org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse, WorkerResource} import org.apache.celeborn.common.quota.ResourceConsumption import org.apache.celeborn.common.util.PbSerDeUtils.{fromPbPackedPartitionLocationsPair, toPbPackedPartitionLocationsPair} @@ -466,4 +469,96 @@ class PbSerDeUtilsTest extends CelebornFunSuite { testSerializationPerformance(100) } + test("GetReduceFileGroup with primary and replica locations") { + val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] = + JavaUtils.newConcurrentHashMap() + val locationSet = new util.LinkedHashSet[PartitionLocation]() + val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0") + locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.REPLICA)) + locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.PRIMARY)) + locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.PRIMARY)) + locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.REPLICA)) + locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.PRIMARY)) + shuffleMap.put(1, locationSet) + val attempts = Array.fill(10)(10) + val succeedPartitions = Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava + + val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse( + StatusCode.SUCCESS, + shuffleMap, + attempts, + succeedPartitions) + + val transportGetReducerFileGroup = + ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg) + val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse = + ControlMessages.fromTransportMessage( + transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse] + + val locations = fromTransportGetReducerFileGroup.fileGroup.get(1) + locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId)) + assert(uniqueIds.isEmpty) + } + + test("GetReduceFileGroup with primary location only") { + val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] = + JavaUtils.newConcurrentHashMap() + val locationSet = new util.LinkedHashSet[PartitionLocation]() + val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0") + locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.PRIMARY)) + locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.PRIMARY)) + locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.PRIMARY)) + locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.PRIMARY)) + locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.PRIMARY)) + shuffleMap.put(1, locationSet) + val attempts = Array.fill(10)(10) + val succeedPartitions = Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava + + val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse( + StatusCode.SUCCESS, + shuffleMap, + attempts, + succeedPartitions) + + val transportGetReducerFileGroup = + ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg) + val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse = + ControlMessages.fromTransportMessage( + transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse] + + val locations = fromTransportGetReducerFileGroup.fileGroup.get(1) + locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId)) + assert(uniqueIds.isEmpty) + } + + test("GetReduceFileGroup with replica location only") { + val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] = + JavaUtils.newConcurrentHashMap() + val locationSet = new util.LinkedHashSet[PartitionLocation]() + val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0") + locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.REPLICA)) + locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.REPLICA)) + locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.REPLICA)) + locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.REPLICA)) + locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.REPLICA)) + shuffleMap.put(1, locationSet) + val attempts = Array.fill(10)(10) + val succeedPartitions = Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava + + val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse( + StatusCode.SUCCESS, + shuffleMap, + attempts, + succeedPartitions) + + val transportGetReducerFileGroup = + ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg) + val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse = + ControlMessages.fromTransportMessage( + transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse] + + val locations = fromTransportGetReducerFileGroup.fileGroup.get(1) + locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId)) + assert(uniqueIds.isEmpty) + } }