diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java index 49843f8d1..edadaae48 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java @@ -39,6 +39,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.automq.stream.s3.model.StreamRecordBatch.OBJECT_OVERHEAD; + public class BlockCache implements DirectByteBufAlloc.OOMHandler { public static final Integer ASYNC_READ_AHEAD_NOOP_OFFSET = -1; static final int BLOCK_SIZE = 1024 * 1024; @@ -91,6 +93,7 @@ void put0(long streamId, long raAsyncOffset, long raEndOffset, List records, ReadAheadRecord readAheadReco this.firstOffset = records.get(0).getBaseOffset(); this.lastOffset = records.get(records.size() - 1).getLastOffset(); this.size = records.stream().mapToInt(StreamRecordBatch::size).sum(); + this.size += records.size() * OBJECT_OVERHEAD; this.readAheadRecord = readAheadRecord; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java index 096f67eaf..0437235db 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java @@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory; import static com.automq.stream.s3.cache.LogCache.StreamRange.NOOP_OFFSET; +import static com.automq.stream.s3.model.StreamRecordBatch.OBJECT_OVERHEAD; import static com.automq.stream.utils.FutureUtil.suppress; public class LogCache { @@ -91,7 +92,7 @@ public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCo public boolean put(StreamRecordBatch recordBatch) { TimerUtil timerUtil = new TimerUtil(); tryRealFree(); - size.addAndGet(recordBatch.size()); + size.addAndGet(recordBatch.size() + OBJECT_OVERHEAD); readLock.lock(); boolean full; try { @@ -339,7 +340,7 @@ public boolean put(StreamRecordBatch recordBatch) { return cache; }); int recordSize = recordBatch.size(); - return size.addAndGet(recordSize) >= maxSize || map.size() >= maxStreamCount; + return size.addAndGet(recordSize + OBJECT_OVERHEAD) >= maxSize || map.size() >= maxStreamCount; } public List get(long streamId, long startOffset, long endOffset, int maxBytes) { diff --git a/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java b/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java index c2ed3bb39..418609fe6 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java +++ b/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java @@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf; public class StreamRecordBatch implements Comparable, ComparableItem { + public static final int OBJECT_OVERHEAD = 52; private final long streamId; private final long epoch; private final long baseOffset;