Commit 1b2e195

fix(kafka_issues562): fix duplicated release (#838)
Signed-off-by: Robin Han <[email protected]>
superhx authored Dec 16, 2023
1 parent 69b9659 commit 1b2e195
Showing 4 changed files with 16 additions and 8 deletions.
pom.xml: 2 changes (1 addition, 1 deletion)
@@ -53,7 +53,7 @@
         <guava.version>32.1.3-jre</guava.version>
         <slf4j.version>2.0.9</slf4j.version>
         <snakeyaml.version>2.2</snakeyaml.version>
-        <s3stream.version>0.6.10-SNAPSHOT</s3stream.version>
+        <s3stream.version>0.7.0-SNAPSHOT</s3stream.version>
 
         <!-- Flat buffers related -->
         <flatbuffers.version>23.5.26</flatbuffers.version>
s3stream/pom.xml: 2 changes (1 addition, 1 deletion)
@@ -22,7 +22,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.automq.elasticstream</groupId>
     <artifactId>s3stream</artifactId>
-    <version>0.6.10-SNAPSHOT</version>
+    <version>0.7.0-SNAPSHOT</version>
     <properties>
         <mockito-core.version>5.5.0</mockito-core.version>
         <junit-jupiter.version>5.10.0</junit-jupiter.version>
s3stream/src/main/java/com/automq/stream/s3/S3Storage.java: 13 changes (9 additions, 4 deletions)
@@ -366,8 +366,8 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset,
         }
         Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.warn("read from block cache timeout, stream={}, {}, maxBytes: {}", streamId, startOffset, maxBytes), 1, TimeUnit.MINUTES);
         long finalEndOffset = endOffset;
-        return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(readDataBlock -> {
-            List<StreamRecordBatch> rst = new ArrayList<>(readDataBlock.getRecords());
+        return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(blockCacheRst -> {
+            List<StreamRecordBatch> rst = new ArrayList<>(blockCacheRst.getRecords());
             int remainingBytesSize = maxBytes - rst.stream().mapToInt(StreamRecordBatch::size).sum();
             int readIndex = -1;
             for (int i = 0; i < logCacheRecords.size() && remainingBytesSize > 0; i++) {
@@ -376,12 +376,17 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset,
                 rst.add(record);
                 remainingBytesSize -= record.size();
             }
+            try {
+                continuousCheck(rst);
+            } catch (IllegalArgumentException e) {
+                blockCacheRst.getRecords().forEach(StreamRecordBatch::release);
+                throw e;
+            }
             if (readIndex < logCacheRecords.size()) {
                 // release unnecessary record
                 logCacheRecords.subList(readIndex + 1, logCacheRecords.size()).forEach(StreamRecordBatch::release);
             }
-            continuousCheck(rst);
-            return new ReadDataBlock(rst, readDataBlock.getCacheAccessType());
+            return new ReadDataBlock(rst, blockCacheRst.getCacheAccessType());
         }).whenComplete((rst, ex) -> {
             timeout.cancel();
             if (ex != null) {
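The reordering in this hunk is the substance of the fix: continuousCheck now runs before the surplus log-cache records are trimmed, and on failure the catch block releases only the records the block cache returned, leaving the log-cache records to the error-path cleanup in whenComplete (truncated in this diff). The lambda parameter rename from readDataBlock to blockCacheRst is cosmetic, making the ownership of those records easier to follow. Below is a minimal, self-contained sketch of the exactly-once release discipline the change enforces; RefCountedRecord, merge, and the continuous flag are illustrative stand-ins, not AutoMQ's actual StreamRecordBatch API.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a ref-counted record such as StreamRecordBatch:
// release() must be called exactly once per owner; a second call is the
// "duplicated release" this commit guards against.
final class RefCountedRecord {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    void release() {
        if (refCnt.decrementAndGet() < 0) {
            throw new IllegalStateException("duplicated release");
        }
    }
}

final class ReadMergeSketch {
    // Mirrors the fixed read0 ordering: validate the merged result first,
    // and on failure release only the records this method owns (the block
    // cache's share), leaving the caller's records for the caller's own
    // error-path cleanup, so no record is ever released twice.
    static List<RefCountedRecord> merge(List<RefCountedRecord> fromBlockCache,
        List<RefCountedRecord> fromLogCache, boolean continuous) {
        List<RefCountedRecord> rst = new ArrayList<>(fromBlockCache);
        rst.addAll(fromLogCache);
        if (!continuous) {
            fromBlockCache.forEach(RefCountedRecord::release);
            throw new IllegalArgumentException("records are not continuous");
        }
        return rst;
    }
}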
s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java: 7 changes (5 additions, 2 deletions)
@@ -41,6 +41,7 @@
 import java.util.stream.Collectors;
 
 import static com.automq.stream.s3.cache.LogCache.StreamRange.NOOP_OFFSET;
+import static com.automq.stream.utils.FutureUtil.suppress;
 
 public class LogCache {
     private static final Logger LOGGER = LoggerFactory.getLogger(LogCache.class);
@@ -361,8 +362,10 @@ public long size() {
     }
 
     public void free() {
-        map.forEach((streamId, records) -> records.free());
-        map.clear();
+        suppress(() -> {
+            map.forEach((streamId, records) -> records.free());
+            map.clear();
+        }, LOGGER);
     }
 
     public long createdTimestamp() {
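The diff shows only the call site of suppress; a plausible shape for FutureUtil.suppress, inferred from that call site (a Runnable plus a Logger), is sketched below. This is an assumption about the helper, not a copy of AutoMQ's implementation.

import org.slf4j.Logger;

public final class FutureUtilSketch {
    // Assumed behavior, inferred from the call site above: run the task and
    // log anything it throws instead of propagating it, so LogCache.free()
    // never fails a caller that is already tearing the cache down.
    public static void suppress(Runnable run, Logger logger) {
        try {
            run.run();
        } catch (Throwable e) {
            logger.error("suppress exception", e);
        }
    }
}

The design trade-off: an exception thrown while freeing one stream's records is logged rather than surfaced, so teardown cannot crash the caller, at the cost of potentially skipping map.clear() for the remaining entries in that rare failure case.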
