Skip to content

Commit

Permalink
feat(cache): add cache entry overhead to cache size
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx committed Jan 30, 2024
1 parent 178a58d commit 1c2cd83
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
3 changes: 3 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ public class Constants {
public static final int CAPACITY_NOT_SET = -1;
public static final int NOOP_NODE_ID = -1;
public static final long NOOP_EPOCH = -1L;

/* StreamRecordBatch object overhead */
public static final int STREAM_RECORD_BATCH_OVERHEAD = 52;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.Constants.STREAM_RECORD_BATCH_OVERHEAD;

public class BlockCache implements DirectByteBufAlloc.OOMHandler {
public static final Integer ASYNC_READ_AHEAD_NOOP_OFFSET = -1;
static final int BLOCK_SIZE = 1024 * 1024;
Expand Down Expand Up @@ -91,6 +93,7 @@ void put0(long streamId, long raAsyncOffset, long raEndOffset, List<StreamRecord
}

int size = records.stream().mapToInt(StreamRecordBatch::size).sum();
size += records.size() * STREAM_RECORD_BATCH_OVERHEAD;

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] put block cache, stream={}, {}-{}, raAsyncOffset: {}, raEndOffset: {}, total bytes: {} ", streamId, startOffset, endOffset, raAsyncOffset, raEndOffset, size);
Expand Down Expand Up @@ -446,6 +449,7 @@ public CacheBlock(List<StreamRecordBatch> records, ReadAheadRecord readAheadReco
this.firstOffset = records.get(0).getBaseOffset();
this.lastOffset = records.get(records.size() - 1).getLastOffset();
this.size = records.stream().mapToInt(StreamRecordBatch::size).sum();
this.size += records.size() * STREAM_RECORD_BATCH_OVERHEAD;
this.readAheadRecord = readAheadRecord;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.Constants.STREAM_RECORD_BATCH_OVERHEAD;
import static com.automq.stream.s3.cache.LogCache.StreamRange.NOOP_OFFSET;
import static com.automq.stream.utils.FutureUtil.suppress;

Expand Down Expand Up @@ -91,7 +92,7 @@ public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCo
public boolean put(StreamRecordBatch recordBatch) {
TimerUtil timerUtil = new TimerUtil();
tryRealFree();
size.addAndGet(recordBatch.size());
size.addAndGet(recordBatch.size() + STREAM_RECORD_BATCH_OVERHEAD);
readLock.lock();
boolean full;
try {
Expand Down Expand Up @@ -339,7 +340,7 @@ public boolean put(StreamRecordBatch recordBatch) {
return cache;
});
int recordSize = recordBatch.size();
return size.addAndGet(recordSize) >= maxSize || map.size() >= maxStreamCount;
return size.addAndGet(recordSize + STREAM_RECORD_BATCH_OVERHEAD) >= maxSize || map.size() >= maxStreamCount;
}

public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffset, int maxBytes) {
Expand Down

0 comments on commit 1c2cd83

Please sign in to comment.