diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java index fd82611a0..49144a800 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java @@ -58,10 +58,17 @@ public synchronized int read(ByteBuf dst, long position, int length) throws IOEx // If we don't know the capacity now, we can't cache. return channel.read(dst, position, length); } + long start = position; length = Math.min(length, dst.writableBytes()); long end = position + length; + ByteBuf cache = getCache(); + if (length > cache.capacity()) { + // If the length is larger than the cache capacity, we can't cache. + return channel.read(dst, position, length); + } + boolean fallWithinCache = cachePosition >= 0 && cachePosition <= start && end <= cachePosition + cache.readableBytes(); if (!fallWithinCache) { cache.clear(); @@ -70,6 +77,7 @@ public synchronized int read(ByteBuf dst, long position, int length) throws IOEx int cacheLength = (int) Math.min(cache.writableBytes(), channel.capacity() - cachePosition); channel.read(cache, cachePosition, cacheLength); } + // Now the cache is ready. int relativePosition = (int) (start - cachePosition); dst.writeBytes(cache, relativePosition, length);