[CELEBORN-627][FLINK][FOLLOWUP] Support split partitions
### What changes were proposed in this pull request?
Fix duplicated commitFiles requests for MapPartition, and fix BufferStreamEnd not being sent when a MapPartition split is opened.

### Why are the changes needed?
After a partition split is opened for a MapPartition, two errors occur.
- ERROR 1: The worker does not send a BufferStreamEnd to the client because of a thread-synchronization problem. After the idle timeout, the client closes the channel and throws the exception **"xx is lost, notify related stream xx"**:
```java
2023-09-06T04:40:47.7549935Z 23/09/06 04:40:47,753 WARN [Keyed Aggregation -> Map -> Sink: Unnamed (5/8)#0] Task: Keyed Aggregation -> Map -> Sink: Unnamed (5/8)#0 (c1cade728ddb3a32e0bf72acb1d87588_c27dcf7b54ef6bfd6cff02ca8870b681_4_0) switched from RUNNING to FAILED with failure cause:
2023-09-06T04:40:47.7550644Z java.io.IOException: Client localhost/127.0.0.1:38485 is lost, notify related stream 256654410004
2023-09-06T04:40:47.7551219Z 	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:142)
2023-09-06T04:40:47.7551886Z 	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
2023-09-06T04:40:47.7552576Z 	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:57)
2023-09-06T04:40:47.7553250Z 	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.lambda$channelInactive$0(ReadClientHandler.java:119)
2023-09-06T04:40:47.7553806Z 	at java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1597)
2023-09-06T04:40:47.7554564Z 	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.channelInactive(ReadClientHandler.java:110)
2023-09-06T04:40:47.7555270Z 	at org.apache.celeborn.common.network.server.TransportRequestHandler.channelInactive(TransportRequestHandler.java:71)
2023-09-06T04:40:47.7556005Z 	at org.apache.celeborn.common.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:136)
2023-09-06T04:40:47.7556710Z 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
2023-09-06T04:40:47.7557370Z 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7558172Z 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7558803Z 	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
2023-09-06T04:40:47.7559368Z 	at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
2023-09-06T04:40:47.7559954Z 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
2023-09-06T04:40:47.7560589Z 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7561222Z 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7561829Z 	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
2023-09-06T04:40:47.7562620Z 	at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelInactive(TransportFrameDecoderWithBufferSupplier.java:206)
2023-09-06T04:40:47.7563506Z 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
2023-09-06T04:40:47.7564207Z 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7564829Z 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7565417Z 	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
2023-09-06T04:40:47.7566014Z 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
2023-09-06T04:40:47.7566654Z 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7567317Z 	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
2023-09-06T04:40:47.7567813Z 	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
2023-09-06T04:40:47.7568297Z 	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
2023-09-06T04:40:47.7568830Z 	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
2023-09-06T04:40:47.7569402Z 	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
2023-09-06T04:40:47.7569894Z 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
2023-09-06T04:40:47.7570356Z 	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
2023-09-06T04:40:47.7570841Z 	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
2023-09-06T04:40:47.7571319Z 	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
2023-09-06T04:40:47.7571721Z 	at java.lang.Thread.run(Thread.java:750)
```
- ERROR 2: The client sends duplicated commitFiles requests to the worker. Because of an inconsistent view of `unhandledPartitionLocations`, both batchCommit and finalCommit send commitFiles:
```java
2023-09-06T04:36:48.3146773Z 23/09/06 04:36:48,314 WARN [Worker-CommitFiles-1] Controller: Get Partition Location for 1693975002919-61094c8156f918062a5fae12d551bc90-0 0-1 but didn't exist.
```
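The ERROR 2 predicate bug is essentially a De Morgan mistake: to drop the locations that are already handled *or* whose partition is still being committed, the `filterNot` condition must combine the two checks with `||`, not `&&`. A minimal, self-contained sketch of the difference — the class, string-keyed sets, and `"p0-e0"`-style encoding here are illustrative stand-ins, not the real Celeborn `PartitionLocation` types:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the commit handler's bookkeeping.
// String keys like "p0-e0" stand in for PartitionLocation (partition id +
// epoch); the real logic lives in MapPartitionCommitHandler.
public class CommitFilterExample {

  // Partition id is the part before the '-' (illustrative encoding only).
  public static String partitionId(String loc) {
    return loc.split("-")[0];
  }

  // Buggy predicate: a location is skipped only when it is BOTH already
  // handled AND its partition is in process, so a handled location whose
  // partition is idle gets committed a second time.
  public static Set<String> pendingBuggy(
      Set<String> unhandled, Set<String> handled, Set<String> inProcess) {
    Set<String> out = new HashSet<>();
    for (String loc : unhandled) {
      if (!(handled.contains(loc) && inProcess.contains(partitionId(loc)))) {
        out.add(loc);
      }
    }
    return out;
  }

  // Fixed predicate (this commit): skip when handled OR still in process.
  public static Set<String> pendingFixed(
      Set<String> unhandled, Set<String> handled, Set<String> inProcess) {
    Set<String> out = new HashSet<>();
    for (String loc : unhandled) {
      if (!(handled.contains(loc) || inProcess.contains(partitionId(loc)))) {
        out.add(loc);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Set<String> unhandled = Set.of("p0-e0", "p0-e1", "p1-e0");
    Set<String> handled = Set.of("p0-e0");  // p0-e0 already committed
    Set<String> inProcess = Set.of("p1");   // partition p1 still committing
    // Buggy set still contains p0-e0, so it would be committed twice.
    System.out.println(pendingBuggy(unhandled, handled, inProcess));
    System.out.println(pendingFixed(unhandled, handled, inProcess)); // [p0-e1]
  }
}
```

With `&&`, the already-handled `p0-e0` survives the filter and is sent to commitFiles again, which matches the "Get Partition Location ... but didn't exist" warning above.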

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
CI.

Closes #1881 from zhongqiangczq/fix-split-test.

Authored-by: zhongqiang.czq <[email protected]>
Signed-off-by: zhongqiang.czq <[email protected]>
zhongqiangczq committed Sep 6, 2023
1 parent 10c63e0 commit b1e3d66
Showing 4 changed files with 28 additions and 12 deletions.
```diff
@@ -74,7 +74,7 @@ class MapPartitionCommitHandler(
       shuffleId: Int,
       shuffleCommittedInfo: ShuffleCommittedInfo): mutable.Set[PartitionLocation] = {
     shuffleCommittedInfo.unhandledPartitionLocations.asScala.filterNot { partitionLocation =>
-      shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation) &&
+      shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation) ||
        isPartitionInProcess(shuffleId, partitionLocation.getId)
     }
   }
@@ -199,7 +199,6 @@ class MapPartitionCommitHandler(
       recordWorkerFailure(commitFailedWorkers)
     }

-    inProcessingPartitionIds.remove(partitionId)
     if (dataCommitSuccess) {
       val resultPartitions =
         shuffleSucceedPartitionIds.computeIfAbsent(
```
tests/flink-it/src/test/resources/log4j2-test.xml (2 additions, 2 deletions)
```diff
@@ -16,7 +16,7 @@
   ~ limitations under the License.
 -->

-<Configuration status="INFO">
+<Configuration status="DEBUG">
   <Appenders>
     <Console name="stdout" target="SYSTEM_OUT">
       <PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: %m%n%ex"/>
@@ -29,7 +29,7 @@
     </File>
   </Appenders>
   <Loggers>
-    <Root level="INFO">
+    <Root level="DEBUG">
       <AppenderRef ref="stdout"/>
       <AppenderRef ref="file"/>
     </Root>
```
```diff
@@ -71,7 +71,7 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader

   /** Whether all the data has been successfully read or not. */
   @GuardedBy("lock")
-  private boolean readFinished;
+  private volatile boolean readFinished;

   /** Whether this partition reader has been released or not. */
   @GuardedBy("lock")
@@ -162,13 +162,14 @@ public void readData(BufferQueue bufferQueue, BufferRecycler bufferRecycler) thr
         addBuffer(buffer, bufferRecycler);
         ++numDataBuffers;
       }
-      if (numDataBuffers > 0) {
-        notifyBacklog(numDataBuffers);
-      }

       if (!hasRemaining) {
         closeReader();
       }
+
+      if (numDataBuffers > 0) {
+        notifyBacklog(numDataBuffers);
+      }
   }

   private void addBuffer(ByteBuf buffer, BufferRecycler bufferRecycler) {
```
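The reorder in `readData` above — close the reader first, then notify the backlog — can be illustrated with a deterministic sketch. `SketchReader` and its event log are hypothetical stand-ins for the reader/listener interplay, not real Celeborn classes; the real race is between concurrent threads, which this single-threaded sketch only models as an ordering of events:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: SketchReader models the partition reader, and
// the event list records what a backlog listener would observe.
public class NotifyOrderSketch {

  public static class SketchReader {
    public boolean readFinished = false;
    public final List<String> events = new ArrayList<>();

    // The backlog notification snapshots the reader state it observes.
    public void notifyBacklog() {
      events.add("backlog(finished=" + readFinished + ")");
    }

    // Closing the reader is what ultimately lets the stream end be signalled.
    public void closeReader() {
      readFinished = true;
      events.add("streamEnd");
    }
  }

  public static void main(String[] args) {
    // Old order: notify first. The listener observes finished=false and
    // keeps waiting for more data, so the end-of-stream can be missed.
    SketchReader before = new SketchReader();
    before.notifyBacklog();
    before.closeReader();
    System.out.println(before.events); // [backlog(finished=false), streamEnd]

    // New order (this commit): close first, then notify, so the
    // notification is delivered against the final reader state.
    SketchReader after = new SketchReader();
    after.closeReader();
    after.notifyBacklog();
    System.out.println(after.events); // [streamEnd, backlog(finished=true)]
  }
}
```

Marking `readFinished` volatile in the same change serves the multi-threaded version of this problem: the thread handling the notification must see the write made by the thread that closed the reader.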
```diff
@@ -940,13 +940,15 @@ class PushDataHandler extends BaseMessageHandler with Logging {
         val isPartitionSplitEnabled = fileWriter.asInstanceOf[
           MapPartitionFileWriter].getFileInfo.isPartitionSplitEnabled

-        if (shutdown.get() && (messageType == Type.REGION_START || messageType == Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled) {
+        if (shutdown.get() && (messageType == Type.REGION_START || messageType ==
+            Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled) {
           logInfo(s"$messageType return HARD_SPLIT for shuffle $shuffleKey since worker shutdown.")
           callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
           return
         }

-        if (checkSplit && (messageType == Type.REGION_START || messageType == Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled && checkDiskFullAndSplit(
+        if (checkSplit && (messageType == Type.REGION_START || messageType ==
+            Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled && checkDiskFullAndSplit(
             fileWriter,
             isPrimary,
             null,
@@ -1116,7 +1118,14 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       callback: RpcResponseCallback): Boolean = {
     val diskFull = checkDiskFull(fileWriter)
     logDebug(
-      s"CheckDiskFullAndSplit in diskfull: $diskFull, partitionSplitMinimumSize: $partitionSplitMinimumSize, splitThreshold: ${fileWriter.getSplitThreshold()}, filelength: ${fileWriter.getFileInfo.getFileLength}, filename:${fileWriter.getFileInfo.getFilePath}")
+      s"""
+         |CheckDiskFullAndSplit in
+         |diskFull:$diskFull,
+         |partitionSplitMinimumSize: $partitionSplitMinimumSize,
+         |splitThreshold:${fileWriter.getSplitThreshold()},
+         |fileLength:${fileWriter.getFileInfo.getFileLength}
+         |fileName:${fileWriter.getFileInfo.getFilePath}
+         |""".stripMargin)
     if (workerPartitionSplitEnabled && ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) ||
         (isPrimary && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold()))) {
       if (softSplit != null && fileWriter.getSplitMode == PartitionSplitMode.SOFT &&
@@ -1125,7 +1134,14 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       } else {
         callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
         logDebug(
-          s"CheckDiskFullAndSplit hardsplit diskfull: $diskFull, partitionSplitMinimumSize: $partitionSplitMinimumSize, splitThreshold: ${fileWriter.getSplitThreshold()}, filelength: ${fileWriter.getFileInfo.getFileLength}, filename:${fileWriter.getFileInfo.getFilePath}")
+          s"""
+             |CheckDiskFullAndSplit hardSplit
+             |diskFull:$diskFull,
+             |partitionSplitMinimumSize:$partitionSplitMinimumSize,
+             |splitThreshold:${fileWriter.getSplitThreshold()},
+             |fileLength:${fileWriter.getFileInfo.getFileLength},
+             |fileName:${fileWriter.getFileInfo.getFilePath}
+             |""".stripMargin)
         return true
       }
     }
```
