From b1e3d661e6e7db255867159b639c3b345566b2e4 Mon Sep 17 00:00:00 2001
From: "zhongqiang.czq"
Date: Wed, 6 Sep 2023 22:33:56 +0800
Subject: [PATCH] [CELEBORN-627][FLINK][FOLLOWUP] Support split partitions

### What changes were proposed in this pull request?

Fix duplicated sending of CommitFiles for MapPartition, and fix not sending BufferStreamEnd while opening a MapPartition split.

### Why are the changes needed?

After enabling partition split for MapPartition, two errors occur.

- ERROR 1: The worker does not send BufferStreamEnd to the client because of a concurrent-thread synchronization problem. After the idle timeout, the client closes the channel and throws the exception **"xx is lost, notify related stream xx"** (a toy sketch of the intended ordering follows this description).

```java
2023-09-06T04:40:47.7549935Z 23/09/06 04:40:47,753 WARN [Keyed Aggregation -> Map -> Sink: Unnamed (5/8)#0] Task: Keyed Aggregation -> Map -> Sink: Unnamed (5/8)#0 (c1cade728ddb3a32e0bf72acb1d87588_c27dcf7b54ef6bfd6cff02ca8870b681_4_0) switched from RUNNING to FAILED with failure cause:
2023-09-06T04:40:47.7550644Z java.io.IOException: Client localhost/127.0.0.1:38485 is lost, notify related stream 256654410004
2023-09-06T04:40:47.7551219Z at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:142)
2023-09-06T04:40:47.7551886Z at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
2023-09-06T04:40:47.7552576Z at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:57)
2023-09-06T04:40:47.7553250Z at org.apache.celeborn.plugin.flink.network.ReadClientHandler.lambda$channelInactive$0(ReadClientHandler.java:119)
2023-09-06T04:40:47.7553806Z at java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1597)
2023-09-06T04:40:47.7554564Z at org.apache.celeborn.plugin.flink.network.ReadClientHandler.channelInactive(ReadClientHandler.java:110)
2023-09-06T04:40:47.7555270Z at org.apache.celeborn.common.network.server.TransportRequestHandler.channelInactive(TransportRequestHandler.java:71)
2023-09-06T04:40:47.7556005Z at org.apache.celeborn.common.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:136)
2023-09-06T04:40:47.7556710Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
2023-09-06T04:40:47.7557370Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7558172Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7558803Z at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
2023-09-06T04:40:47.7559368Z at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
2023-09-06T04:40:47.7559954Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
2023-09-06T04:40:47.7560589Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7561222Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7561829Z at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
2023-09-06T04:40:47.7562620Z at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelInactive(TransportFrameDecoderWithBufferSupplier.java:206)
2023-09-06T04:40:47.7563506Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
2023-09-06T04:40:47.7564207Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7564829Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7565417Z at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
2023-09-06T04:40:47.7566014Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
2023-09-06T04:40:47.7566654Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7567317Z at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
2023-09-06T04:40:47.7567813Z at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
2023-09-06T04:40:47.7568297Z at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
2023-09-06T04:40:47.7568830Z at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
2023-09-06T04:40:47.7569402Z at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
2023-09-06T04:40:47.7569894Z at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
2023-09-06T04:40:47.7570356Z at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
2023-09-06T04:40:47.7570841Z at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
2023-09-06T04:40:47.7571319Z at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
2023-09-06T04:40:47.7571721Z at java.lang.Thread.run(Thread.java:750)
```

- ERROR 2: The client sends duplicated CommitFiles requests to the worker. Because the unhandled partition set (unhandledPartitionLocations) is seen inconsistently, both batchCommit and finalCommit send CommitFiles (a second sketch of the predicate fix follows this description).

```java
2023-09-06T04:36:48.3146773Z 23/09/06 04:36:48,314 WARN [Worker-CommitFiles-1] Controller: Get Partition Location for 1693975002919-61094c8156f918062a5fae12d551bc90-0 0-1 but didn't exist.
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #1881 from zhongqiangczq/fix-split-test.
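The first sketch below is a minimal, hypothetical toy model; `Event`, `Buffer`, `Backlog`, and `StreamEnd` are invented names for illustration, not Celeborn classes. It only mirrors the ordering that the MapDataPartitionReader change in the diff enforces: when no data remains, the end-of-stream signal is produced before the backlog notification, so the client sees BufferStreamEnd rather than idling out as in the stack trace above. It does not reproduce the race itself.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Toy model with invented names; not Celeborn code. The "worker" side enqueues
// the end-of-stream marker before the backlog notification, mirroring the
// order after the fix (closeReader() before notifyBacklog()).
object StreamEndOrderingSketch {
  sealed trait Event
  final case class Buffer(i: Int) extends Event
  final case class Backlog(n: Int) extends Event
  case object StreamEnd extends Event

  def main(args: Array[String]): Unit = {
    val toClient = new LinkedBlockingQueue[Event]()

    // "Worker" side: two buffers were read and nothing remains in the file.
    val numDataBuffers = 2
    val hasRemaining = false
    (1 to numDataBuffers).foreach(i => toClient.put(Buffer(i)))
    if (!hasRemaining) toClient.put(StreamEnd)                    // closeReader()
    if (numDataBuffers > 0) toClient.put(Backlog(numDataBuffers)) // notifyBacklog()

    // "Client" side: drain until the end marker arrives or an idle timeout fires.
    var open = true
    while (open) {
      toClient.poll(1, TimeUnit.SECONDS) match {
        case Buffer(i)  => println(s"read buffer $i")
        case Backlog(n) => println(s"backlog announced: $n")
        case StreamEnd  => println("BufferStreamEnd received, stream closed cleanly"); open = false
        case null       => println("idle timeout: stream considered lost"); open = false
      }
    }
  }
}
```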
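The second sketch is a simplified reproduction of the predicate change in MapPartitionCommitHandler (the `filterNot` over unhandledPartitionLocations in the diff). The `PartitionLocation` case class and the plain sets are stand-ins for the real ShuffleCommittedInfo state. It shows why `&&` lets already-handled or in-process partitions be returned again, so CommitFiles is sent twice, while `||` filters them out.

```scala
// Simplified stand-in types; not the real ShuffleCommittedInfo.
object DuplicateCommitSketch {
  final case class PartitionLocation(id: Int)

  def main(args: Array[String]): Unit = {
    val unhandled = Set(PartitionLocation(0), PartitionLocation(1))
    val handled   = Set(PartitionLocation(0)) // already committed earlier
    val inProcess = Set(1)                    // id 1 is being committed right now

    def isPartitionInProcess(id: Int): Boolean = inProcess.contains(id)

    // Old predicate: only skips partitions that are handled AND in process,
    // so both partitions are selected again and CommitFiles is sent twice.
    val buggy = unhandled.filterNot { p =>
      handled.contains(p) && isPartitionInProcess(p.id)
    }

    // New predicate: skips partitions that are handled OR in process.
    val fixed = unhandled.filterNot { p =>
      handled.contains(p) || isPartitionInProcess(p.id)
    }

    println(s"old predicate would re-commit: $buggy") // both partitions
    println(s"new predicate would re-commit: $fixed") // neither
  }
}
```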
Authored-by: zhongqiang.czq
Signed-off-by: zhongqiang.czq
---
 .../commit/MapPartitionCommitHandler.scala    |  3 +--
 .../src/test/resources/log4j2-test.xml        |  4 ++--
 .../storage/MapDataPartitionReader.java       |  9 +++----
 .../deploy/worker/PushDataHandler.scala       | 24 +++++++++++++++----
 4 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
index e75e77bf073..54d05671fa6 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
@@ -74,7 +74,7 @@ class MapPartitionCommitHandler(
       shuffleId: Int,
       shuffleCommittedInfo: ShuffleCommittedInfo): mutable.Set[PartitionLocation] = {
     shuffleCommittedInfo.unhandledPartitionLocations.asScala.filterNot { partitionLocation =>
-      shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation) &&
+      shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation) ||
         isPartitionInProcess(shuffleId, partitionLocation.getId)
     }
   }
@@ -199,7 +199,6 @@ class MapPartitionCommitHandler(
       recordWorkerFailure(commitFailedWorkers)
     }
 
-    inProcessingPartitionIds.remove(partitionId)
     if (dataCommitSuccess) {
       val resultPartitions =
         shuffleSucceedPartitionIds.computeIfAbsent(
diff --git a/tests/flink-it/src/test/resources/log4j2-test.xml b/tests/flink-it/src/test/resources/log4j2-test.xml
index 5fb0cb8ba90..607cbb578c0 100644
--- a/tests/flink-it/src/test/resources/log4j2-test.xml
+++ b/tests/flink-it/src/test/resources/log4j2-test.xml
@@ -16,7 +16,7 @@
   ~ limitations under the License.
   -->
-
+
@@ -29,7 +29,7 @@
-
+
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
index 06e50c01b76..b8f996fe7f8 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
@@ -71,7 +71,7 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader>
-    if (numDataBuffers > 0) {
-      notifyBacklog(numDataBuffers);
-    }
     if (!hasRemaining) {
       closeReader();
     }
+
+    if (numDataBuffers > 0) {
+      notifyBacklog(numDataBuffers);
+    }
   }
 
   private void addBuffer(ByteBuf buffer, BufferRecycler bufferRecycler) {
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 9637369960e..36167ad47cd 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -940,13 +940,15 @@ class PushDataHandler extends BaseMessageHandler with Logging {
         val isPartitionSplitEnabled = fileWriter.asInstanceOf[
           MapPartitionFileWriter].getFileInfo.isPartitionSplitEnabled
 
-        if (shutdown.get() && (messageType == Type.REGION_START || messageType == Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled) {
+        if (shutdown.get() && (messageType == Type.REGION_START || messageType ==
+          Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled) {
           logInfo(s"$messageType return HARD_SPLIT for shuffle $shuffleKey since worker shutdown.")
           callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
           return
         }
 
-        if (checkSplit && (messageType == Type.REGION_START || messageType == Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled && checkDiskFullAndSplit(
+        if (checkSplit && (messageType == Type.REGION_START || messageType ==
+          Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled && checkDiskFullAndSplit(
             fileWriter,
             isPrimary,
             null,
@@ -1116,7 +1118,14 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       callback: RpcResponseCallback): Boolean = {
     val diskFull = checkDiskFull(fileWriter)
     logDebug(
-      s"CheckDiskFullAndSplit in diskfull: $diskFull, partitionSplitMinimumSize: $partitionSplitMinimumSize, splitThreshold: ${fileWriter.getSplitThreshold()}, filelength: ${fileWriter.getFileInfo.getFileLength}, filename:${fileWriter.getFileInfo.getFilePath}")
+      s"""
+         |CheckDiskFullAndSplit in
+         |diskFull:$diskFull,
+         |partitionSplitMinimumSize: $partitionSplitMinimumSize,
+         |splitThreshold:${fileWriter.getSplitThreshold()},
+         |fileLength:${fileWriter.getFileInfo.getFileLength}
+         |fileName:${fileWriter.getFileInfo.getFilePath}
+         |""".stripMargin)
     if (workerPartitionSplitEnabled && ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) ||
       (isPrimary && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold()))) {
       if (softSplit != null && fileWriter.getSplitMode == PartitionSplitMode.SOFT &&
@@ -1125,7 +1134,14 @@
       } else {
         callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
         logDebug(
-          s"CheckDiskFullAndSplit hardsplit diskfull: $diskFull, partitionSplitMinimumSize: $partitionSplitMinimumSize, splitThreshold: ${fileWriter.getSplitThreshold()}, filelength: ${fileWriter.getFileInfo.getFileLength}, filename:${fileWriter.getFileInfo.getFilePath}")
+          s"""
+             |CheckDiskFullAndSplit hardSplit
+             |diskFull:$diskFull,
+             |partitionSplitMinimumSize:$partitionSplitMinimumSize,
+             |splitThreshold:${fileWriter.getSplitThreshold()},
+             |fileLength:${fileWriter.getFileInfo.getFileLength},
+             |fileName:${fileWriter.getFileInfo.getFilePath}
+             |""".stripMargin)
         return true
       }
     }