Skip to content

Commit

Permalink
[CELEBORN-1769] Fix packed partition location cause GetReducerFileGro…
Browse files Browse the repository at this point in the history
…upResponse lose location

### What changes were proposed in this pull request?
Fix the issue of losing the primary location when parsing `GetReducerFileGroupResponse` from `LifecycleManager`.

### Why are the changes needed?
In previous optimizations, I introduced packed partition locations to reduce the size of RPC calls, based on the assumption that primary partition locations would always be available. However, in some test scenarios where data replication is enabled and workers are randomly terminated, the primary location may be lost while the replica location remains. This causes the replica locations to be ignored which will cause data loss.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
GA and cluster.

Closes #2986 from FMX/b1769.

Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
FMX authored and RexXiong committed Dec 10, 2024
1 parent 11cbacb commit 069e5b6
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1158,10 +1158,21 @@ object ControlMessages extends Logging {
.parseFrom(message.getPayload)
val fileGroup = pbGetReducerFileGroupResponse.getFileGroupsMap.asScala.map {
case (partitionId, fileGroup) =>
val locationsSet: java.util.Set[PartitionLocation] =
new util.LinkedHashSet[PartitionLocation]()

// In PbGetReducerFileGroupResponse, location with same
// uniqueId will not be put into the location set
// check out the logic @org.apache.celeborn.client.commit.CommitHandler.parallelCommitFiles
// This is why we should join the primary location list and replica location list

val (pris, reps) = PbSerDeUtils.fromPbPackedPartitionLocationsPair(
fileGroup.getPartitionLocationsPair)
locationsSet.addAll(pris)
locationsSet.addAll(reps)
(
partitionId,
PbSerDeUtils.fromPbPackedPartitionLocationsPair(
fileGroup.getPartitionLocationsPair)._1.asScala.toSet.asJava)
locationsSet)
}.asJava

val attempts = pbGetReducerFileGroupResponse.getAttemptsList.asScala.map(_.toInt).toArray
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ import java.io.File
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Random

import org.apache.hadoop.shaded.org.apache.commons.lang3.RandomStringUtils

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.meta._
import org.apache.celeborn.common.protocol.{PartitionLocation, PbPackedWorkerResource, PbWorkerResource, StorageInfo}
import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
import org.apache.celeborn.common.protocol.PartitionLocation.Mode
import org.apache.celeborn.common.protocol.message.{ControlMessages, StatusCode}
import org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse, WorkerResource}
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.util.PbSerDeUtils.{fromPbPackedPartitionLocationsPair, toPbPackedPartitionLocationsPair}

Expand Down Expand Up @@ -470,4 +473,96 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
testSerializationPerformance(100)
}

test("GetReduceFileGroup with primary and replica locations") {
val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] =
JavaUtils.newConcurrentHashMap()
val locationSet = new util.LinkedHashSet[PartitionLocation]()
val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0")
locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
shuffleMap.put(1, locationSet)
val attempts = Array.fill(10)(10)
val succeedPartitions = Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava

val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse(
StatusCode.SUCCESS,
shuffleMap,
attempts,
succeedPartitions)

val transportGetReducerFileGroup =
ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg)
val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse =
ControlMessages.fromTransportMessage(
transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse]

val locations = fromTransportGetReducerFileGroup.fileGroup.get(1)
locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId))
assert(uniqueIds.isEmpty)
}

test("GetReduceFileGroup with primary location only") {
val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] =
JavaUtils.newConcurrentHashMap()
val locationSet = new util.LinkedHashSet[PartitionLocation]()
val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0")
locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
shuffleMap.put(1, locationSet)
val attempts = Array.fill(10)(10)
val succeedPartitions = Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava

val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse(
StatusCode.SUCCESS,
shuffleMap,
attempts,
succeedPartitions)

val transportGetReducerFileGroup =
ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg)
val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse =
ControlMessages.fromTransportMessage(
transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse]

val locations = fromTransportGetReducerFileGroup.fileGroup.get(1)
locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId))
assert(uniqueIds.isEmpty)
}

test("GetReduceFileGroup with replica location only") {
val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] =
JavaUtils.newConcurrentHashMap()
val locationSet = new util.LinkedHashSet[PartitionLocation]()
val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0")
locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
shuffleMap.put(1, locationSet)
val attempts = Array.fill(10)(10)
val succeedPartitions = Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava

val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse(
StatusCode.SUCCESS,
shuffleMap,
attempts,
succeedPartitions)

val transportGetReducerFileGroup =
ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg)
val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse =
ControlMessages.fromTransportMessage(
transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse]

val locations = fromTransportGetReducerFileGroup.fileGroup.get(1)
locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId))
assert(uniqueIds.isEmpty)
}
}

0 comments on commit 069e5b6

Please sign in to comment.