From 0ce5938abf86871d25386b8fa1af36d2a93effcb Mon Sep 17 00:00:00 2001 From: "zhongqiang.czq" Date: Sun, 27 Aug 2023 21:56:37 +0800 Subject: [PATCH] fix bufferstream --- .../flink/readclient/CelebornBufferStream.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java index dfc932748b5..469f7ce2b54 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java @@ -149,16 +149,8 @@ public void close() { public void moveToNextPartitionIfPossible(long endedStreamId) { if (currentLocationIndex < locations.length) { if (currentLocationIndex > 0) { - if (endedStreamId == streamId) { - logger.debug("Get end streamId {}", endedStreamId); - cleanStream(endedStreamId); - } else { - logger.warn( - "Received unexpected stream end, current stream id {} received ended stream id {}", - this.streamId, - endedStreamId); - return; - } + logger.debug("Get end streamId {}", endedStreamId); + cleanStream(endedStreamId); } try { openStreamInternal();