[CELEBORN-1760] OOM causes disk buffer unable to be released
### What changes were proposed in this pull request?
When an OutOfMemoryError (OOM) is thrown in flushBuffer.addComponent, two problems occur:

1. decrementPendingWrites is not called, so closing the PartitionDataWriter during commit stalls for a period of time.
2. The ByteBuf is not released after the OOM, which leaks memory.
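
The second problem comes down to undoing a retain when the following step fails. Below is a minimal Java sketch of that pattern, not the Celeborn code: `CountedBuffer`, `FlushBufferSketch`, `WriterSketch`, and the `reservedBytes` counter are hypothetical stand-ins for ByteBuf, the flush buffer, the writer, and the memory accounting, and the OOM is simulated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a reference-counted buffer such as a ByteBuf. */
final class CountedBuffer {
  private int refCnt = 1;
  final int numBytes;

  CountedBuffer(int numBytes) { this.numBytes = numBytes; }

  void retain() { refCnt++; }
  void release() { refCnt--; }
  int refCnt() { return refCnt; }
}

/** Hypothetical stand-in for the flush buffer; it fails once it is "full". */
final class FlushBufferSketch {
  private final List<CountedBuffer> components = new ArrayList<>();
  private final int maxComponents;

  FlushBufferSketch(int maxComponents) { this.maxComponents = maxComponents; }

  void addComponent(CountedBuffer data) {
    if (components.size() >= maxComponents) {
      // Simulated failure; a real OOM would come from the allocator.
      throw new OutOfMemoryError("simulated");
    }
    components.add(data);
  }
}

/** Hypothetical writer showing the retain / try / release-on-failure shape. */
final class WriterSketch {
  /** Bytes currently reserved for buffered writes (assumed accounting). */
  final AtomicLong reservedBytes = new AtomicLong();
  private final FlushBufferSketch flushBuffer;

  WriterSketch(FlushBufferSketch flushBuffer) { this.flushBuffer = flushBuffer; }

  void write(CountedBuffer data) {
    reservedBytes.addAndGet(data.numBytes);
    data.retain();
    try {
      flushBuffer.addComponent(data);
    } catch (OutOfMemoryError oom) {
      // Undo the retain and the reservation, then let the error propagate.
      data.release();
      reservedBytes.addAndGet(-data.numBytes);
      throw oom;
    }
  }
}
```

In this sketch a failed write leaves both the reference count and the reserved byte count exactly where they were before the call, which is the property the fix aims for.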

### Why are the changes needed?
The disk buffer cannot be released after an OOM in flushBuffer.addComponent; this change fixes that.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes a memory leak issue in some corner cases.

### How was this patch tested?

Tested manually on three nodes. The two nodes without this PR could not release their memory. The one node with this PR released its memory, and its usage was clearly much lower than on the other two nodes.
![image](https://github.com/user-attachments/assets/3fe846ec-6ee8-432a-be7a-a7efb7c102d0)

Closes #2975 from leixm/CELEBORN-1760.

Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Shuang <[email protected]>
leixm authored and RexXiong committed Dec 10, 2024
1 parent 6cffc91 commit 372ef79
Showing 2 changed files with 12 additions and 2 deletions.
@@ -416,7 +416,17 @@ public void write(ByteBuf data) throws IOException {
     }
 
     data.retain();
-    flushBuffer.addComponent(true, data);
+    try {
+      flushBuffer.addComponent(true, data);
+    } catch (OutOfMemoryError oom) {
+      data.release();
+      if (isMemoryShuffleFile.get()) {
+        MemoryManager.instance().releaseMemoryFileStorage(numBytes);
+      } else {
+        MemoryManager.instance().releaseDiskBuffer(numBytes);
+      }
+      throw oom;
+    }
     if (isMemoryShuffleFile.get()) {
       memoryFileInfo.updateBytesFlushed(numBytes);
     }
@@ -1473,7 +1473,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
           fileWriter.write(body)
           result(index) = StatusCode.SUCCESS
         } catch {
-          case e: Exception =>
+          case e: Throwable =>
             if (e.isInstanceOf[AlreadyClosedException]) {
               val (mapId, attemptId) = getMapAttempt(body)
               val endedAttempt =
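
The change from Exception to Throwable in the handler matters because OutOfMemoryError extends Error rather than Exception, so a handler that matches only Exception never sees it. The sketch below illustrates that difference in Java (the handler itself is Scala); `CatchScopeSketch`, `narrowCatch`, and `wideCatch` are hypothetical names, and the OOM is simulated.

```java
/** Hypothetical helper contrasting a catch on Exception with a catch on Throwable. */
final class CatchScopeSketch {
  private CatchScopeSketch() {}

  /** Handles exceptions only; an Error such as OutOfMemoryError passes through. */
  static String narrowCatch(Runnable task) {
    try {
      task.run();
      return "ok";
    } catch (Exception e) {
      return "handled";
    }
  }

  /** Handles every Throwable, including OutOfMemoryError. */
  static String wideCatch(Runnable task) {
    try {
      task.run();
      return "ok";
    } catch (Throwable t) {
      return "handled";
    }
  }

  /** A task that fails with a simulated OOM. */
  static Runnable failingTask() {
    return () -> {
      throw new OutOfMemoryError("simulated");
    };
  }
}
```

With `failingTask()`, `narrowCatch` lets the error escape to its caller while `wideCatch` reaches its handler, so only the wide form gets a chance to run cleanup logic on the failure path.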