Skip to content

Commit

Permalink
fix test bug
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongqiangczq committed Aug 28, 2023
1 parent 27b1689 commit 5ee3dc9
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,8 @@ public void moveToNextPartitionIfPossible(long endedStreamId) {
currentLocationIndex,
streamId);
if (currentLocationIndex.get() > 0) {
if (endedStreamId == streamId) {
logger.debug("Get end streamId {}", endedStreamId);
cleanStream(endedStreamId);
} else {
logger.warn(
"Received unexpected stream end, current stream id {} received ended stream id {}",
this.streamId,
endedStreamId);
return;
}
logger.debug("Get end streamId {}", endedStreamId);
cleanStream(endedStreamId);
}
if (currentLocationIndex.get() < locations.length) {
try {
Expand Down
4 changes: 2 additions & 2 deletions tests/flink-it/src/test/resources/log4j2-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
~ limitations under the License.
-->

<Configuration status="DEBUG">
<Configuration status="INFO">
<Appenders>
<Console name="stdout" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: %m%n%ex"/>
Expand All @@ -29,7 +29,7 @@
</File>
</Appenders>
<Loggers>
<Root level="DEBUG">
<Root level="INFO">
<AppenderRef ref="stdout"/>
<AppenderRef ref="file"/>
</Root>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.apache.celeborn.tests.flink

import org.apache.flink.api.common.{ExecutionMode, RuntimeExecutionMode}
import org.apache.flink.configuration.{Configuration, ExecutionOptions, RestOptions}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.service.deploy.MiniClusterFeature
import org.apache.celeborn.service.deploy.worker.Worker

class SplitTest extends AnyFunSuite with Logging with MiniClusterFeature
with BeforeAndAfterAll {
var workers: collection.Set[Worker] = null
override def beforeAll(): Unit = {
logInfo("test initialized , setup celeborn mini cluster")
val masterConf = Map(
"celeborn.master.host" -> "localhost",
"celeborn.master.port" -> "9097")
val workerConf = Map(
"celeborn.master.endpoints" -> "localhost:9097",
CelebornConf.WORKER_FLUSHER_BUFFER_SIZE.key -> "10k")
workers = setUpMiniCluster(masterConf, workerConf)._2
}
override def afterAll(): Unit = {
logInfo("all test complete , stop celeborn mini cluster")
shutdownMiniCluster()
}

test("celeborn flink integration test - shuffle partition split test") {
val configuration = new Configuration
val parallelism = 8
configuration.setString(
"shuffle-service-factory.class",
"org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory")
configuration.setString(CelebornConf.MASTER_ENDPOINTS.key, "localhost:9097")
configuration.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_BLOCKING")
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
configuration.setString("taskmanager.memory.network.min", "1024m")
configuration.setString(RestOptions.BIND_PORT, "8081-8089")
configuration.setString(
"execution.batch.adaptive.auto-parallelism.min-parallelism",
"" + parallelism)
configuration.setString(
"execution.batch.adaptive.auto-parallelism.max-parallelism",
"" + parallelism)
configuration.setString(CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD.key, "10k")
configuration.setString(CelebornConf.CLIENT_FLINK_SHUFFLE_PARTITION_SPLIT_ENABLED.key, "true");
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
env.getConfig.setExecutionMode(ExecutionMode.BATCH)
env.getConfig.setParallelism(parallelism)
SplitHelper.runSplitRead(env)
env.execute("split test")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,8 @@ public void release() {
logger.debug("release reader for stream {}", streamId);
// old client can't support BufferStreamEnd, so for new client it tells client that this
// stream is finished.
if (!isLegacy) associatedChannel.writeAndFlush(new BufferStreamEnd(streamId));
if (!isLegacy && readFinished && buffersToSend.isEmpty())
associatedChannel.writeAndFlush(new BufferStreamEnd(streamId));
if (!buffersToSend.isEmpty()) {
numInUseBuffers.addAndGet(-1 * buffersToSend.size());
buffersToSend.forEach(RecyclableBuffer::recycle);
Expand Down

0 comments on commit 5ee3dc9

Please sign in to comment.