From 5c8f32e6a8057af7f585671cafbe8b93dbd4a5a9 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 12 Jan 2024 18:03:38 +0800 Subject: [PATCH 1/5] perf: use `for` rather than `stream` in `DefaultFetchResult.` (~0.3%) Signed-off-by: Ning Yu --- s3stream/src/main/java/com/automq/stream/s3/S3Stream.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index fd45e8a0b..7af364066 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -379,7 +379,11 @@ public DefaultFetchResult(List streamRecords, CacheAccessType boolean pooledBuf) { this.pooledRecords = streamRecords; this.pooledBuf = pooledBuf; - this.records = streamRecords.stream().map(r -> new RecordBatchWithContextWrapper(covert(r, pooledBuf), r.getBaseOffset())).collect(Collectors.toList()); + this.records = new ArrayList<>(streamRecords.size()); + for (StreamRecordBatch streamRecordBatch : streamRecords) { + RecordBatch recordBatch = covert(streamRecordBatch, pooledBuf); + records.add(new RecordBatchWithContextWrapper(recordBatch, streamRecordBatch.getBaseOffset())); + } this.cacheAccessType = cacheAccessType; if (!pooledBuf) { streamRecords.forEach(StreamRecordBatch::release); From dd90c31f17c0cb9ffe61ce496a62445058fc4833 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 12 Jan 2024 18:05:20 +0800 Subject: [PATCH 2/5] perf: use `for` rather than `stream` in `StreamRecordBatchList.` (0.14%) Signed-off-by: Ning Yu --- .../stream/utils/biniarysearch/StreamRecordBatchList.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java index 1a75679b9..934d5a340 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java +++ b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java @@ -18,6 +18,7 @@ package com.automq.stream.utils.biniarysearch; import com.automq.stream.s3.model.StreamRecordBatch; +import java.util.ArrayList; import java.util.List; public class StreamRecordBatchList extends AbstractOrderedCollection { @@ -26,7 +27,10 @@ public class StreamRecordBatchList extends AbstractOrderedCollection { private final int size; public StreamRecordBatchList(List records) { - this.records = records.stream().map(ComparableStreamRecordBatch::new).toList(); + this.records = new ArrayList<>(records.size()); + for (StreamRecordBatch record : records) { + this.records.add(new ComparableStreamRecordBatch(record)); + } this.size = records.size(); } From ac4b64c02d6428cc7200080dd949973e64786b5e Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 12 Jan 2024 18:07:14 +0800 Subject: [PATCH 3/5] perf: use `for` rather than `stream` in `LogCache.get0` (0.16%) Signed-off-by: Ning Yu --- .../src/main/java/com/automq/stream/s3/cache/LogCache.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java index 793d823d4..096f67eaf 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java @@ -166,7 +166,11 @@ public List get0(long streamId, long startOffset, long endOff continue; } nextStartOffset = records.get(records.size() - 1).getLastOffset(); - nextMaxBytes -= Math.min(nextMaxBytes, records.stream().mapToInt(StreamRecordBatch::size).sum()); + int recordsSize = 0; + for (StreamRecordBatch record : records) { + recordsSize += record.size(); + } + nextMaxBytes -= Math.min(nextMaxBytes, recordsSize); rst.addAll(records); if (nextStartOffset >= endOffset || nextMaxBytes == 0) { fulfill = true; From e7eee2925b5064e63e0bd5c3406f7f701d2e6164 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 12 Jan 2024 18:12:44 +0800 Subject: [PATCH 4/5] perf: use `for` rather than `stream` in `S3Stream.fetch` (~0.3%) Signed-off-by: Ning Yu --- s3stream/src/main/java/com/automq/stream/s3/S3Stream.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index 7af364066..dc16aa477 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -203,7 +203,10 @@ public CompletableFuture fetch(FetchContext context, LOGGER.error("{} stream fetch [{}, {}) {} fail", logIdent, startOffset, endOffset, maxBytes, ex); } } else if (networkOutboundLimiter != null) { - long totalSize = rs.recordBatchList().stream().mapToLong(record -> record.rawPayload().remaining()).sum(); + long totalSize = 0L; + for (RecordBatch recordBatch : rs.recordBatchList()) { + totalSize += recordBatch.rawPayload().remaining(); + } networkOutboundLimiter.forceConsume(totalSize); if (LOGGER.isDebugEnabled()) { LOGGER.debug("[S3BlockCache] fetch data, stream={}, {}-{}, total bytes: {}, cost={}ms", streamId, From a058058f00de2e49214cb804d5334e421a63bc38 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 12 Jan 2024 18:50:34 +0800 Subject: [PATCH 5/5] perf: use `Iterator` rather than `List` in `BlockBatch.futures` (0.20%) Signed-off-by: Ning Yu --- .../java/com/automq/stream/s3/S3Stream.java | 1 - .../com/automq/stream/s3/wal/BlockBatch.java | 30 +++++++++++++++---- .../com/automq/stream/utils/FutureUtil.java | 12 ++++---- 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index dc16aa477..8600fb597 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -56,7 +56,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java index a7359a978..ef28ebf86 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java @@ -19,7 +19,7 @@ import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.Iterator; import java.util.concurrent.CompletableFuture; public class BlockBatch { @@ -53,12 +53,30 @@ public Collection blocks() { return Collections.unmodifiableCollection(blocks); } - public List> futures() { - return blocks.stream() - .map(Block::futures) - .flatMap(List::stream) - .toList(); + public Iterator> futures() { + return new Iterator<>() { + private final Iterator blockIterator = blocks.iterator(); + private Iterator> futureIterator = blockIterator.next().futures().iterator(); + @Override + public boolean hasNext() { + if (futureIterator.hasNext()) { + return true; + } else { + if (blockIterator.hasNext()) { + futureIterator = blockIterator.next().futures().iterator(); + return hasNext(); + } else { + return false; + } + } + } + + @Override + public CompletableFuture next() { + return futureIterator.next(); + } + }; } public void release() { diff --git a/s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java b/s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java index 1ee5040d0..eeb6f4e1a 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java +++ b/s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java @@ -17,7 +17,7 @@ package com.automq.stream.utils; -import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -93,14 +93,16 @@ public static Throwable cause(Throwable ex) { return ex; } - public static void completeExceptionally(Collection> futures, Throwable ex) { - for (CompletableFuture future : futures) { + public static void completeExceptionally(Iterator> futures, Throwable ex) { + while (futures.hasNext()) { + CompletableFuture future = futures.next(); future.completeExceptionally(ex); } } - public static void complete(Collection> futures, T value) { - for (CompletableFuture future : futures) { + public static void complete(Iterator> futures, T value) { + while (futures.hasNext()) { + CompletableFuture future = futures.next(); future.complete(value); } }