diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java index cff9534132..a98d16e0ca 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java @@ -107,15 +107,15 @@ CompletableFuture read(long startOffset, long endOffset, int maxB Throwable cause = FutureUtil.cause(ex); if (cause != null) { readContext.records.forEach(StreamRecordBatch::release); - if (leftRetries > 0 && (cause instanceof ObjectNotExistException || cause instanceof NoSuchKeyException || cause instanceof BlockNotContinuousException)) { + for (Block block : readContext.blocks) { + block.release(); + } + if (leftRetries > 0 && isRecoverable(cause)) { // The cached blocks maybe invalid after object compaction, so we need to reset the blocks and retry read resetBlocks(); // use async to prevent recursive call cause stack overflow eventLoop.execute(() -> FutureUtil.propagate(read(startOffset, endOffset, maxBytes, leftRetries - 1), retCf)); } else { - for (Block block : readContext.blocks) { - block.release(); - } retCf.completeExceptionally(cause); } } else { @@ -163,6 +163,7 @@ void read0(ReadContext ctx, final long startOffset, final long endOffset, final // 3. extract records from blocks loadBlocksCf.thenAccept(nil -> { + ctx.blocks.addAll(blocks); Optional failedBlock = blocks.stream().filter(block -> block.exception != null).findAny(); if (failedBlock.isPresent()) { ctx.cf.completeExceptionally(failedBlock.get().exception); @@ -173,7 +174,6 @@ void read0(ReadContext ctx, final long startOffset, final long endOffset, final streamId, startOffset, endOffset, maxBytes))); return; } - ctx.blocks.addAll(blocks); int remainingSize = maxBytes; long nextStartOffset = startOffset; long nextEndOffset; @@ -440,6 +440,10 @@ private boolean putBlock(Block block) { return true; } + private static boolean isRecoverable(Throwable cause) { + return cause instanceof ObjectNotExistException || cause instanceof NoSuchKeyException || cause instanceof BlockNotContinuousException; + } + static class GetBlocksContext { final List blocks = new ArrayList<>(); final CompletableFuture> cf = new CompletableFuture<>(); @@ -496,7 +500,6 @@ public Block newBlockWithData(boolean readahead) { public void release() { if (released) { - LOGGER.error("[BUG] duplicated release", new IllegalStateException()); return; } released = true; @@ -557,7 +560,8 @@ public void tryReadahead(boolean cacheMiss) { // For get block indexes and load data block are sync success, // the whenComplete will invoke first before assign CompletableFuture to inflightReadaheadCf inflightReadaheadCf.whenComplete((nil, ex) -> { - if (ex != null) { + Throwable cause = FutureUtil.cause(ex); + if (!isRecoverable(cause)) { LOGGER.error("Readahead failed", ex); } inflightReadaheadCf = null;