Skip to content

Commit

Permalink
feat(s3stream): halt when oom (#942)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Feb 23, 2024
1 parent 0123f90 commit 10241ef
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,15 @@

import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.stats.ByteBufStats;
import com.automq.stream.utils.Threads;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectByteBufAlloc {
private static final Logger LOGGER = LoggerFactory.getLogger(DirectByteBufAlloc.class);
private static final PooledByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
private static final List<OOMHandler> OOM_HANDLERS = new ArrayList<>();
private static long lastLogTimestamp = 0L;

public static CompositeByteBuf compositeByteBuffer() {
return ALLOC.compositeDirectBuffer(Integer.MAX_VALUE);
Expand All @@ -43,31 +38,13 @@ public static ByteBuf byteBuffer(int initCapacity, String name) {
}
return ALLOC.directBuffer(initCapacity);
} catch (OutOfMemoryError e) {
for (; ; ) {
int freedBytes = 0;
for (OOMHandler handler : OOM_HANDLERS) {
freedBytes += handler.handle(initCapacity);
try {
ByteBuf buf = ALLOC.directBuffer(initCapacity);
LOGGER.warn("OOM recovered, freed {} bytes", freedBytes);
return buf;
} catch (OutOfMemoryError e2) {
// ignore
}
}
if (System.currentTimeMillis() - lastLogTimestamp >= 1000L) {
LOGGER.error("try recover from OOM fail, freedBytes={}, retry later", freedBytes);
lastLogTimestamp = System.currentTimeMillis();
}
Threads.sleep(1L);
}
LOGGER.error("alloc direct buffer OOM", e);
System.err.println("alloc direct buffer OOM");
Runtime.getRuntime().halt(1);
throw e;
}
}

public static void registerOOMHandlers(OOMHandler handler) {
OOM_HANDLERS.add(handler);
}

public interface OOMHandler {
/**
* Try handle OOM exception.
Expand Down
3 changes: 1 addition & 2 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public S3Storage(Config config, WriteAheadLog deltaWAL, StreamManager streamMana
this.deltaWAL = deltaWAL;
this.blockCache = blockCache;
this.deltaWALCache = new LogCache(config.walCacheSize(), config.walUploadThreshold(), config.maxStreamNumPerStreamSetObject());
DirectByteBufAlloc.registerOOMHandlers(new LogCacheEvictOOMHandler());
this.streamManager = streamManager;
this.objectManager = objectManager;
this.s3Operator = s3Operator;
Expand Down Expand Up @@ -687,7 +686,7 @@ private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) {
System.err.println("Unexpected exception when commit stream set object");
//noinspection CallToPrintStackTrace
ex.printStackTrace();
System.exit(1);
Runtime.getRuntime().halt(1);
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class BlockCache implements DirectByteBufAlloc.OOMHandler {

public BlockCache(long maxSize) {
this.maxSize = maxSize;
DirectByteBufAlloc.registerOOMHandlers(this);
S3StreamMetricsManager.registerBlockCacheSizeSupplier(size::get);
}

Expand Down

0 comments on commit 10241ef

Please sign in to comment.