diff --git a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java index 14db9084c..b88ea2160 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java +++ b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java @@ -13,20 +13,15 @@ import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.stats.ByteBufStats; -import com.automq.stream.utils.Threads; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.PooledByteBufAllocator; -import java.util.ArrayList; -import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DirectByteBufAlloc { private static final Logger LOGGER = LoggerFactory.getLogger(DirectByteBufAlloc.class); private static final PooledByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT; - private static final List OOM_HANDLERS = new ArrayList<>(); - private static long lastLogTimestamp = 0L; public static CompositeByteBuf compositeByteBuffer() { return ALLOC.compositeDirectBuffer(Integer.MAX_VALUE); @@ -43,31 +38,13 @@ public static ByteBuf byteBuffer(int initCapacity, String name) { } return ALLOC.directBuffer(initCapacity); } catch (OutOfMemoryError e) { - for (; ; ) { - int freedBytes = 0; - for (OOMHandler handler : OOM_HANDLERS) { - freedBytes += handler.handle(initCapacity); - try { - ByteBuf buf = ALLOC.directBuffer(initCapacity); - LOGGER.warn("OOM recovered, freed {} bytes", freedBytes); - return buf; - } catch (OutOfMemoryError e2) { - // ignore - } - } - if (System.currentTimeMillis() - lastLogTimestamp >= 1000L) { - LOGGER.error("try recover from OOM fail, freedBytes={}, retry later", freedBytes); - lastLogTimestamp = System.currentTimeMillis(); - } - Threads.sleep(1L); - } + LOGGER.error("alloc direct buffer OOM", e); + System.err.println("alloc direct buffer OOM"); + Runtime.getRuntime().halt(1); + throw e; } } - public static void registerOOMHandlers(OOMHandler handler) { - OOM_HANDLERS.add(handler); - } - public interface OOMHandler { /** * Try handle OOM exception. diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index c577064b4..27c27cc4d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -125,7 +125,6 @@ public S3Storage(Config config, WriteAheadLog deltaWAL, StreamManager streamMana this.deltaWAL = deltaWAL; this.blockCache = blockCache; this.deltaWALCache = new LogCache(config.walCacheSize(), config.walUploadThreshold(), config.maxStreamNumPerStreamSetObject()); - DirectByteBufAlloc.registerOOMHandlers(new LogCacheEvictOOMHandler()); this.streamManager = streamManager; this.objectManager = objectManager; this.s3Operator = s3Operator; @@ -687,7 +686,7 @@ private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) { System.err.println("Unexpected exception when commit stream set object"); //noinspection CallToPrintStackTrace ex.printStackTrace(); - System.exit(1); + Runtime.getRuntime().halt(1); return null; }); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java index 6476d45f9..864addc9a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java @@ -51,7 +51,6 @@ public class BlockCache implements DirectByteBufAlloc.OOMHandler { public BlockCache(long maxSize) { this.maxSize = maxSize; - DirectByteBufAlloc.registerOOMHandlers(this); S3StreamMetricsManager.registerBlockCacheSizeSupplier(size::get); }