diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index fd45e8a0b..8600fb597 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -56,7 +56,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -203,7 +202,10 @@ public CompletableFuture fetch(FetchContext context, LOGGER.error("{} stream fetch [{}, {}) {} fail", logIdent, startOffset, endOffset, maxBytes, ex); } } else if (networkOutboundLimiter != null) { - long totalSize = rs.recordBatchList().stream().mapToLong(record -> record.rawPayload().remaining()).sum(); + long totalSize = 0L; + for (RecordBatch recordBatch : rs.recordBatchList()) { + totalSize += recordBatch.rawPayload().remaining(); + } networkOutboundLimiter.forceConsume(totalSize); if (LOGGER.isDebugEnabled()) { LOGGER.debug("[S3BlockCache] fetch data, stream={}, {}-{}, total bytes: {}, cost={}ms", streamId, @@ -379,7 +381,11 @@ public DefaultFetchResult(List streamRecords, CacheAccessType boolean pooledBuf) { this.pooledRecords = streamRecords; this.pooledBuf = pooledBuf; - this.records = streamRecords.stream().map(r -> new RecordBatchWithContextWrapper(covert(r, pooledBuf), r.getBaseOffset())).collect(Collectors.toList()); + this.records = new ArrayList<>(streamRecords.size()); + for (StreamRecordBatch streamRecordBatch : streamRecords) { + RecordBatch recordBatch = covert(streamRecordBatch, pooledBuf); + records.add(new RecordBatchWithContextWrapper(recordBatch, streamRecordBatch.getBaseOffset())); + } this.cacheAccessType = cacheAccessType; if (!pooledBuf) { streamRecords.forEach(StreamRecordBatch::release); diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java 
b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java index 793d823d4..096f67eaf 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java @@ -166,7 +166,11 @@ public List get0(long streamId, long startOffset, long endOff continue; } nextStartOffset = records.get(records.size() - 1).getLastOffset(); - nextMaxBytes -= Math.min(nextMaxBytes, records.stream().mapToInt(StreamRecordBatch::size).sum()); + int recordsSize = 0; + for (StreamRecordBatch record : records) { + recordsSize += record.size(); + } + nextMaxBytes -= Math.min(nextMaxBytes, recordsSize); rst.addAll(records); if (nextStartOffset >= endOffset || nextMaxBytes == 0) { fulfill = true; diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java index a7359a978..ef28ebf86 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java @@ -19,7 +19,7 @@ import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.Iterator; import java.util.concurrent.CompletableFuture; public class BlockBatch { @@ -53,12 +53,30 @@ public Collection blocks() { return Collections.unmodifiableCollection(blocks); } - public List> futures() { - return blocks.stream() - .map(Block::futures) - .flatMap(List::stream) - .toList(); + public Iterator> futures() { + return new Iterator<>() { + private final Iterator blockIterator = blocks.iterator(); + private Iterator> futureIterator = blockIterator.next().futures().iterator(); + @Override + public boolean hasNext() { + if (futureIterator.hasNext()) { + return true; + } else { + if (blockIterator.hasNext()) { + futureIterator = blockIterator.next().futures().iterator(); + return hasNext(); + } else { + return false; + } + } + } + + @Override + public CompletableFuture next() { + 
public class FutureUtil {

    /**
     * Completes every remaining future produced by {@code futures} exceptionally
     * with the supplied cause. The iterator is fully consumed.
     *
     * @param futures iterator over the futures to fail; must not be null
     * @param ex      the cause passed to each {@link CompletableFuture#completeExceptionally}
     * @param <T>     result type of the futures
     *                (NOTE(review): type parameter reconstructed — the patch text had its
     *                generics stripped; confirm against callers such as BlockBatch)
     */
    public static <T> void completeExceptionally(Iterator<CompletableFuture<T>> futures, Throwable ex) {
        futures.forEachRemaining(future -> future.completeExceptionally(ex));
    }

    /**
     * Completes every remaining future produced by {@code futures} with {@code value}.
     * The iterator is fully consumed.
     *
     * @param futures iterator over the futures to complete; must not be null
     * @param value   the value delivered to each future
     * @param <T>     result type of the futures
     */
    public static <T> void complete(Iterator<CompletableFuture<T>> futures, T value) {
        futures.forEachRemaining(future -> future.complete(value));
    }
}
/**
 * Wraps each record in a {@code ComparableStreamRecordBatch} so the ordered-collection
 * base class can binary-search them by offset. The wrapper list is pre-sized and built
 * locally before being published to the field.
 *
 * @param records the stream record batches to index; must not be null
 *                (NOTE(review): type arguments reconstructed — the patch text had its
 *                generics stripped; confirm field/param types against the real file)
 */
public StreamRecordBatchList(List<StreamRecordBatch> records) {
    List<ComparableStreamRecordBatch> wrapped = new ArrayList<>(records.size());
    for (StreamRecordBatch record : records) {
        wrapped.add(new ComparableStreamRecordBatch(record));
    }
    this.records = wrapped;
    this.size = records.size();
}