Skip to content

Commit

Permalink
perf(s3stream): use for loop rather than stream in critical paths (#894)
Browse files Browse the repository at this point in the history

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Jan 15, 2024
1 parent 87098f5 commit 3d31f48
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 16 deletions.
12 changes: 9 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -203,7 +202,10 @@ public CompletableFuture<FetchResult> fetch(FetchContext context,
LOGGER.error("{} stream fetch [{}, {}) {} fail", logIdent, startOffset, endOffset, maxBytes, ex);
}
} else if (networkOutboundLimiter != null) {
long totalSize = rs.recordBatchList().stream().mapToLong(record -> record.rawPayload().remaining()).sum();
long totalSize = 0L;
for (RecordBatch recordBatch : rs.recordBatchList()) {
totalSize += recordBatch.rawPayload().remaining();
}
networkOutboundLimiter.forceConsume(totalSize);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] fetch data, stream={}, {}-{}, total bytes: {}, cost={}ms", streamId,
Expand Down Expand Up @@ -379,7 +381,11 @@ public DefaultFetchResult(List<StreamRecordBatch> streamRecords, CacheAccessType
boolean pooledBuf) {
this.pooledRecords = streamRecords;
this.pooledBuf = pooledBuf;
this.records = streamRecords.stream().map(r -> new RecordBatchWithContextWrapper(covert(r, pooledBuf), r.getBaseOffset())).collect(Collectors.toList());
this.records = new ArrayList<>(streamRecords.size());
for (StreamRecordBatch streamRecordBatch : streamRecords) {
RecordBatch recordBatch = covert(streamRecordBatch, pooledBuf);
records.add(new RecordBatchWithContextWrapper(recordBatch, streamRecordBatch.getBaseOffset()));
}
this.cacheAccessType = cacheAccessType;
if (!pooledBuf) {
streamRecords.forEach(StreamRecordBatch::release);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ public List<StreamRecordBatch> get0(long streamId, long startOffset, long endOff
continue;
}
nextStartOffset = records.get(records.size() - 1).getLastOffset();
nextMaxBytes -= Math.min(nextMaxBytes, records.stream().mapToInt(StreamRecordBatch::size).sum());
int recordsSize = 0;
for (StreamRecordBatch record : records) {
recordsSize += record.size();
}
nextMaxBytes -= Math.min(nextMaxBytes, recordsSize);
rst.addAll(records);
if (nextStartOffset >= endOffset || nextMaxBytes == 0) {
fulfill = true;
Expand Down
30 changes: 24 additions & 6 deletions s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;

public class BlockBatch {
Expand Down Expand Up @@ -53,12 +53,30 @@ public Collection<Block> blocks() {
return Collections.unmodifiableCollection(blocks);
}

public List<CompletableFuture<WriteAheadLog.AppendResult.CallbackResult>> futures() {
return blocks.stream()
.map(Block::futures)
.flatMap(List::stream)
.toList();
/**
 * Returns a lazy iterator over the append-result futures of every block in this
 * batch, in block order. Iterating lazily avoids materializing a combined list
 * on the hot path (the motivation for replacing the previous stream/flatMap form).
 *
 * NOTE(review): the returned iterator reads {@code blocks} without synchronization;
 * assumes the batch is not mutated during iteration — confirm with callers.
 */
public Iterator<CompletableFuture<WriteAheadLog.AppendResult.CallbackResult>> futures() {
    return new Iterator<>() {
        private final Iterator<Block> blockIterator = blocks.iterator();
        // Start empty so construction is safe when the batch has no blocks.
        // (Eagerly calling blockIterator.next() here would throw on an empty batch.)
        private Iterator<CompletableFuture<WriteAheadLog.AppendResult.CallbackResult>> futureIterator =
            Collections.emptyIterator();

        @Override
        public boolean hasNext() {
            // Iteratively skip past exhausted (or empty) blocks; a loop avoids
            // unbounded recursion when many consecutive blocks have no futures.
            while (!futureIterator.hasNext() && blockIterator.hasNext()) {
                futureIterator = blockIterator.next().futures().iterator();
            }
            return futureIterator.hasNext();
        }

        @Override
        public CompletableFuture<WriteAheadLog.AppendResult.CallbackResult> next() {
            // Advance across block boundaries so next() honors the Iterator
            // contract even when the caller did not invoke hasNext() first.
            if (!hasNext()) {
                throw new java.util.NoSuchElementException();
            }
            return futureIterator.next();
        }
    };
}

public void release() {
Expand Down
12 changes: 7 additions & 5 deletions s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.stream.utils;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -93,14 +93,16 @@ public static Throwable cause(Throwable ex) {
return ex;
}

public static <T> void completeExceptionally(Collection<CompletableFuture<T>> futures, Throwable ex) {
for (CompletableFuture<T> future : futures) {
/**
 * Completes every remaining future yielded by the given iterator exceptionally
 * with the supplied cause.
 *
 * @param futures iterator over the futures to fail; fully consumed
 * @param ex      the cause used to complete each future exceptionally
 */
public static <T> void completeExceptionally(Iterator<CompletableFuture<T>> futures, Throwable ex) {
    futures.forEachRemaining(future -> future.completeExceptionally(ex));
}

public static <T> void complete(Collection<CompletableFuture<T>> futures, T value) {
for (CompletableFuture<T> future : futures) {
/**
 * Completes every remaining future yielded by the given iterator with the
 * supplied value.
 *
 * @param futures iterator over the futures to complete; fully consumed
 * @param value   the result used to complete each future
 */
public static <T> void complete(Iterator<CompletableFuture<T>> futures, T value) {
    futures.forEachRemaining(future -> future.complete(value));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.automq.stream.utils.biniarysearch;

import com.automq.stream.s3.model.StreamRecordBatch;
import java.util.ArrayList;
import java.util.List;

public class StreamRecordBatchList extends AbstractOrderedCollection<Long> {
Expand All @@ -26,7 +27,10 @@ public class StreamRecordBatchList extends AbstractOrderedCollection<Long> {
private final int size;

/**
 * Builds an ordered, binary-searchable view over the given record batches.
 * Uses an explicit presized for loop rather than a stream, matching the
 * hot-path convention used elsewhere in this change.
 *
 * @param records the stream record batches to index, in their existing order;
 *                each element is wrapped in a ComparableStreamRecordBatch
 */
public StreamRecordBatchList(List<StreamRecordBatch> records) {
    // Presize to avoid intermediate growth; one wrapper per input record.
    this.records = new ArrayList<>(records.size());
    for (StreamRecordBatch record : records) {
        this.records.add(new ComparableStreamRecordBatch(record));
    }
    this.size = records.size();
}

Expand Down

0 comments on commit 3d31f48

Please sign in to comment.