diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java index f8fc35a61c1..45e33fe769b 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java @@ -155,16 +155,8 @@ public void moveToNextPartitionIfPossible(long endedStreamId) { currentLocationIndex, streamId); if (currentLocationIndex.get() > 0) { - if (endedStreamId == streamId) { - logger.debug("Get end streamId {}", endedStreamId); - cleanStream(endedStreamId); - } else { - logger.warn( - "Received unexpected stream end, current stream id {} received ended stream id {}", - this.streamId, - endedStreamId); - return; - } + logger.debug("Get end streamId {}", endedStreamId); + cleanStream(endedStreamId); } if (currentLocationIndex.get() < locations.length) { try { diff --git a/tests/flink-it/src/test/resources/log4j2-test.xml b/tests/flink-it/src/test/resources/log4j2-test.xml index 607cbb578c0..5fb0cb8ba90 100644 --- a/tests/flink-it/src/test/resources/log4j2-test.xml +++ b/tests/flink-it/src/test/resources/log4j2-test.xml @@ -16,7 +16,7 @@ ~ limitations under the License. --> - + @@ -29,7 +29,7 @@ - + diff --git a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/SplitTest.scala b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/SplitTest.scala new file mode 100644 index 00000000000..9ee68c507d2 --- /dev/null +++ b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/SplitTest.scala @@ -0,0 +1,58 @@ +package org.apache.celeborn.tests.flink + +import org.apache.flink.api.common.{ExecutionMode, RuntimeExecutionMode} +import org.apache.flink.configuration.{Configuration, ExecutionOptions, RestOptions} +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.service.deploy.MiniClusterFeature +import org.apache.celeborn.service.deploy.worker.Worker + +class SplitTest extends AnyFunSuite with Logging with MiniClusterFeature + with BeforeAndAfterAll { + var workers: collection.Set[Worker] = null + override def beforeAll(): Unit = { + logInfo("test initialized , setup celeborn mini cluster") + val masterConf = Map( + "celeborn.master.host" -> "localhost", + "celeborn.master.port" -> "9097") + val workerConf = Map( + "celeborn.master.endpoints" -> "localhost:9097", + CelebornConf.WORKER_FLUSHER_BUFFER_SIZE.key -> "10k") + workers = setUpMiniCluster(masterConf, workerConf)._2 + } + override def afterAll(): Unit = { + logInfo("all test complete , stop celeborn mini cluster") + shutdownMiniCluster() + } + + test("celeborn flink integration test - shuffle partition split test") { + val configuration = new Configuration + val parallelism = 8 + configuration.setString( + "shuffle-service-factory.class", + "org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory") + configuration.setString(CelebornConf.MASTER_ENDPOINTS.key, "localhost:9097") + configuration.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_BLOCKING") + configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH) + configuration.setString("taskmanager.memory.network.min", "1024m") + configuration.setString(RestOptions.BIND_PORT, "8081-8089") + configuration.setString( + "execution.batch.adaptive.auto-parallelism.min-parallelism", + "" + parallelism) + configuration.setString( + "execution.batch.adaptive.auto-parallelism.max-parallelism", + "" + parallelism) + configuration.setString(CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD.key, "10k") + configuration.setString(CelebornConf.CLIENT_FLINK_SHUFFLE_PARTITION_SPLIT_ENABLED.key, "true"); + val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration) + env.getConfig.setExecutionMode(ExecutionMode.BATCH) + env.getConfig.setParallelism(parallelism) + SplitHelper.runSplitRead(env) + env.execute("split test") + } + +} diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java index 2a4349a1b99..7f89115b4ef 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java @@ -442,7 +442,8 @@ public void release() { logger.debug("release reader for stream {}", streamId); // old client can't support BufferStreamEnd, so for new client it tells client that this // stream is finished. - if (!isLegacy) associatedChannel.writeAndFlush(new BufferStreamEnd(streamId)); + if (!isLegacy && readFinished && buffersToSend.isEmpty()) + associatedChannel.writeAndFlush(new BufferStreamEnd(streamId)); if (!buffersToSend.isEmpty()) { numInUseBuffers.addAndGet(-1 * buffersToSend.size()); buffersToSend.forEach(RecyclableBuffer::recycle);