Skip to content

Commit

Permalink
fix bufferstream read stream
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongqiangczq committed Aug 28, 2023
1 parent c2fded8 commit 57df489
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,16 @@ public void close() {
public void moveToNextPartitionIfPossible(long endedStreamId) {
if (currentLocationIndex < locations.length) {
if (currentLocationIndex > 0) {
logger.debug("Get end streamId {}", endedStreamId);
cleanStream(endedStreamId);
if (endedStreamId == streamId) {
logger.debug("Get end streamId {}", endedStreamId);
cleanStream(endedStreamId);
} else {
logger.warn(
"Received unexpected stream end, current stream id {} received ended stream id {}",
this.streamId,
endedStreamId);
return;
}
}
try {
openStreamInternal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,29 @@ class WordCountTest extends AnyFunSuite with Logging with MiniClusterFeature
graph.setJobType(JobType.BATCH)
if (flinkCluster != null) {
flinkCluster.close()
flinkCluster = null
System.gc()
}
flinkCluster = null
System.gc()
val miniClusterConfiguration =
(new MiniClusterConfiguration.Builder).setConfiguration(configuration).build()
flinkCluster = new MiniCluster(miniClusterConfiguration)
flinkCluster.start()
try {
flinkCluster = new MiniCluster(miniClusterConfiguration)
flinkCluster.start()

val jobGraph: JobGraph = StreamingJobGraphGenerator.createJobGraph(graph)
val jobID = flinkCluster.submitJob(jobGraph).get.getJobID
val jobResult = flinkCluster.requestJobResult(jobID).get
if (jobResult.getSerializedThrowable.isPresent)
throw new AssertionError(jobResult.getSerializedThrowable.get)
checkFlushingFileLength()
} finally {
if (flinkCluster != null) {
flinkCluster.close()
flinkCluster = null
System.gc()
}
}

val jobGraph: JobGraph = StreamingJobGraphGenerator.createJobGraph(graph)
val jobID = flinkCluster.submitJob(jobGraph).get.getJobID
val jobResult = flinkCluster.requestJobResult(jobID).get
if (jobResult.getSerializedThrowable.isPresent)
throw new AssertionError(jobResult.getSerializedThrowable.get)
checkFlushingFileLength()
}

private def checkFlushingFileLength(): Unit = {
Expand Down Expand Up @@ -135,20 +144,26 @@ class WordCountTest extends AnyFunSuite with Logging with MiniClusterFeature
SplitHelper.runSplitRead(env)
if (flinkCluster != null) {
flinkCluster.close()
flinkCluster = null
System.gc()
}
flinkCluster = null
System.gc()
val miniClusterConfiguration =
(new MiniClusterConfiguration.Builder).setConfiguration(configuration).build()
flinkCluster = new MiniCluster(miniClusterConfiguration)
flinkCluster.start()
val graph = env.getStreamGraph
graph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING)
graph.setJobType(JobType.BATCH)
val jobGraph: JobGraph = StreamingJobGraphGenerator.createJobGraph(graph)
val jobID = flinkCluster.submitJob(jobGraph).get.getJobID
val jobResult = flinkCluster.requestJobResult(jobID).get
if (jobResult.getSerializedThrowable.isPresent)
throw new AssertionError(jobResult.getSerializedThrowable.get)
try {
flinkCluster = new MiniCluster(miniClusterConfiguration)
flinkCluster.start()
val graph = env.getStreamGraph
graph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING)
graph.setJobType(JobType.BATCH)
val jobGraph: JobGraph = StreamingJobGraphGenerator.createJobGraph(graph)
val jobID = flinkCluster.submitJob(jobGraph).get.getJobID
val jobResult = flinkCluster.requestJobResult(jobID).get
if (jobResult.getSerializedThrowable.isPresent)
throw new AssertionError(jobResult.getSerializedThrowable.get)
} finally {
flinkCluster.close()
flinkCluster = null
System.gc()
}
}
}

0 comments on commit 57df489

Please sign in to comment.