[CELEBORN-1760] OOM causes disk buffers to remain unreleased #2975

Closed
wants to merge 6 commits

Conversation

@leixm (Contributor) commented Dec 3, 2024

What changes were proposed in this pull request?

When an OutOfDirectMemoryError occurs in flushBuffer.addComponent, there are two problems:

  1. decrementPendingWrites is not called, so closing the PartitionDataWriter gets stuck for a period of time during commit.
  2. After the OOM occurs, the ByteBuf is not released, causing a memory leak (see the sketch below).
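For illustration only, here is a minimal, self-contained sketch of the kind of guard described above, written directly against Netty's CompositeByteBuf. The class, field, and counter names are simplified assumptions for the sketch, not the actual Celeborn patch.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.util.ReferenceCountUtil;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified writer illustrating the two fixes listed above:
// on a failed addComponent (e.g. OutOfDirectMemoryError), release the retained
// ByteBuf and decrement the pending-write counter.
public class GuardedFlushBufferWriter {
  private final CompositeByteBuf flushBuffer =
      ByteBufAllocator.DEFAULT.compositeBuffer();
  private final AtomicInteger numPendingWrites = new AtomicInteger();

  public void write(ByteBuf data) throws IOException {
    numPendingWrites.incrementAndGet();
    data.retain(); // the flush buffer takes its own reference to the incoming data
    try {
      // Consolidation inside addComponent may allocate direct memory and throw
      // OutOfDirectMemoryError, as in the stack traces below.
      flushBuffer.addComponent(true, data);
    } catch (Throwable t) {
      ReferenceCountUtil.safeRelease(data);  // problem 2: do not leak the buffer
      numPendingWrites.decrementAndGet();    // problem 1: keep the counter balanced
      throw new IOException("Failed to append data to flush buffer", t);
    }
    // ... flush logic elided; the normal path decrements the counter later ...
  }
}
```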

Why are the changes needed?

Fixes disk buffers that cannot be released after an OOM.

Does this PR introduce any user-facing change?

Yes, it fixes a memory leak issue in some corner cases.

How was this patch tested?

I ran a test on three worker nodes: two without this PR, on which the memory could not be released, and one with this PR, on which the memory was released and stayed noticeably lower than on the other two nodes.
[screenshot]

@leixm (Contributor, Author) commented Dec 3, 2024

24/11/27 01:44:36,396 WARN [push-server-6-14] TransportChannelHandler: Exception in connection from /10.217.150.42:27112
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 4194304 byte(s) of direct memory (used: 10733223943, max: 10737418240)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:843)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:772)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:717)
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:692)
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:215)
        at io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:197)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:139)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:129)
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:395)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
        at io.netty.buffer.CompositeByteBuf.allocBuffer(CompositeByteBuf.java:1879)
        at io.netty.buffer.CompositeByteBuf.consolidate0(CompositeByteBuf.java:1758)
        at io.netty.buffer.CompositeByteBuf.consolidateIfNeeded(CompositeByteBuf.java:571)
        at io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:266)
        at io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:222)
        at org.apache.celeborn.service.deploy.worker.storage.FileWriter.write(FileWriter.java:224)
        at org.apache.celeborn.service.deploy.worker.PushDataHandler.writeData$1(PushDataHandler.scala:1235)
        at org.apache.celeborn.service.deploy.worker.PushDataHandler.writeLocalData(PushDataHandler.scala:1278)
        at org.apache.celeborn.service.deploy.worker.PushDataHandler.handlePushMergedData(PushDataHandler.scala:636)
        at org.apache.celeborn.service.deploy.worker.PushDataHandler.$anonfun$receive$2(PushDataHandler.scala:146)
        at org.apache.celeborn.service.deploy.worker.PushDataHandler.handleCore(PushDataHandler.scala:743)
        at org.apache.celeborn.service.deploy.worker.PushDataHandler.receive(PushDataHandler.scala:147)
        at org.apache.celeborn.common.network.server.TransportRequestHandler.processOtherMessages(TransportRequestHandler.java:132)
        at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:88)
        at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:151)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)



24/11/27 01:44:38,385 WARN [push-server-6-8] TransportChannelHandler: Exception in connection from /10.217.113.154:31790
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 4194304 byte(s) of direct memory (used: 10733223943, max: 10737418240)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:843)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:772)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:717)
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:692)
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:215)
        at io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:197)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:139)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:129)
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:395)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
        at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:140)
        at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:120)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:150)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
24/11/27 01:50:49,825 ERROR [worker-files-committer-34907] Controller: Commit file for application_1727161546827_15995292_1-205 1765-0 failed.
java.io.IOException: Wait pending actions timeout.
        at org.apache.celeborn.service.deploy.worker.storage.FileWriter.waitOnNoPending(FileWriter.java:359)
        at org.apache.celeborn.service.deploy.worker.storage.FileWriter.close(FileWriter.java:271)
        at org.apache.celeborn.service.deploy.worker.storage.ReducePartitionFileWriter.close(ReducePartitionFileWriter.java:93)
        at org.apache.celeborn.service.deploy.worker.Controller$$anon$1.run(Controller.scala:306)
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
24/11/27 01:50:49,825 ERROR [worker-files-committer-34908] Controller: Commit file for application_1727161546827_15995292_1-205 415-0 failed.
java.io.IOException: Wait pending actions timeout.
        at org.apache.celeborn.service.deploy.worker.storage.FileWriter.waitOnNoPending(FileWriter.java:359)
        at org.apache.celeborn.service.deploy.worker.storage.FileWriter.close(FileWriter.java:271)
        at org.apache.celeborn.service.deploy.worker.storage.ReducePartitionFileWriter.close(ReducePartitionFileWriter.java:93)
        at org.apache.celeborn.service.deploy.worker.Controller$$anon$1.run(Controller.scala:306)
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
24/11/27 01:50:49,825 ERROR [worker-files-committer-34909] Controller: Commit file for application_1727161546827_15995292_1-205 1090-0 failed.
java.io.IOException: Wait pending actions timeout.
        at org.apache.celeborn.service.deploy.worker.storage.FileWriter.waitOnNoPending(FileWriter.java:359)
        at org.apache.celeborn.service.deploy.worker.storage.FileWriter.close(FileWriter.java:271)
        at org.apache.celeborn.service.deploy.worker.storage.ReducePartitionFileWriter.close(ReducePartitionFileWriter.java:93)
        at org.apache.celeborn.service.deploy.worker.Controller$$anon$1.run(Controller.scala:306)
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

24/11/27 03:05:19,880 WARN [worker-memory-manager-actor] StorageManager: Skip flushOnMemoryPressure because LocalFlusher@93798665-/mnt/ssd/0 has error: Wait pending actions timeout.
24/11/27 03:05:19,881 WARN [worker-memory-manager-actor] StorageManager: Skip flushOnMemoryPressure because LocalFlusher@93798665-/mnt/ssd/0 has error: Wait pending actions timeout.
24/11/27 03:05:19,881 WARN [worker-memory-manager-actor] StorageManager: Skip flushOnMemoryPressure because LocalFlusher@93798665-/mnt/ssd/0 has error: Wait pending actions timeout.
24/11/27 03:05:19,881 WARN [worker-memory-manager-actor] StorageManager: Skip flushOnMemoryPressure because LocalFlusher@93798665-/mnt/ssd/0 has error: Wait pending actions timeout.
24/11/27 03:05:19,881 WARN [worker-memory-manager-actor] StorageManager: Skip flushOnMemoryPressure because LocalFlusher@93798665-/mnt/ssd/0 has error: Wait pending actions timeout.
24/11/27 03:05:19,881 WARN [worker-memory-manager-actor] StorageManager: Skip flushOnMemoryPressure because LocalFlusher@93798665-/mnt/ssd/0 has error: Wait pending actions timeout

[screenshot]

@leixm (Contributor, Author) commented Dec 3, 2024

@RexXiong @AngersZhuuuu PTAL.

@FMX (Contributor) commented Dec 3, 2024

@leixm Can you provide some memory metrics with this PR after a worker hits OOM?

@leixm (Contributor, Author) commented Dec 3, 2024

24/11/27 01:43:22,290 INFO [worker-memory-manager-reporter] MemoryManager: Direct memory usage: 2.2 GiB/10.0 GiB, disk buffer size: 651.4 MiB, sort memory size: 0.0 B, read buffer size: 0.0 B
24/11/27 01:43:32,290 INFO [worker-memory-manager-reporter] MemoryManager: Direct memory usage: 2.5 GiB/10.0 GiB, disk buffer size: 287.9 MiB, sort memory size: 0.0 B, read buffer size: 0.0 B
24/11/27 01:43:42,290 INFO [worker-memory-manager-reporter] MemoryManager: Direct memory usage: 2.3 GiB/10.0 GiB, disk buffer size: 299.9 MiB, sort memory size: 0.0 B, read buffer size: 0.0 B
24/11/27 01:43:52,290 INFO [worker-memory-manager-reporter] MemoryManager: Direct memory usage: 3.7 GiB/10.0 GiB, disk buffer size: 272.4 MiB, sort memory size: 0.0 B, read buffer size: 0.0 B
24/11/27 01:44:02,453 INFO [worker-memory-manager-reporter] MemoryManager: Direct memory usage: 3.2 GiB/10.0 GiB, disk buffer size: 284.2 MiB, sort memory size: 0.0 B, read buffer size: 0.0 B
24/11/27 01:44:13,184 INFO [worker-memory-manager-reporter] MemoryManager: Direct memory usage: 5.5 GiB/10.0 GiB, disk buffer size: 1998.8 MiB, sort memory size: 0.0 B, read buffer size: 0.0 B
24/11/27 01:44:23,185 INFO [worker-memory-manager-reporter] MemoryManager: Direct memory usage: 4.8 GiB/10.0 GiB, disk buffer size: 1269.2 MiB, sort memory size: 0.0 B, read buffer size: 0.0 B
24/11/27 01:44:33,418 INFO [worker-memory-manager-reporter] MemoryManager: Direct memory usage: 10.0 GiB/10.0 GiB, disk buffer size: 6.8 GiB, sort memory size: 0.0 B, read buffer size: 0.0 B
24/11/27 01:44:44,008 INFO [worker-memory-manager-reporter] MemoryManager: Direct memory usage: 9.2 GiB/10.0 GiB, disk buffer size: 0.0 B, sort memory size: 0.0 B, read buffer size: 0.0 B
24/11/27 01:44:54,008 INFO [worker-memory-manager-reporter] MemoryManager: Direct memory usage: 9.2 GiB/10.0 GiB, disk buffer size: 0.0 B, sort memory size: 0.0 B, read buffer size: 0.0 B
24/11/27 01:45:04,008 INFO [worker-memory-manager-reporter] MemoryManager: Direct memory usage: 9.2 GiB/10.0 GiB, disk buffer size: 0.0 B, sort memory size: 0.0 B, read buffer size: 0.0 B

@leixm (Contributor, Author) commented Dec 3, 2024

It seems that the diskBufferCount has decreased, but the total off-heap memory still cannot be released.

@leixm (Contributor, Author) commented Dec 3, 2024

The catch Exception here prevents fileWriter.decrementPendingWrites() from being called, but I am not sure whether that indirectly prevents the off-heap memory from being released. I will investigate further.
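To make that concrete, here is a small hypothetical sketch (the writer interface and method names are assumptions, not the real PushDataHandler/FileWriter code). Because OutOfDirectMemoryError is an Error rather than an Exception, a catch (Exception) block does not catch it, so a decrement that only runs on the normal path gets skipped; a finally block keeps the counter balanced either way.

```java
// Hypothetical stand-in for the real writer; names are illustrative only.
interface PendingCountingWriter {
  void write(byte[] body);          // may throw OutOfDirectMemoryError (an Error)
  void decrementPendingWrites();
}

final class WriteLocalDataSketch {
  // Problematic shape: an OutOfDirectMemoryError bypasses the catch block,
  // so the decrement after write() is never reached.
  static void decrementOnlyOnSuccess(PendingCountingWriter writer, byte[] body) {
    try {
      writer.write(body);
      writer.decrementPendingWrites();
    } catch (Exception e) {
      // an Error is not caught here; it propagates and the decrement is skipped
    }
  }

  // Safer shape: the decrement runs on success, on Exception, and on Error.
  static void alwaysDecrement(PendingCountingWriter writer, byte[] body) {
    try {
      writer.write(body);
    } finally {
      writer.decrementPendingWrites();
    }
  }
}
```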

@turboFei (Member) left a comment

LGTM

@FMX (Contributor) commented Dec 4, 2024

The off-heap memory is not released even though the disk buffer counter decreased. This PR does not yet accomplish its purpose; there is more to investigate to find out what happened before merging into main.

@FMX (Contributor) left a comment

More information is needed.

@leixm (Contributor, Author) commented Dec 5, 2024

There are three problems in total:

  1. In PartitionDataWriter#write, when flushBuffer.addComponent(true, data) hits an OOM, data is not released.
  2. When an OOM occurs in PushDataHandler#writeLocalData at fileWriter.write(body), fileWriter.decrementPendingWrites() is not called, which causes some fileWriters to wait for a period of time when close() is called.
  3. For the failed shuffle, commit is never called and the PartitionDataWriter is never closed, so there is no returnBuffer. The PartitionDataWriter should be closed when the shuffle expires (see the sketch below).
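As a rough sketch of the third point, under the assumption that expired shuffles should close their writers, the hypothetical helper below removes a shuffle key's writers and closes each one so its buffers can be returned; the names are illustrative, not the actual Worker/StorageManager code.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;

// Hypothetical cleanup helper: on shuffle expiration, remove and close every
// writer registered under the shuffle key so its buffers can be returned.
public final class ExpiredShuffleCleanup {
  private ExpiredShuffleCleanup() {}

  public static void cleanup(String shuffleKey,
                             Map<String, List<Closeable>> writersByShuffleKey) {
    List<Closeable> writers = writersByShuffleKey.remove(shuffleKey);
    if (writers == null) {
      return;
    }
    for (Closeable writer : writers) {
      try {
        writer.close(); // closing is expected to return the writer's retained buffers
      } catch (IOException e) {
        // best effort: keep closing the remaining writers even if one fails
      }
    }
  }
}
```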

@leixm (Contributor, Author) commented Dec 5, 2024

I ran a test on three worker nodes: two without this PR, on which the memory could not be released, and one with this PR, on which the memory was released and stayed noticeably lower than on the other two nodes.
[screenshot]

@leixm (Contributor, Author) commented Dec 5, 2024

@FMX @turboFei @RexXiong Can you help review again?

@leixm requested a review from FMX on December 6, 2024 02:46
@@ -742,7 +742,12 @@ private[celeborn] class Worker(
     synchronized {
       val expiredApplicationIds = new JHashSet[String]()
       expiredShuffleKeys.asScala.foreach { shuffleKey =>
-        partitionLocationInfo.removeShuffle(shuffleKey)
+        partitionLocationInfo.removeShuffle(shuffleKey).foreach { loc =>
A contributor commented on this diff:

StorageManager will remove expired shuffle keys, which will destroy the writers and return the buffers.

@FMX (Contributor) left a comment

LGTM

@AngersZhuuuu (Contributor) left a comment

LGTM

@pan3793 (Member) commented Dec 6, 2024

@leixm would you mind updating the PR description to summarize the conclusion?

@@ -416,7 +416,13 @@ public void write(ByteBuf data) throws IOException {
     }
 
     data.retain();
-    flushBuffer.addComponent(true, data);
+    try {
+      flushBuffer.addComponent(true, data);
A contributor commented on this diff:

Only the non-memory shuffle file storage type requires a call to releaseDiskBuffer?

@leixm (Contributor, Author) commented Dec 6, 2024

> @leixm would you mind updating the PR description to summarize the conclusion?

Done.

@AngersZhuuuu (Contributor):
ping @FMX

@RexXiong closed this in 372ef79 on Dec 10, 2024
RexXiong pushed a commit that referenced this pull request Dec 10, 2024
### What changes were proposed in this pull request?
When an OutOfDirectMemoryError occurs in flushBuffer.addComponent, there are two problems:

1. decrementPendingWrites is not called, so closing the PartitionDataWriter gets stuck for a period of time during commit.
2. After the OOM occurs, the ByteBuf is not released, causing a memory leak.

### Why are the changes needed?
Fixes disk buffers that cannot be released after an OOM.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes a memory leak issue in some corner cases.

### How was this patch tested?

I ran a test on three worker nodes: two without this PR, on which the memory could not be released, and one with this PR, on which the memory was released and stayed noticeably lower than on the other two nodes.
![image](https://github.com/user-attachments/assets/3fe846ec-6ee8-432a-be7a-a7efb7c102d0)

Closes #2975 from leixm/CELEBORN-1760.

Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 372ef79)
Signed-off-by: Shuang <[email protected]>
@RexXiong (Contributor):
Thanks, merged to main (v0.6.0) and branch-0.5 (v0.5.3).
