feat(kafka_issues601): optimize memory usage when upload too much blocks (#857)

Signed-off-by: Robin Han <[email protected]>
superhx authored Dec 27, 2023
1 parent eb69853 commit 6d5475d
Showing 1 changed file with 20 additions and 28 deletions.
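What changed, in brief: write() used to collect a close() future from every freshly built DataBlock and only invoked tryUploadPart() once all of those futures completed, so under a heavy write rate blocks could pile up in memory before any part was flushed. This commit instead keeps a running waitingUploadBlocksSize counter, incremented in write() and decremented in tryUploadPart() as blocks are drained, and attempts the upload synchronously the moment the pending bytes reach partSizeThreshold. Nothing is lost by skipping the futures, since DataBlock.close() always returned an already-completed future (see its removal in the last hunk below).

Below is a minimal, self-contained sketch of that accounting pattern, not the real class: PartAccumulator, byte[] blocks, and uploadPart() are stand-ins for DefaultObjectWriter, DataBlock, and writer.write().

import java.util.ArrayList;
import java.util.List;

class PartAccumulator {
    private final int partSizeThreshold;
    private final List<byte[]> waitingUploadBlocks = new ArrayList<>();
    private int waitingUploadBlocksSize;

    PartAccumulator(int partSizeThreshold) {
        this.partSizeThreshold = partSizeThreshold;
    }

    public void write(byte[] block) {
        waitingUploadBlocks.add(block);
        waitingUploadBlocksSize += block.length;
        // Flush eagerly once a full part is buffered, instead of
        // waiting for per-block completion futures to resolve.
        if (waitingUploadBlocksSize >= partSizeThreshold) {
            tryUploadPart();
        }
    }

    private synchronized void tryUploadPart() {
        for (; ; ) {
            int size = 0;
            int count = 0;
            for (byte[] block : waitingUploadBlocks) {
                count++;
                size += block.length;
                if (size >= partSizeThreshold) {
                    break;
                }
            }
            if (size < partSizeThreshold) {
                return; // keep buffering: not enough bytes for a full part yet
            }
            List<byte[]> part = new ArrayList<>(waitingUploadBlocks.subList(0, count));
            waitingUploadBlocks.subList(0, count).clear();
            waitingUploadBlocksSize -= size; // keep the counter in sync with the list
            uploadPart(part); // stand-in for writer.write(partBuf)
        }
    }

    private void uploadPart(List<byte[]> part) {
        System.out.println("uploading a part made of " + part.size() + " blocks");
    }
}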
s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
@@ -65,6 +65,7 @@ class DefaultObjectWriter implements ObjectWriter {
     private final int blockSizeThreshold;
     private final int partSizeThreshold;
     private final List<DataBlock> waitingUploadBlocks;
+    private int waitingUploadBlocksSize;
     private final List<DataBlock> completedBlocks;
     private IndexBlock indexBlock;
     private final Writer writer;
@@ -92,14 +93,14 @@ public DefaultObjectWriter(long objectId, S3Operator s3Operator, int blockSizeTh
 
     public void write(long streamId, List<StreamRecordBatch> records) {
         List<List<StreamRecordBatch>> blocks = groupByBlock(records);
-        List<CompletableFuture<Void>> closeCf = new ArrayList<>(blocks.size());
         blocks.forEach(blockRecords -> {
             DataBlock block = new DataBlock(streamId, blockRecords);
             waitingUploadBlocks.add(block);
-            closeCf.add(block.close());
+            waitingUploadBlocksSize += block.size();
         });
-        CompletableFuture.allOf(closeCf.toArray(new CompletableFuture[0])).whenComplete((nil, ex) -> tryUploadPart());
-
+        if (waitingUploadBlocksSize >= partSizeThreshold) {
+            tryUploadPart();
+        }
     }
 
     private List<List<StreamRecordBatch>> groupByBlock(List<StreamRecordBatch> records) {
@@ -122,13 +123,10 @@ private List<List<StreamRecordBatch>> groupByBlock(List<StreamRecordBatch> recor
 
     private synchronized void tryUploadPart() {
         for (; ; ) {
-            List<DataBlock> uploadBlocks = new ArrayList<>(32);
+            List<DataBlock> uploadBlocks = new ArrayList<>(waitingUploadBlocks.size());
             boolean partFull = false;
             int size = 0;
             for (DataBlock block : waitingUploadBlocks) {
-                if (!block.close().isDone()) {
-                    break;
-                }
                 uploadBlocks.add(block);
                 size += block.size();
                 if (size >= partSizeThreshold) {
@@ -139,6 +137,7 @@ private synchronized void tryUploadPart() {
             if (partFull) {
                 CompositeByteBuf partBuf = DirectByteBufAlloc.compositeByteBuffer();
                 for (DataBlock block : uploadBlocks) {
+                    waitingUploadBlocksSize -= block.size();
                     partBuf.addComponent(true, block.buffer());
                 }
                 writer.write(partBuf);
@@ -151,22 +150,19 @@
     }
 
     public CompletableFuture<Void> close() {
-        CompletableFuture<Void> waitBlocksCloseCf = CompletableFuture.allOf(waitingUploadBlocks.stream().map(DataBlock::close).toArray(CompletableFuture[]::new));
-        return waitBlocksCloseCf.thenCompose(nil -> {
-            CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
-            for (DataBlock block : waitingUploadBlocks) {
-                buf.addComponent(true, block.buffer());
-                completedBlocks.add(block);
-            }
-            waitingUploadBlocks.clear();
-            indexBlock = new IndexBlock();
-            buf.addComponent(true, indexBlock.buffer());
-            Footer footer = new Footer(indexBlock.position(), indexBlock.size());
-            buf.addComponent(true, footer.buffer());
-            writer.write(buf.duplicate());
-            size = indexBlock.position() + indexBlock.size() + footer.size();
-            return writer.close();
-        });
+        CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
+        for (DataBlock block : waitingUploadBlocks) {
+            buf.addComponent(true, block.buffer());
+            completedBlocks.add(block);
+        }
+        waitingUploadBlocks.clear();
+        indexBlock = new IndexBlock();
+        buf.addComponent(true, indexBlock.buffer());
+        Footer footer = new Footer(indexBlock.position(), indexBlock.size());
+        buf.addComponent(true, footer.buffer());
+        writer.write(buf.duplicate());
+        size = indexBlock.position() + indexBlock.size() + footer.size();
+        return writer.close();
     }
 
     public List<ObjectStreamRange> getStreamRanges() {
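A note on the close() change above: every DataBlock is fully materialized by the time write() returns, and the DataBlock.close() removed in the final hunk below was a no-op returning an already-completed future, so the allOf(...).thenCompose(...) stage was pure overhead. close() can therefore assemble the final object synchronously: leftover data blocks, then the index block, then the footer, zero-copied into one CompositeByteBuf. A tiny standalone illustration of that assembly using plain Netty, under the assumption that DirectByteBufAlloc.compositeByteBuffer() behaves like a pooled counterpart of Unpooled.compositeBuffer():

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class ComposeDemo {
    public static void main(String[] args) {
        ByteBuf dataBlocks = Unpooled.wrappedBuffer("data".getBytes());
        ByteBuf indexBlock = Unpooled.wrappedBuffer("index".getBytes());
        ByteBuf footer = Unpooled.wrappedBuffer("footer".getBytes());

        // Assumption: stands in for DirectByteBufAlloc.compositeByteBuffer().
        CompositeByteBuf object = Unpooled.compositeBuffer();
        // addComponent(true, ...) advances the writer index, so the
        // components read back as one contiguous buffer without copying.
        object.addComponent(true, dataBlocks);
        object.addComponent(true, indexBlock);
        object.addComponent(true, footer);

        System.out.println(object.readableBytes()); // 4 + 5 + 6 = 15
        object.release(); // releasing the composite releases its components
    }
}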
@@ -268,10 +264,6 @@ public DataBlock(long streamId, List<StreamRecordBatch> records) {
             this.streamRange = new ObjectStreamRange(streamId, records.get(0).getEpoch(), records.get(0).getBaseOffset(), records.get(records.size() - 1).getLastOffset(), size);
         }
 
-        public CompletableFuture<Void> close() {
-            return CompletableFuture.completedFuture(null);
-        }
-
         public int size() {
             return size;
         }