Skip to content

Commit

Permalink
fix(s3stream): fix delayed release for StreamDataBlock (#945)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Feb 26, 2024
1 parent 23bf90c commit 684e272
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ List<CompactionPlan> generatePlanWithCacheLimit(List<CompactedObjectBuilder> com
List<CompactedObject> compactedObjects = new ArrayList<>();
CompactedObjectBuilder compactedStreamSetObjectBuilder = null;
long totalSize = 0L;
int compactionOrder = 0;
for (int i = 0; i < compactedObjectBuilders.size(); ) {
CompactedObjectBuilder compactedObjectBuilder = compactedObjectBuilders.get(i);
if (totalSize + compactedObjectBuilder.totalBlockSize() > compactionCacheSize) {
Expand All @@ -210,7 +211,7 @@ List<CompactionPlan> generatePlanWithCacheLimit(List<CompactedObjectBuilder> com
compactedStreamSetObjectBuilder = addOrMergeCompactedObject(builder, compactedObjects, compactedStreamSetObjectBuilder);
}
}
compactionPlans.add(generateCompactionPlan(compactedObjects, compactedStreamSetObjectBuilder));
compactionPlans.add(generateCompactionPlan(compactionOrder++, compactedObjects, compactedStreamSetObjectBuilder));
compactedObjects.clear();
compactedStreamSetObjectBuilder = null;
totalSize = 0;
Expand All @@ -223,7 +224,7 @@ List<CompactionPlan> generatePlanWithCacheLimit(List<CompactedObjectBuilder> com

}
if (!compactedObjects.isEmpty() || compactedStreamSetObjectBuilder != null) {
compactionPlans.add(generateCompactionPlan(compactedObjects, compactedStreamSetObjectBuilder));
compactionPlans.add(generateCompactionPlan(compactionOrder, compactedObjects, compactedStreamSetObjectBuilder));
}
return compactionPlans;
}
Expand All @@ -248,7 +249,7 @@ private boolean shouldSplitObject(CompactedObjectBuilder compactedObjectBuilder)
return true;
}

private CompactionPlan generateCompactionPlan(List<CompactedObject> compactedObjects,
private CompactionPlan generateCompactionPlan(int order, List<CompactedObject> compactedObjects,
CompactedObjectBuilder compactedStreamSetObject) {
if (compactedStreamSetObject != null) {
compactedObjects.add(compactedStreamSetObject.build());
Expand All @@ -263,7 +264,7 @@ private CompactionPlan generateCompactionPlan(List<CompactedObject> compactedObj
dataBlocks.sort(StreamDataBlock.BLOCK_POSITION_COMPARATOR);
}

return new CompactionPlan(new ArrayList<>(compactedObjects), streamDataBlockMap);
return new CompactionPlan(order, new ArrayList<>(compactedObjects), streamDataBlockMap);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<Compactio
}
// wait for all stream objects and stream set object part to be uploaded
compactionCf = CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]))
.thenAccept(v -> uploader.forceUploadStreamSetObject())
.thenCompose(v -> uploader.forceUploadStreamSetObject())
.exceptionally(ex -> {
uploader.release().thenAccept(v -> {
for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
Expand All @@ -713,6 +713,15 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<Compactio
compactionCf = null;

streamObjectCfList.stream().map(CompletableFuture::join).forEach(request::addStreamObject);

List<CompactedObject> compactedObjects = compactionPlan.compactedObjects();
for (CompactedObject compactedObject : compactedObjects) {
for (StreamDataBlock block : compactedObject.streamDataBlocks()) {
if (block.getDataCf().join().refCnt() > 0) {
logger.error("Block {} is not released after compaction, compact type: {}", block, compactedObject.type());
}
}
}
}
List<ObjectStreamRange> objectStreamRanges = CompactionUtils.buildObjectStreamRangeFromGroup(
CompactionUtils.groupStreamDataBlocks(sortedStreamDataBlocks, new GroupByOffsetPredicate()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@
import java.util.Map;

public class CompactionPlan {
private final int order;
private final List<CompactedObject> compactedObjects;
private final Map<Long/* Object id*/, List<StreamDataBlock>> streamDataBlocksMap;

public CompactionPlan(List<CompactedObject> compactedObjects,
public CompactionPlan(int order, List<CompactedObject> compactedObjects,
Map<Long, List<StreamDataBlock>> streamDataBlocksMap) {
this.order = order;
this.compactedObjects = compactedObjects;
this.streamDataBlocksMap = streamDataBlocksMap;
}

public int order() {
return order;
}

public List<CompactedObject> compactedObjects() {
return compactedObjects;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,8 @@ private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, objectSize);
S3OperationStats.getInstance().putObjectStats(objectSize, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(null);
data.release();
cf.complete(null);
}).exceptionally(ex -> {
S3OperationStats.getInstance().putObjectStats(objectSize, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
Expand Down Expand Up @@ -482,7 +482,6 @@ public CompletableFuture<CompletedPart> uploadPart(String path, String uploadId,
} else {
uploadPart0(path, uploadId, partNumber, data, cf);
}
cf.whenComplete((rst, ex) -> data.release());
return refCf;
}

Expand All @@ -497,12 +496,14 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
uploadPartCf.thenAccept(uploadPartResponse -> {
S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, size);
S3OperationStats.getInstance().uploadPartStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
part.release();
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
S3OperationStats.getInstance().uploadPartStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex);
part.release();
cf.completeExceptionally(ex);
} else {
LOGGER.warn("UploadPart for object {}-{} fail, retry later", path, partNumber, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ public CompletableFuture<Void> write(ByteBuf part) {
if (multiPartWriter != null) {
return multiPartWriter.write(part);
} else {
objectWriter.write(part);
if (objectWriter.isFull()) {
newMultiPartWriter();
return multiPartWriter.write(part);
} else {
return objectWriter.write(part);
}
return objectWriter.cf;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public void testWrite_dataLargerThanMaxUploadSize() {
assertTrue(writer.hasBatchingPart());
assertNull(writer.multiPartWriter);
writer.write(TestUtils.random(17 * 1024 * 1024));
assertTrue(writer.hasBatchingPart());
assertNotNull(writer.multiPartWriter);
assertFalse(writer.hasBatchingPart());
writer.write(TestUtils.random(17 * 1024 * 1024));
assertNotNull(writer.multiPartWriter);
assertFalse(writer.hasBatchingPart());
Expand Down

0 comments on commit 684e272

Please sign in to comment.