[CELEBORN-1490][CIP-6] Extends FileMeta to support hybrid shuffle
### What changes were proposed in this pull request?

This is the second PR to support Hybrid Shuffle.

This PR extends `FileMeta` because hybrid shuffle supports reading while writing at segment granularity.

We also introduce an `isSegmentGranularityVisible` flag in several messages and methods to identify segment-based shuffle.
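
As a hedged illustration of how these pieces fit together, the sketch below drives the APIs touched by this PR from a segment-aware caller. Only `registerMapPartitionTask`, `segmentStart`, and `readBufferedPartition` (with the new `isSegmentGranularityVisible` flag) come from this change; the surrounding setup, variable names, and call ordering are assumptions.

```java
// Illustrative sketch only, not part of this PR. Types come from the
// Celeborn client modules; shuffleClient and the id variables are assumed.
PartitionLocation location =
    shuffleClient.registerMapPartitionTask(
        shuffleId, numMappers, mapId, attemptId, partitionId,
        /* isSegmentGranularityVisible */ true);

// Writer side: announce each segment before pushing its data, so readers
// can consume finished segments while later segments are still being written.
shuffleClient.segmentStart(
    shuffleId, mapId, attemptId, subPartitionId, segmentId, location);

// Reader side: with the flag set, an empty stream means "not ready yet"
// rather than "data lost", so the caller can retry instead of failing.
CelebornBufferStream stream =
    shuffleClient.readBufferedPartition(
        shuffleId, partitionId, subPartitionIndexStart, subPartitionIndexEnd,
        /* isSegmentGranularityVisible */ true);
```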

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
No tests needed.

Closes #2716 from reswqa/cip6-2-pr.

Authored-by: Weijie Guo <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
reswqa authored and SteNicholas committed Sep 4, 2024
1 parent 48910bb commit 40a6546
Showing 8 changed files with 294 additions and 32 deletions.

@@ -85,7 +85,7 @@ public void open(int initialCredit) {
     try {
       bufferStream =
           client.readBufferedPartition(
-              shuffleId, partitionId, subPartitionIndexStart, subPartitionIndexEnd);
+              shuffleId, partitionId, subPartitionIndexStart, subPartitionIndexEnd, false);
       bufferStream.open(
           RemoteBufferStreamReader.this::requestBuffer, initialCredit, messageConsumer);
     } catch (Exception e) {

@@ -55,11 +55,13 @@
 import org.apache.celeborn.common.protocol.PbPushDataHandShake;
 import org.apache.celeborn.common.protocol.PbRegionFinish;
 import org.apache.celeborn.common.protocol.PbRegionStart;
+import org.apache.celeborn.common.protocol.PbSegmentStart;
 import org.apache.celeborn.common.protocol.ReviveRequest;
 import org.apache.celeborn.common.protocol.TransportModuleConstants;
 import org.apache.celeborn.common.protocol.message.ControlMessages;
 import org.apache.celeborn.common.protocol.message.StatusCode;
 import org.apache.celeborn.common.rpc.RpcEndpointRef;
+import org.apache.celeborn.common.util.CollectionUtils;
 import org.apache.celeborn.common.util.JavaUtils;
 import org.apache.celeborn.common.util.PbSerDeUtils;
 import org.apache.celeborn.common.util.Utils;

@@ -164,17 +166,31 @@ public void setupLifecycleManagerRef(RpcEndpointRef endpointRef) {
   }

   public CelebornBufferStream readBufferedPartition(
-      int shuffleId, int partitionId, int subPartitionIndexStart, int subPartitionIndexEnd)
+      int shuffleId,
+      int partitionId,
+      int subPartitionIndexStart,
+      int subPartitionIndexEnd,
+      boolean isSegmentGranularityVisible)
       throws IOException {
     String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
-    ReduceFileGroups fileGroups = updateFileGroup(shuffleId, partitionId);
-    if (fileGroups.partitionGroups.size() == 0
-        || !fileGroups.partitionGroups.containsKey(partitionId)) {
-      logger.error("Shuffle data is empty for shuffle {} partitionId {}.", shuffleId, partitionId);
-      throw new PartitionUnRetryAbleException(partitionId + " may be lost.");
+    PartitionLocation[] partitionLocations =
+        updateFileGroupAndGetLocations(shuffleId, partitionId, isSegmentGranularityVisible);
+    if (partitionLocations.length == 0) {
+      logger.error(
+          "Shuffle data is empty for shuffle {} partitionId {} isSegmentGranularityVisible {}.",
+          shuffleId,
+          partitionId,
+          isSegmentGranularityVisible);
+      if (isSegmentGranularityVisible) {
+        // When the downstream reduce tasks start earlier than the upstream map tasks, the shuffle
+        // partition locations may be empty; the caller should retry until the upstream tasks start.
+        return CelebornBufferStream.empty();
+      } else {
+        throw new PartitionUnRetryAbleException(
+            String.format(
+                "Shuffle data lost for shuffle %d partition %d.", shuffleId, partitionId));
+      }
     } else {
-      PartitionLocation[] partitionLocations =
-          fileGroups.partitionGroups.get(partitionId).toArray(new PartitionLocation[0]);
       Arrays.sort(partitionLocations, Comparator.comparingInt(PartitionLocation::getEpoch));
       logger.debug(
           "readBufferedPartition shuffleKey:{} partitionId:{} partitionLocation:{}",

@@ -193,8 +209,27 @@ public CelebornBufferStream readBufferedPartition(
     }
   }

+  /**
+   * Updates the reduce file groups and obtains the PartitionLocations of the target
+   * shuffleId#partitionId. This may return an empty array if the corresponding reduce file
+   * groups do not exist, a scenario that typically arises when downstream reduce tasks start
+   * earlier than upstream map tasks, e.g. Flink hybrid shuffle.
+   */
+  public PartitionLocation[] updateFileGroupAndGetLocations(
+      int shuffleId, int partitionId, boolean isSegmentGranularityVisible) throws IOException {
+    ReduceFileGroups fileGroups =
+        updateFileGroup(shuffleId, partitionId, isSegmentGranularityVisible);
+    if (CollectionUtils.isEmpty(fileGroups.partitionGroups)
+        || !fileGroups.partitionGroups.containsKey(partitionId)) {
+      return new PartitionLocation[0];
+    } else {
+      return fileGroups.partitionGroups.get(partitionId).toArray(new PartitionLocation[0]);
+    }
+  }
+
   @Override
-  public ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
+  public ReduceFileGroups updateFileGroup(
+      int shuffleId, int partitionId, boolean isSegmentGranularityVisible)
       throws CelebornIOException {
     ReduceFileGroups reduceFileGroups =
         reduceFileGroupsMap.computeIfAbsent(

@@ -213,7 +248,8 @@ public ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
             Utils.makeReducerKey(shuffleId, partitionId));
       } else {
         // refresh file groups
-        Tuple2<ReduceFileGroups, String> fileGroups = loadFileGroupInternal(shuffleId);
+        Tuple2<ReduceFileGroups, String> fileGroups =
+            loadFileGroupInternal(shuffleId, isSegmentGranularityVisible);
         ReduceFileGroups newGroups = fileGroups._1;
         if (newGroups == null) {
           throw new CelebornIOException(

@@ -516,6 +552,46 @@ public void regionFinish(int shuffleId, int mapId, int attemptId, PartitionLocation
         });
   }

+  public void segmentStart(
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int subPartitionId,
+      int segmentId,
+      PartitionLocation location)
+      throws IOException {
+    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+    final PushState pushState = pushStates.computeIfAbsent(mapKey, (s) -> new PushState(conf));
+    retrySendMessage(
+        () -> {
+          final String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
+          logger.debug(
+              "SegmentStart for shuffle {} map {} attemptId {} locationId {} subPartitionId {} segmentId {}.",
+              shuffleId,
+              mapId,
+              attemptId,
+              location,
+              subPartitionId,
+              segmentId);
+          TransportClient client = createClientWaitingInFlightRequest(location, mapKey, pushState);
+          client.sendRpcSync(
+              new TransportMessage(
+                      MessageType.SEGMENT_START,
+                      PbSegmentStart.newBuilder()
+                          .setMode(Mode.forNumber(PRIMARY_MODE))
+                          .setShuffleKey(shuffleKey)
+                          .setPartitionUniqueId(location.getUniqueId())
+                          .setAttemptId(attemptId)
+                          .setSubPartitionId(subPartitionId)
+                          .setSegmentId(segmentId)
+                          .build()
+                          .toByteArray())
+                  .toByteBuffer(),
+              conf.pushDataTimeoutMs());
+          return null;
+        });
+  }
+
   @FunctionalInterface
   interface ThrowingExceptionSupplier<R, E extends Exception> {
     R get() throws E;
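
The empty-array contract of `updateFileGroupAndGetLocations` above is what lets segment-granularity readers poll instead of fail when they start before the upstream map tasks. A minimal caller-side sketch, assuming a `client` reference to this class; the polling loop and its 50 ms interval are illustrative and not part of this PR:

```java
// Hedged sketch: poll until the upstream map tasks have registered the
// shuffle and locations become visible. The interval is an assumption.
PartitionLocation[] waitForLocations(int shuffleId, int partitionId)
    throws IOException, InterruptedException {
  while (true) {
    PartitionLocation[] locations =
        client.updateFileGroupAndGetLocations(
            shuffleId, partitionId, /* isSegmentGranularityVisible */ true);
    if (locations.length > 0) {
      return locations;
    }
    Thread.sleep(50); // upstream tasks not started yet; back off and retry
  }
}
```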

@@ -530,6 +530,17 @@ private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
   @Override
   public PartitionLocation registerMapPartitionTask(
       int shuffleId, int numMappers, int mapId, int attemptId, int partitionId) throws IOException {
+    return registerMapPartitionTask(shuffleId, numMappers, mapId, attemptId, partitionId, false);
+  }
+
+  public PartitionLocation registerMapPartitionTask(
+      int shuffleId,
+      int numMappers,
+      int mapId,
+      int attemptId,
+      int partitionId,
+      boolean isSegmentGranularityVisible)
+      throws IOException {
     logger.info(
         "Register MapPartition task for shuffle {} map {} attempt {} partition {} with {} mappers.",
         shuffleId,

@@ -545,7 +556,12 @@ public PartitionLocation registerMapPartitionTask(
             () ->
                 lifecycleManagerRef.askSync(
                     RegisterMapPartitionTask$.MODULE$.apply(
-                        shuffleId, numMappers, mapId, attemptId, partitionId),
+                        shuffleId,
+                        numMappers,
+                        mapId,
+                        attemptId,
+                        partitionId,
+                        isSegmentGranularityVisible),
                     conf.clientRpcRegisterShuffleAskTimeout(),
                     ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));


@@ -1628,7 +1644,8 @@ public boolean cleanupShuffle(int shuffleId) {
     return true;
   }

-  protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(int shuffleId) {
+  protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(
+      int shuffleId, boolean isSegmentGranularityVisible) {
     {
       long getReducerFileGroupStartTime = System.nanoTime();
       String exceptionMsg = null;

@@ -1637,7 +1654,8 @@ protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(int shuffleId)
         exceptionMsg = "Driver endpoint is null!";
         logger.warn(exceptionMsg);
       } else {
-        GetReducerFileGroup getReducerFileGroup = new GetReducerFileGroup(shuffleId);
+        GetReducerFileGroup getReducerFileGroup =
+            new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible);

         GetReducerFileGroupResponse response =
             lifecycleManagerRef.askSync(

@@ -1689,12 +1707,18 @@ protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(int shuffleId)
   @Override
   public ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
       throws CelebornIOException {
+    return updateFileGroup(shuffleId, partitionId, false);
+  }
+
+  public ReduceFileGroups updateFileGroup(
+      int shuffleId, int partitionId, boolean isSegmentGranularityVisible)
+      throws CelebornIOException {
     Tuple2<ReduceFileGroups, String> fileGroupTuple =
         reduceFileGroupsMap.compute(
             shuffleId,
             (id, existsTuple) -> {
               if (existsTuple == null || existsTuple._1 == null) {
-                return loadFileGroupInternal(shuffleId);
+                return loadFileGroupInternal(shuffleId, isSegmentGranularityVisible);
               } else {
                 return existsTuple;
               }

@@ -1738,7 +1762,7 @@ public CelebornInputStream readPartition(
     // CelebornShuffleReader, which means `updateFileGroup` is already called and
     // batch open stream has been tried
     if (mapAttempts == null) {
-      ReduceFileGroups fileGroups = updateFileGroup(shuffleId, partitionId);
+      ReduceFileGroups fileGroups = updateFileGroup(shuffleId, partitionId, false);
       mapAttempts = fileGroups.mapAttempts;
       if (fileGroups.partitionGroups.containsKey(partitionId)) {
         locations = new ArrayList(fileGroups.partitionGroups.get(partitionId));

@@ -398,9 +398,10 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
         throw new UnsupportedOperationException(s"Not support $partitionType yet")
     }

-    case GetReducerFileGroup(shuffleId: Int) =>
-      logDebug(s"Received GetShuffleFileGroup request for shuffleId $shuffleId.")
-      handleGetReducerFileGroup(context, shuffleId)
+    case GetReducerFileGroup(shuffleId: Int, isSegmentGranularityVisible: Boolean) =>
+      logDebug(
+        s"Received GetShuffleFileGroup request for shuffleId $shuffleId, isSegmentGranularityVisible $isSegmentGranularityVisible")
+      handleGetReducerFileGroup(context, shuffleId, isSegmentGranularityVisible)

     case pb: PbGetShuffleId =>
       val appShuffleId = pb.getAppShuffleId

@@ -786,8 +787,12 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends

   private def handleGetReducerFileGroup(
       context: RpcCallContext,
-      shuffleId: Int): Unit = {
-    if (!registeredShuffle.contains(shuffleId)) {
+      shuffleId: Int,
+      isSegmentGranularityVisible: Boolean): Unit = {
+    // If isSegmentGranularityVisible is true, the downstream reduce tasks may start earlier than the upstream map tasks, e.g. Flink hybrid shuffle.
+    // In that case the shuffle may not be registered yet when a downstream reduce task sends its GetReducerFileGroup request,
+    // so we should not reply SHUFFLE_NOT_REGISTERED directly; instead, enqueue the request in a pending list and reply with the ReducerFileGroup once the upstream map tasks have registered the shuffle.
+    if (!registeredShuffle.contains(shuffleId) && !isSegmentGranularityVisible) {
       logWarning(s"[handleGetReducerFileGroup] shuffle $shuffleId not registered, maybe no shuffle data within this stage.")
       context.reply(GetReducerFileGroupResponse(
         StatusCode.SHUFFLE_NOT_REGISTERED,