diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java index 9032bfe9196..185eeb9506a 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java @@ -416,7 +416,17 @@ public void write(ByteBuf data) throws IOException { } data.retain(); - flushBuffer.addComponent(true, data); + try { + flushBuffer.addComponent(true, data); + } catch (OutOfMemoryError oom) { + data.release(); + if (isMemoryShuffleFile.get()) { + MemoryManager.instance().releaseMemoryFileStorage(numBytes); + } else { + MemoryManager.instance().releaseDiskBuffer(numBytes); + } + throw oom; + } if (isMemoryShuffleFile.get()) { memoryFileInfo.updateBytesFlushed(numBytes); } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index af6d4cc663d..316ea462603 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -1292,7 +1292,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler try { fileWriter.write(body) } catch { - case e: Exception => + case e: Throwable => if (e.isInstanceOf[AlreadyClosedException]) { val (mapId, attemptId) = getMapAttempt(body) val endedAttempt =