diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionAnalyzer.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionAnalyzer.java index 6bb071d86..94ce4b9c7 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionAnalyzer.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionAnalyzer.java @@ -191,6 +191,7 @@ List generatePlanWithCacheLimit(List com List compactedObjects = new ArrayList<>(); CompactedObjectBuilder compactedStreamSetObjectBuilder = null; long totalSize = 0L; + int compactionOrder = 0; for (int i = 0; i < compactedObjectBuilders.size(); ) { CompactedObjectBuilder compactedObjectBuilder = compactedObjectBuilders.get(i); if (totalSize + compactedObjectBuilder.totalBlockSize() > compactionCacheSize) { @@ -210,7 +211,7 @@ List generatePlanWithCacheLimit(List com compactedStreamSetObjectBuilder = addOrMergeCompactedObject(builder, compactedObjects, compactedStreamSetObjectBuilder); } } - compactionPlans.add(generateCompactionPlan(compactedObjects, compactedStreamSetObjectBuilder)); + compactionPlans.add(generateCompactionPlan(compactionOrder++, compactedObjects, compactedStreamSetObjectBuilder)); compactedObjects.clear(); compactedStreamSetObjectBuilder = null; totalSize = 0; @@ -223,7 +224,7 @@ List generatePlanWithCacheLimit(List com } if (!compactedObjects.isEmpty() || compactedStreamSetObjectBuilder != null) { - compactionPlans.add(generateCompactionPlan(compactedObjects, compactedStreamSetObjectBuilder)); + compactionPlans.add(generateCompactionPlan(compactionOrder, compactedObjects, compactedStreamSetObjectBuilder)); } return compactionPlans; } @@ -248,7 +249,7 @@ private boolean shouldSplitObject(CompactedObjectBuilder compactedObjectBuilder) return true; } - private CompactionPlan generateCompactionPlan(List compactedObjects, + private CompactionPlan generateCompactionPlan(int order, List compactedObjects, CompactedObjectBuilder compactedStreamSetObject) { if (compactedStreamSetObject != null) { compactedObjects.add(compactedStreamSetObject.build()); @@ -263,7 +264,7 @@ private CompactionPlan generateCompactionPlan(List compactedObj dataBlocks.sort(StreamDataBlock.BLOCK_POSITION_COMPARATOR); } - return new CompactionPlan(new ArrayList<>(compactedObjects), streamDataBlockMap); + return new CompactionPlan(order, new ArrayList<>(compactedObjects), streamDataBlockMap); } /** diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index 0546399e7..8547b99e8 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -694,7 +694,7 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List uploader.forceUploadStreamSetObject()) + .thenCompose(v -> uploader.forceUploadStreamSetObject()) .exceptionally(ex -> { uploader.release().thenAccept(v -> { for (CompactedObject compactedObject : compactionPlan.compactedObjects()) { @@ -713,6 +713,15 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List compactedObjects = compactionPlan.compactedObjects(); + for (CompactedObject compactedObject : compactedObjects) { + for (StreamDataBlock block : compactedObject.streamDataBlocks()) { + if (block.getDataCf().join().refCnt() > 0) { + logger.error("Block {} is not released after compaction, compact type: {}", block, compactedObject.type()); + } + } + } } List objectStreamRanges = CompactionUtils.buildObjectStreamRangeFromGroup( CompactionUtils.groupStreamDataBlocks(sortedStreamDataBlocks, new GroupByOffsetPredicate())); diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionPlan.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionPlan.java index a64f5766d..268a8412b 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionPlan.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionPlan.java @@ -17,15 +17,21 @@ import java.util.Map; public class CompactionPlan { + private final int order; private final List compactedObjects; private final Map> streamDataBlocksMap; - public CompactionPlan(List compactedObjects, + public CompactionPlan(int order, List compactedObjects, Map> streamDataBlocksMap) { + this.order = order; this.compactedObjects = compactedObjects; this.streamDataBlocksMap = streamDataBlocksMap; } + public int order() { + return order; + } + public List compactedObjects() { return compactedObjects; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index 89d281124..b36725f9b 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -374,8 +374,8 @@ private void write0(String path, ByteBuf data, CompletableFuture cf) { S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, objectSize); S3OperationStats.getInstance().putObjectStats(objectSize, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); - cf.complete(null); data.release(); + cf.complete(null); }).exceptionally(ex -> { S3OperationStats.getInstance().putObjectStats(objectSize, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (isUnrecoverable(ex)) { @@ -482,7 +482,6 @@ public CompletableFuture uploadPart(String path, String uploadId, } else { uploadPart0(path, uploadId, partNumber, data, cf); } - cf.whenComplete((rst, ex) -> data.release()); return refCf; } @@ -497,12 +496,14 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p uploadPartCf.thenAccept(uploadPartResponse -> { S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, size); S3OperationStats.getInstance().uploadPartStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + part.release(); CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build(); cf.complete(completedPart); }).exceptionally(ex -> { S3OperationStats.getInstance().uploadPartStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (isUnrecoverable(ex)) { LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex); + part.release(); cf.completeExceptionally(ex); } else { LOGGER.warn("UploadPart for object {}-{} fail, retry later", path, partNumber, ex); diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java index 18bf258f7..8eb579f48 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java @@ -50,12 +50,11 @@ public CompletableFuture write(ByteBuf part) { if (multiPartWriter != null) { return multiPartWriter.write(part); } else { + objectWriter.write(part); if (objectWriter.isFull()) { newMultiPartWriter(); - return multiPartWriter.write(part); - } else { - return objectWriter.write(part); } + return objectWriter.cf; } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/ProxyWriterTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/ProxyWriterTest.java index 7e392c7fe..62682b910 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/operator/ProxyWriterTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/operator/ProxyWriterTest.java @@ -68,7 +68,8 @@ public void testWrite_dataLargerThanMaxUploadSize() { assertTrue(writer.hasBatchingPart()); assertNull(writer.multiPartWriter); writer.write(TestUtils.random(17 * 1024 * 1024)); - assertTrue(writer.hasBatchingPart()); + assertNotNull(writer.multiPartWriter); + assertFalse(writer.hasBatchingPart()); writer.write(TestUtils.random(17 * 1024 * 1024)); assertNotNull(writer.multiPartWriter); assertFalse(writer.hasBatchingPart());