From 6d5475d231284f7cb90eee26497b2766de441ce6 Mon Sep 17 00:00:00 2001 From: "Xu Han@AutoMQ" Date: Wed, 27 Dec 2023 09:48:32 +0800 Subject: [PATCH] feat(kafka_issues601): optimize memory usage when upload too much blocks (#857) Signed-off-by: Robin Han --- .../com/automq/stream/s3/ObjectWriter.java | 48 ++++++++----------- 1 file changed, 20 insertions(+), 28 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java index a50688c27..f86c82ffa 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java @@ -65,6 +65,7 @@ class DefaultObjectWriter implements ObjectWriter { private final int blockSizeThreshold; private final int partSizeThreshold; private final List waitingUploadBlocks; + private int waitingUploadBlocksSize; private final List completedBlocks; private IndexBlock indexBlock; private final Writer writer; @@ -92,14 +93,14 @@ public DefaultObjectWriter(long objectId, S3Operator s3Operator, int blockSizeTh public void write(long streamId, List records) { List> blocks = groupByBlock(records); - List> closeCf = new ArrayList<>(blocks.size()); blocks.forEach(blockRecords -> { DataBlock block = new DataBlock(streamId, blockRecords); waitingUploadBlocks.add(block); - closeCf.add(block.close()); + waitingUploadBlocksSize += block.size(); }); - CompletableFuture.allOf(closeCf.toArray(new CompletableFuture[0])).whenComplete((nil, ex) -> tryUploadPart()); - + if (waitingUploadBlocksSize >= partSizeThreshold) { + tryUploadPart(); + } } private List> groupByBlock(List records) { @@ -122,13 +123,10 @@ private List> groupByBlock(List recor private synchronized void tryUploadPart() { for (; ; ) { - List uploadBlocks = new ArrayList<>(32); + List uploadBlocks = new ArrayList<>(waitingUploadBlocks.size()); boolean partFull = false; int size = 0; for (DataBlock block : waitingUploadBlocks) { - if (!block.close().isDone()) { - break; - } uploadBlocks.add(block); size += block.size(); if (size >= partSizeThreshold) { @@ -139,6 +137,7 @@ private synchronized void tryUploadPart() { if (partFull) { CompositeByteBuf partBuf = DirectByteBufAlloc.compositeByteBuffer(); for (DataBlock block : uploadBlocks) { + waitingUploadBlocksSize -= block.size(); partBuf.addComponent(true, block.buffer()); } writer.write(partBuf); @@ -151,22 +150,19 @@ private synchronized void tryUploadPart() { } public CompletableFuture close() { - CompletableFuture waitBlocksCloseCf = CompletableFuture.allOf(waitingUploadBlocks.stream().map(DataBlock::close).toArray(CompletableFuture[]::new)); - return waitBlocksCloseCf.thenCompose(nil -> { - CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer(); - for (DataBlock block : waitingUploadBlocks) { - buf.addComponent(true, block.buffer()); - completedBlocks.add(block); - } - waitingUploadBlocks.clear(); - indexBlock = new IndexBlock(); - buf.addComponent(true, indexBlock.buffer()); - Footer footer = new Footer(indexBlock.position(), indexBlock.size()); - buf.addComponent(true, footer.buffer()); - writer.write(buf.duplicate()); - size = indexBlock.position() + indexBlock.size() + footer.size(); - return writer.close(); - }); + CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer(); + for (DataBlock block : waitingUploadBlocks) { + buf.addComponent(true, block.buffer()); + completedBlocks.add(block); + } + waitingUploadBlocks.clear(); + indexBlock = new IndexBlock(); + buf.addComponent(true, indexBlock.buffer()); + Footer footer = new Footer(indexBlock.position(), indexBlock.size()); + buf.addComponent(true, footer.buffer()); + writer.write(buf.duplicate()); + size = indexBlock.position() + indexBlock.size() + footer.size(); + return writer.close(); } public List getStreamRanges() { @@ -268,10 +264,6 @@ public DataBlock(long streamId, List records) { this.streamRange = new ObjectStreamRange(streamId, records.get(0).getEpoch(), records.get(0).getBaseOffset(), records.get(records.size() - 1).getLastOffset(), size); } - public CompletableFuture close() { - return CompletableFuture.completedFuture(null); - } - public int size() { return size; }