feat(s3stream): Block cache read ahead optimization (#657)
Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored Nov 17, 2023
1 parent 3b0b582 commit 595041f
Showing 14 changed files with 358 additions and 243 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.4.1-SNAPSHOT</s3stream.version>
<s3stream.version>0.4.2-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
2 changes: 1 addition & 1 deletion s3stream/pom.xml
@@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.4.1-SNAPSHOT</version>
<version>0.4.2-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
3 changes: 3 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
@@ -237,6 +237,9 @@ public List<DataBlockIndex> find(long streamId, long startOffset, long endOffset
int blockSize = blocks.getInt(rangeBlockId * 16 + 8);
int recordCount = blocks.getInt(rangeBlockId * 16 + 12);
rst.add(new DataBlockIndex(rangeBlockId, blockPosition, blockSize, recordCount));

// we consider first block as not matched because we do not know exactly how many bytes are within
// the range in first block, as a result we may read one more block than expected.
if (matched) {
nextMaxBytes -= Math.min(nextMaxBytes, blockSize);
}
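The new comment explains the extra block: the index scan cannot know how many bytes of the first matching block fall inside the requested range, so that block is not charged against the byte budget, and at most one additional block may be fetched. A standalone sketch of that accounting (hypothetical helper, not part of ObjectReader; loop termination is assumed):

// Hypothetical illustration of the byte accounting above: the first matching block
// is not charged, so a 1 MiB budget over 1 MiB blocks still selects two blocks.
class FirstBlockAccountingSketch {
    static int blocksSelected(int[] blockSizes, int maxBytes) {
        int nextMaxBytes = maxBytes;
        boolean matched = false;
        int count = 0;
        for (int blockSize : blockSizes) {
            count++;
            if (matched) {
                nextMaxBytes -= Math.min(nextMaxBytes, blockSize);
            }
            matched = true;
            if (nextMaxBytes <= 0) {
                break;
            }
        }
        return count;
    }
    // blocksSelected(new int[]{1 << 20, 1 << 20, 1 << 20}, 1 << 20) == 2
}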
235 changes: 108 additions & 127 deletions s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java

Large diffs are not rendered by default.
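BlockCache.java carries most of this commit but its diff is not shown. The rendered hunks below call two things on it: cache.checkRange(streamId, startOffset, maxBytes) to decide whether the next read-ahead can be skipped, and a put overload that also records how far the synchronous read reached. A hedged sketch of the surface those call sites imply (method names and arities come from the call sites; semantics are inferred, not taken from the unrendered file):

// Assumed shape of the BlockCache methods used by DefaultS3BlockCache in this commit.
// Names mirror the call sites; the real class may differ.
import com.automq.stream.s3.model.StreamRecordBatch;
import java.util.List;

interface BlockCacheSurfaceSketch {
    // presumably true when [startOffset, startOffset + maxBytes) of the stream is
    // already covered by cached blocks, letting asyncReadAhead return early
    boolean checkRange(long streamId, long startOffset, int maxBytes);

    // cache records fetched by a background read-ahead
    void put(long streamId, List<StreamRecordBatch> records);

    // cache records together with the offset the caller actually read up to,
    // presumably so the cache can tell read-ahead data from directly requested data
    void put(long streamId, long readEndOffset, List<StreamRecordBatch> records);
}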

s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java
@@ -18,7 +18,6 @@
package com.automq.stream.s3.cache;

import com.automq.stream.s3.ObjectReader;
import com.automq.stream.utils.FutureUtil;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,19 +26,13 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* Accumulate inflight data block read requests to one real read request.
*/
public class DataBlockReadAccumulator {
private static final Logger LOGGER = LoggerFactory.getLogger(DataBlockReadAccumulator.class);
private final Map<Pair<String, Integer>, DataBlockRecords> inflightDataBlockReads = new ConcurrentHashMap<>();
private final Consumer<DataBlockRecords> dataBlockConsumer;

public DataBlockReadAccumulator(Consumer<DataBlockRecords> dataBlockConsumer) {
this.dataBlockConsumer = dataBlockConsumer;
}

public CompletableFuture<DataBlockRecords> readDataBlock(ObjectReader reader, ObjectReader.DataBlockIndex blockIndex) {
CompletableFuture<DataBlockRecords> cf = new CompletableFuture<>();
Expand All @@ -64,7 +57,6 @@ public CompletableFuture<DataBlockRecords> readDataBlock(ObjectReader reader, Ob
inflightDataBlockReads.remove(key, finalRecords);
}
finalRecords.complete(dataBlock, ex);
FutureUtil.suppress(() -> dataBlockConsumer.accept(finalRecords), LOGGER);
} catch (Throwable e) {
LOGGER.error("[UNEXPECTED] DataBlockRecords fail to notify listener {}", listener, e);
} finally {
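With the dataBlockConsumer constructor argument removed, the accumulator no longer pushes completed blocks into the cache itself; it only coalesces concurrent reads of the same data block, and the caller (DefaultS3BlockCache, below) decides what to retain and cache. A generic sketch of that coalescing pattern, with made-up names (the real class keys inflight reads by Pair<String, Integer> and completes DataBlockRecords with per-listener bookkeeping):

// Generic sketch of inflight-read coalescing: concurrent readers of the same key
// share one underlying load instead of each issuing their own S3 read.
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class InflightReadCoalescer<K, V> {
    private final Map<K, CompletableFuture<V>> inflight = new ConcurrentHashMap<>();

    CompletableFuture<V> read(K key, Function<K, CompletableFuture<V>> loader) {
        CompletableFuture<V> cf = new CompletableFuture<>();
        CompletableFuture<V> existing = inflight.putIfAbsent(key, cf);
        if (existing != null) {
            return existing; // a read for this block is already in flight; share its result
        }
        loader.apply(key).whenComplete((value, ex) -> {
            inflight.remove(key, cf); // later reads start a fresh fetch
            if (ex != null) {
                cf.completeExceptionally(ex);
            } else {
                cf.complete(value);
            }
        });
        return cf;
    }
}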
s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
@@ -48,8 +48,9 @@

public class DefaultS3BlockCache implements S3BlockCache {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3BlockCache.class);
public static final Integer MAX_READ_AHEAD_SIZE = 40 * 1024 * 1024; // 40MB
private final LRUCache<Long, ObjectReader> objectReaderLRU = new LRUCache<>();
private final Map<ReadingTaskKey, CompletableFuture<?>> readAheadTasks = new ConcurrentHashMap<>();
private final Map<ReadAheadTaskKey, CompletableFuture<Void>> inflightReadAheadTasks = new ConcurrentHashMap<>();
private final Semaphore readAheadLimiter = new Semaphore(16);
private final BlockCache cache;
private final ExecutorService mainExecutor;
@@ -65,14 +66,11 @@ public DefaultS3BlockCache(long cacheBytesSize, ObjectManager objectManager, S3O
LOGGER);
this.objectManager = objectManager;
this.s3Operator = s3Operator;
this.dataBlockReadAccumulator = new DataBlockReadAccumulator(dataBlockRecords -> {
List<StreamRecordBatch> records = dataBlockRecords.records();
if (!records.isEmpty()) {
long streamId = records.get(0).getStreamId();
records.forEach(StreamRecordBatch::retain);
cache.put(streamId, records);
}
});
this.dataBlockReadAccumulator = new DataBlockReadAccumulator();
}

public void shutdown() {
mainExecutor.shutdown();
}

@Override
@@ -107,7 +105,8 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, l

if (awaitReadAhead) {
// expect read ahead will fill the cache with the data we need.
CompletableFuture<?> readAheadCf = readAheadTasks.get(new ReadingTaskKey(streamId, startOffset));
//TODO: optimize await read ahead logic
CompletableFuture<Void> readAheadCf = inflightReadAheadTasks.get(new ReadAheadTaskKey(streamId, startOffset));
if (readAheadCf != null) {
CompletableFuture<ReadDataBlock> readCf = new CompletableFuture<>();
readAheadCf.whenComplete((nil, ex) -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, false), readCf));
@@ -124,31 +123,38 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, l
if (!cacheRecords.isEmpty()) {
nextStartOffset = cacheRecords.get(cacheRecords.size() - 1).getLastOffset();
nextMaxBytes -= Math.min(nextMaxBytes, cacheRecords.stream().mapToInt(StreamRecordBatch::size).sum());
}
cacheRst.getReadAhead().ifPresent(readAhead -> backgroundReadAhead(streamId, readAhead));
if (nextStartOffset >= endOffset || nextMaxBytes == 0) {
return CompletableFuture.completedFuture(new ReadDataBlock(cacheRecords, CacheAccessType.BLOCK_CACHE_HIT));
if (nextStartOffset >= endOffset || nextMaxBytes == 0) {
// cache hit
asyncReadAhead(streamId, cacheRst.getReadAheadRecords());
return CompletableFuture.completedFuture(new ReadDataBlock(cacheRecords, CacheAccessType.BLOCK_CACHE_HIT));
} else {
// cache partially hit
return read0(streamId, nextStartOffset, endOffset, nextMaxBytes, true).thenApply(rst -> {
List<StreamRecordBatch> records = new ArrayList<>(cacheRecords);
records.addAll(rst.getRecords());
return new ReadDataBlock(records, CacheAccessType.BLOCK_CACHE_MISS);
});
}
}

// 2. get from s3
//TODO: size of s3 read should be double of the size of round up value of read size to data block size
ReadContext context = new ReadContext(Collections.emptyList(), nextStartOffset, nextMaxBytes);
CompletableFuture<List<S3ObjectMetadata>> getObjectsCf = objectManager.getObjects(streamId, nextStartOffset, endOffset, 2);
return getObjectsCf.thenComposeAsync(objects -> {
context.objects = objects;
return readFromS3(streamId, endOffset, context)
.thenApply(s3Rst -> {
List<StreamRecordBatch> records = new ArrayList<>(cacheRst.getRecords());
records.addAll(s3Rst.getRecords());
return new ReadDataBlock(records, CacheAccessType.BLOCK_CACHE_MISS);
});
return readFromS3(streamId, endOffset, context).thenApply(rst -> {
List<StreamRecordBatch> readAheadRecords = context.readAheadRecords;
if (!readAheadRecords.isEmpty()) {
long readEndOffset = rst.getRecords().get(rst.getRecords().size() - 1).getLastOffset();
cache.put(streamId, readEndOffset, readAheadRecords);
}
return rst;
});
}, mainExecutor);
}

@Override
public void put(Map<Long, List<StreamRecordBatch>> stream2records) {
stream2records.forEach(cache::put);
}

//TODO: optimize to parallel read
private CompletableFuture<ReadDataBlock> readFromS3(long streamId, long endOffset, ReadContext context) {
CompletableFuture<Boolean /* empty objects */> getObjectsCf = CompletableFuture.completedFuture(false);
if (context.objectIndex >= context.objects.size()) {
@@ -205,6 +211,8 @@ private CompletableFuture<ReadDataBlock> readFromS3(long streamId, long endOffse
boolean fulfill = false;
for (CompletableFuture<DataBlockRecords> blockCf : blockCfList) {
DataBlockRecords dataBlock = blockCf.get();
dataBlock.records().forEach(StreamRecordBatch::retain);
context.readAheadRecords.addAll(dataBlock.records());
// TODO: add #getRecords to DataBlockRecords, use binary search to get the records we need.
if (!fulfill) {
for (StreamRecordBatch recordBatch : dataBlock.records()) {
@@ -238,34 +246,51 @@ private CompletableFuture<ReadDataBlock> readFromS3(long streamId, long endOffse
}, mainExecutor);
}

private void backgroundReadAhead(long streamId, BlockCache.ReadAhead readahead) {
mainExecutor.execute(() -> {
CompletableFuture<List<S3ObjectMetadata>> getObjectsCf = objectManager.getObjects(streamId, readahead.startOffset(), NOOP_OFFSET, 2);
getObjectsCf.thenAccept(objects -> {
if (objects.isEmpty()) {
return;
private void asyncReadAhead(long streamId, List<ReadAheadRecord> readAheadRecords) {
if (readAheadRecords.isEmpty()) {
return;
}
ReadAheadRecord lastRecord = readAheadRecords.get(readAheadRecords.size() - 1);
long nextRaOffset = lastRecord.nextRaOffset;
int currRaSizeSum = readAheadRecords.stream().mapToInt(ReadAheadRecord::currRaSize).sum();
int nextRaSize = Math.min(MAX_READ_AHEAD_SIZE, currRaSizeSum * 2);

LOGGER.debug("[S3BlockCache] async read ahead, stream={}, {}-{}, total bytes: {} ",
streamId, nextRaOffset, NOOP_OFFSET, nextRaSize);

// check if next ra hits cache
if (cache.checkRange(streamId, nextRaOffset, nextRaSize)) {
return;
}
CompletableFuture<List<S3ObjectMetadata>> getObjectsCf = objectManager.getObjects(streamId, nextRaOffset, NOOP_OFFSET, 2);
ReadAheadTaskKey taskKey = new ReadAheadTaskKey(streamId, nextRaOffset);
CompletableFuture<Void> readAheadCf = new CompletableFuture<>();
inflightReadAheadTasks.put(taskKey, readAheadCf);
getObjectsCf.thenAcceptAsync(objects -> {
if (objects.isEmpty()) {
return;
}
if (!readAheadLimiter.tryAcquire()) {
// if inflight read ahead tasks exceed limit, skip this read ahead.
return;
}

ReadContext context = new ReadContext(objects, nextRaOffset, nextRaSize);
readFromS3(streamId, NOOP_OFFSET, context).whenComplete((rst, ex) -> {
if (ex != null) {
LOGGER.error("[S3BlockCache] async read ahead fail, stream={}, {}-{}, total bytes: {} ",
streamId, nextRaOffset, NOOP_OFFSET, nextRaSize, ex);
}
if (!readAheadLimiter.tryAcquire()) {
// if inflight read ahead tasks exceed limit, skip this read ahead.
return;
readAheadLimiter.release();
rst.getRecords().forEach(StreamRecordBatch::release);
List<StreamRecordBatch> records = context.readAheadRecords;
if (!records.isEmpty()) {
cache.put(streamId, records);
}

CompletableFuture<ReadDataBlock> readAheadCf = readFromS3(streamId, NOOP_OFFSET,
new ReadContext(objects, readahead.startOffset(), readahead.size()));
ReadingTaskKey readingTaskKey = new ReadingTaskKey(streamId, readahead.startOffset());
readAheadTasks.put(readingTaskKey, readAheadCf);
readAheadCf
.whenComplete((rst, ex) -> {
readAheadLimiter.release();
if (ex != null) {
LOGGER.error("background read ahead {} fail", readahead, ex);
} else {
rst.getRecords().forEach(StreamRecordBatch::release);
}
readAheadTasks.remove(readingTaskKey, readAheadCf);
});
inflightReadAheadTasks.remove(taskKey);
readAheadCf.complete(null);
});
});
}, mainExecutor);
}

private ObjectReader getObjectReader(S3ObjectMetadata metadata) {
@@ -288,20 +313,25 @@ static class ReadContext {
int objectIndex;
ObjectReader reader;
List<StreamRecordBatch> records;
List<StreamRecordBatch> readAheadRecords;
long nextStartOffset;
int nextMaxBytes;

public ReadContext(List<S3ObjectMetadata> objects, long startOffset, int maxBytes) {
this.objects = objects;
this.records = new LinkedList<>();
this.readAheadRecords = new LinkedList<>();
this.nextStartOffset = startOffset;
this.nextMaxBytes = maxBytes;
}

}

record ReadingTaskKey(long streamId, long startOffset) {
record ReadAheadTaskKey(long streamId, long startOffset) {

}

public record ReadAheadRecord(long nextRaOffset, int currRaSize) {
}

}
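asyncReadAhead replaces backgroundReadAhead and is the heart of this commit: each read-ahead result records where the next window should start (nextRaOffset) and how much was just fetched (currRaSize); the next window doubles that total up to MAX_READ_AHEAD_SIZE, is skipped entirely when BlockCache already covers it, and is dropped rather than queued when 16 read-aheads are already in flight. A condensed, illustrative restatement of that policy (hypothetical class, not the project's code):

// Condensed sketch of the read-ahead sizing and throttling in asyncReadAhead above.
// Class and method names here are illustrative only.
import java.util.concurrent.Semaphore;

class ReadAheadPolicySketch {
    static final int MAX_READ_AHEAD_SIZE = 40 * 1024 * 1024; // 40MB, as in DefaultS3BlockCache

    private final Semaphore readAheadLimiter = new Semaphore(16);

    // next window: twice the bytes just read ahead, capped at 40MB
    int nextReadAheadSize(int currRaSizeSum) {
        return Math.min(MAX_READ_AHEAD_SIZE, currRaSizeSum * 2);
    }

    // start a background read-ahead only if the cache does not already cover the
    // window and we are below the inflight limit; otherwise skip it silently
    boolean tryStartReadAhead(boolean rangeAlreadyCached) {
        if (rangeAlreadyCached) {
            return false;
        }
        return readAheadLimiter.tryAcquire();
    }

    void onReadAheadComplete() {
        readAheadLimiter.release(); // release the permit whether the read succeeded or failed
    }
}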
s3stream/src/main/java/com/automq/stream/s3/cache/S3BlockCache.java
@@ -17,10 +17,6 @@

package com.automq.stream.s3.cache;

import com.automq.stream.s3.model.StreamRecordBatch;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
@@ -31,6 +27,4 @@
public interface S3BlockCache {

CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes);

void put(Map<Long, List<StreamRecordBatch>> stream2records);
}
43 changes: 43 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/cache/StreamCache.java
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.cache;

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class StreamCache {
private final NavigableMap<Long, BlockCache.CacheBlock> blocks = new TreeMap<>();

public void put(BlockCache.CacheBlock cacheBlock) {
blocks.put(cacheBlock.firstOffset, cacheBlock);
}

public BlockCache.CacheBlock remove(long startOffset) {
return blocks.remove(startOffset);
}

NavigableMap<Long, BlockCache.CacheBlock> blocks() {
return blocks;
}

public NavigableMap<Long, BlockCache.CacheBlock> tailBlocks(long startOffset) {
Map.Entry<Long, BlockCache.CacheBlock> floorEntry = blocks.floorEntry(startOffset);
return blocks.tailMap(floorEntry != null ? floorEntry.getKey() : startOffset, true);
}
}
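tailBlocks looks up floorEntry(startOffset) instead of calling tailMap(startOffset) directly because a cached block that starts before startOffset may still contain it. A small standalone example of the same lookup on a plain TreeMap (hypothetical offsets):

// Illustration of the floorEntry-based tail lookup used by StreamCache#tailBlocks.
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class TailBlocksSketch {
    public static void main(String[] args) {
        NavigableMap<Long, String> blocks = new TreeMap<>();
        blocks.put(0L, "block@0");
        blocks.put(100L, "block@100");
        blocks.put(200L, "block@200");

        long startOffset = 150L;
        // floorEntry(150) is the block starting at 100, which may still cover offset 150,
        // so the tail map keeps it rather than starting at 200.
        Map.Entry<Long, String> floor = blocks.floorEntry(startOffset);
        NavigableMap<Long, String> tail =
                blocks.tailMap(floor != null ? floor.getKey() : startOffset, true);
        System.out.println(tail); // {100=block@100, 200=block@200}
    }
}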
s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -150,6 +150,10 @@ public void close() {
readS3Client.close();
}
scheduler.shutdown();
readLimiterCallbackExecutor.shutdown();
writeLimiterCallbackExecutor.shutdown();
readCallbackExecutor.shutdown();
writeCallbackExecutor.shutdown();
}

@Override
@@ -631,7 +635,7 @@ <T> CompletableFuture<T> acquireWritePermit(CompletableFuture<T> cf) {
}

static class MergedReadTask {
static final int MAX_MERGE_READ_SIZE = 16 * 1024 * 1024;
static final int MAX_MERGE_READ_SIZE = 32 * 1024 * 1024;
final String path;
final List<ReadTask> readTasks = new ArrayList<>();
long start;
Remaining changed files are not rendered.
