diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java index 243d261f759..2a4349a1b99 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java @@ -411,9 +411,6 @@ public FileInfo getFileInfo() { public void closeReader() { synchronized (lock) { readFinished = true; - // old client can't support BufferStreamEnd, so for new client it tells client that this - // stream is finished. - if (!isLegacy) associatedChannel.writeAndFlush(new BufferStreamEnd(streamId)); } logger.debug("Closed read for stream {}", this.streamId); } @@ -443,6 +440,9 @@ public void release() { synchronized (lock) { if (!isReleased) { logger.debug("release reader for stream {}", streamId); + // old client can't support BufferStreamEnd, so for new client it tells client that this + // stream is finished. + if (!isLegacy) associatedChannel.writeAndFlush(new BufferStreamEnd(streamId)); if (!buffersToSend.isEmpty()) { numInUseBuffers.addAndGet(-1 * buffersToSend.size()); buffersToSend.forEach(RecyclableBuffer::recycle);