From 375ba4663db8cce38df3fc058c81ee7ee814de30 Mon Sep 17 00:00:00 2001 From: Xianming Lei <31424839+leixm@users.noreply.github.com> Date: Tue, 3 Dec 2024 18:17:18 +0800 Subject: [PATCH 1/6] [CELEBORN-1760] OOM causes disk buffer unable to be released --- .../apache/celeborn/service/deploy/worker/PushDataHandler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index af6d4cc663..316ea46260 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -1292,7 +1292,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler try { fileWriter.write(body) } catch { - case e: Exception => + case e: Throwable => if (e.isInstanceOf[AlreadyClosedException]) { val (mapId, attemptId) = getMapAttempt(body) val endedAttempt = From 44c6c1cfec3293b52d9f20db3364b525dd94e5f4 Mon Sep 17 00:00:00 2001 From: Xianming Lei <31424839+leixm@users.noreply.github.com> Date: Thu, 5 Dec 2024 14:10:39 +0800 Subject: [PATCH 2/6] fix --- .../common/meta/WorkerPartitionLocationInfo.scala | 6 +++--- .../deploy/worker/storage/PartitionDataWriter.java | 8 +++++++- .../apache/celeborn/service/deploy/worker/Worker.scala | 7 ++++++- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala index 96ff209717..1bc05147d2 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala @@ -97,9 +97,9 @@ class WorkerPartitionLocationInfo extends Logging { locations } - def removeShuffle(shuffleKey: String): Unit = { - primaryPartitionLocations.remove(shuffleKey) - replicaPartitionLocations.remove(shuffleKey) + def removeShuffle(shuffleKey: String): List[PartitionLocation] = { + primaryPartitionLocations.remove(shuffleKey).values().asScala.toList ++ + replicaPartitionLocations.remove(shuffleKey).values().asScala.toList } def removePrimaryPartitions( diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java index 9032bfe919..fdfbe68c80 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java @@ -416,7 +416,13 @@ public void write(ByteBuf data) throws IOException { } data.retain(); - flushBuffer.addComponent(true, data); + try { + flushBuffer.addComponent(true, data); + } catch (OutOfMemoryError oom) { + data.release(); + MemoryManager.instance().releaseDiskBuffer(numBytes); + throw oom; + } if (isMemoryShuffleFile.get()) { memoryFileInfo.updateBytesFlushed(numBytes); } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 3439a2e86d..e091d8185c 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -742,7 +742,12 @@ private[celeborn] class Worker( synchronized { val expiredApplicationIds = new JHashSet[String]() expiredShuffleKeys.asScala.foreach { shuffleKey => - partitionLocationInfo.removeShuffle(shuffleKey) + partitionLocationInfo.removeShuffle(shuffleKey).foreach { loc => + val fileWriter = loc.asInstanceOf[WorkingPartition].getFileWriter + if (!fileWriter.isClosed) { + fileWriter.close() + } + } shufflePartitionType.remove(shuffleKey) shufflePushDataTimeout.remove(shuffleKey) shuffleMapperAttempts.remove(shuffleKey) From 2c4e840004acf0b4f56c6f02e22373059bf7469f Mon Sep 17 00:00:00 2001 From: Xianming Lei <31424839+leixm@users.noreply.github.com> Date: Thu, 5 Dec 2024 14:24:56 +0800 Subject: [PATCH 3/6] fix --- .../celeborn/common/meta/WorkerPartitionLocationInfo.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala index 1bc05147d2..d0d7833f27 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala @@ -99,7 +99,7 @@ class WorkerPartitionLocationInfo extends Logging { def removeShuffle(shuffleKey: String): List[PartitionLocation] = { primaryPartitionLocations.remove(shuffleKey).values().asScala.toList ++ - replicaPartitionLocations.remove(shuffleKey).values().asScala.toList + replicaPartitionLocations.remove(shuffleKey).values().asScala.toList } def removePrimaryPartitions( From e20dc7fe116043f0e222a6f1d51742ecf406d0e2 Mon Sep 17 00:00:00 2001 From: Xianming Lei <31424839+leixm@users.noreply.github.com> Date: Thu, 5 Dec 2024 15:45:36 +0800 Subject: [PATCH 4/6] fix --- .../celeborn/common/meta/WorkerPartitionLocationInfo.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala index d0d7833f27..1f0cf014aa 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala @@ -98,8 +98,10 @@ class WorkerPartitionLocationInfo extends Logging { } def removeShuffle(shuffleKey: String): List[PartitionLocation] = { - primaryPartitionLocations.remove(shuffleKey).values().asScala.toList ++ - replicaPartitionLocations.remove(shuffleKey).values().asScala.toList + Option(primaryPartitionLocations.remove(shuffleKey)) + .map(_.values().asScala.toList).getOrElse(List.empty) ++ + Option(replicaPartitionLocations.remove(shuffleKey)) + .map(_.values().asScala.toList).getOrElse(List.empty) } def removePrimaryPartitions( From 59f627250f560d0d8b70d352a1a89248dce1045b Mon Sep 17 00:00:00 2001 From: Xianming Lei <31424839+leixm@users.noreply.github.com> Date: Fri, 6 Dec 2024 15:19:45 +0800 Subject: [PATCH 5/6] fix --- .../common/meta/WorkerPartitionLocationInfo.scala | 8 +++----- .../apache/celeborn/service/deploy/worker/Worker.scala | 7 +------ 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala index 1f0cf014aa..96ff209717 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala @@ -97,11 +97,9 @@ class WorkerPartitionLocationInfo extends Logging { locations } - def removeShuffle(shuffleKey: String): List[PartitionLocation] = { - Option(primaryPartitionLocations.remove(shuffleKey)) - .map(_.values().asScala.toList).getOrElse(List.empty) ++ - Option(replicaPartitionLocations.remove(shuffleKey)) - .map(_.values().asScala.toList).getOrElse(List.empty) + def removeShuffle(shuffleKey: String): Unit = { + primaryPartitionLocations.remove(shuffleKey) + replicaPartitionLocations.remove(shuffleKey) } def removePrimaryPartitions( diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index e091d8185c..3439a2e86d 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -742,12 +742,7 @@ private[celeborn] class Worker( synchronized { val expiredApplicationIds = new JHashSet[String]() expiredShuffleKeys.asScala.foreach { shuffleKey => - partitionLocationInfo.removeShuffle(shuffleKey).foreach { loc => - val fileWriter = loc.asInstanceOf[WorkingPartition].getFileWriter - if (!fileWriter.isClosed) { - fileWriter.close() - } - } + partitionLocationInfo.removeShuffle(shuffleKey) shufflePartitionType.remove(shuffleKey) shufflePushDataTimeout.remove(shuffleKey) shuffleMapperAttempts.remove(shuffleKey) From eeef82137dc67b14f580e407e25f7efddc903b23 Mon Sep 17 00:00:00 2001 From: Xianming Lei <31424839+leixm@users.noreply.github.com> Date: Fri, 6 Dec 2024 16:24:48 +0800 Subject: [PATCH 6/6] fix --- .../service/deploy/worker/storage/PartitionDataWriter.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java index fdfbe68c80..185eeb9506 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java @@ -420,7 +420,11 @@ public void write(ByteBuf data) throws IOException { flushBuffer.addComponent(true, data); } catch (OutOfMemoryError oom) { data.release(); - MemoryManager.instance().releaseDiskBuffer(numBytes); + if (isMemoryShuffleFile.get()) { + MemoryManager.instance().releaseMemoryFileStorage(numBytes); + } else { + MemoryManager.instance().releaseDiskBuffer(numBytes); + } throw oom; } if (isMemoryShuffleFile.get()) {