Skip to content

Commit

Permalink
perf(s3stream/storage): remove the single main write thread (#728)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Nov 25, 2023
1 parent e7f310d commit fba58e9
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 28 deletions.
76 changes: 56 additions & 20 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;


Expand All @@ -76,8 +79,6 @@ public class S3Storage implements Storage {
private final Queue<DeltaWALUploadTaskContext> walCommitQueue = new LinkedList<>();
private final List<CompletableFuture<Void>> inflightWALUploadTasks = new CopyOnWriteArrayList<>();

private final ExecutorService mainWriteExecutor = Threads.newFixedThreadPoolWithMonitor(1,
"s3-storage-main-write", false, LOGGER);
private final ScheduledExecutorService backgroundExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3-storage-background", true), LOGGER);
private final ExecutorService uploadWALExecutor = Threads.newFixedThreadPoolWithMonitor(
Expand All @@ -91,6 +92,12 @@ public class S3Storage implements Storage {
private final ObjectManager objectManager;
private final S3Operator s3Operator;
private final S3BlockCache blockCache;
/**
* Stream callback locks. Used to ensure the stream callbacks will not be called concurrently.
*
* @see #handleAppendCallback
*/
private final Map<Long, Lock> streamCallbackLocks = new ConcurrentHashMap<>();

public S3Storage(Config config, WriteAheadLog deltaWAL, StreamManager streamManager, ObjectManager objectManager,
S3BlockCache blockCache, S3Operator s3Operator) {
Expand Down Expand Up @@ -223,7 +230,6 @@ public void shutdown() {
}
deltaWAL.shutdownGracefully();
backgroundExecutor.shutdown();
mainWriteExecutor.shutdown();
}


Expand Down Expand Up @@ -372,31 +378,40 @@ public CompletableFuture<Void> forceUpload(long streamId) {
CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenCompleteAsync((nil, ex) -> {
uploadDeltaWAL(streamId);
FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf);
mainWriteExecutor.execute(() -> callbackSequencer.tryFree(streamId));
callbackSequencer.tryFree(streamId);
});
return cf;
}

private void handleAppendRequest(WalWriteRequest request) {
mainWriteExecutor.execute(() -> callbackSequencer.before(request));
callbackSequencer.before(request);
}

private void handleAppendCallback(WalWriteRequest request) {
mainWriteExecutor.execute(() -> handleAppendCallback0(request));
}

private void handleAppendCallback0(WalWriteRequest request) {
List<WalWriteRequest> waitingAckRequests = callbackSequencer.after(request);
waitingAckRequests.forEach(r -> r.record.retain());
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
if (deltaWALCache.put(waitingAckRequest.record)) {
// cache block is full, trigger WAL upload.
uploadDeltaWAL();
TimerUtil timer = new TimerUtil();
List<WalWriteRequest> waitingAckRequests;
Lock lock = getStreamCallbackLock(request.record.getStreamId());
lock.lock();
try {
waitingAckRequests = callbackSequencer.after(request);
waitingAckRequests.forEach(r -> r.record.retain());
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
if (deltaWALCache.put(waitingAckRequest.record)) {
// cache block is full, trigger WAL upload.
uploadDeltaWAL();
}
}
} finally {
lock.unlock();
}
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
waitingAckRequest.cf.complete(null);
}
OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_APPEND_CALLBACK).update(timer.elapsedAs(TimeUnit.NANOSECONDS));
}

private Lock getStreamCallbackLock(long streamId) {
return streamCallbackLocks.computeIfAbsent(streamId, id -> new ReentrantLock());
}

@SuppressWarnings("UnusedReturnValue")
Expand Down Expand Up @@ -508,21 +523,25 @@ private void freeCache(LogCache.LogCacheBlock cacheBlock) {
}

/**
* WALCallbackSequencer is modified in single thread mainExecutor.
* WALCallbackSequencer is used to sequence the unordered returned persistent data.
*/
static class WALCallbackSequencer {
public static final long NOOP_OFFSET = -1L;
private final Map<Long, Queue<WalWriteRequest>> stream2requests = new HashMap<>();
private final Map<Long, BlockingQueue<WalWriteRequest>> stream2requests = new ConcurrentHashMap<>();
private final BlockingQueue<WalWriteRequest> walRequests = new LinkedBlockingQueue<>();
private long walConfirmOffset = NOOP_OFFSET;

/**
* Add request to stream sequence queue.
* When the {@code request.record.getStreamId()} is different, concurrent calls are allowed.
* When the {@code request.record.getStreamId()} is the same, concurrent calls are not allowed. And it is
* necessary to ensure that calls are made in the order of increasing offsets.
*/
public void before(WalWriteRequest request) {
try {
walRequests.put(request);
Queue<WalWriteRequest> streamRequests = stream2requests.computeIfAbsent(request.record.getStreamId(), s -> new LinkedBlockingQueue<>());
assert streamRequests.isEmpty() || streamRequests.peek().offset < request.offset;
streamRequests.add(request);
} catch (Throwable ex) {
request.cf.completeExceptionally(ex);
Expand All @@ -531,12 +550,15 @@ public void before(WalWriteRequest request) {

/**
* Try pop sequence persisted request from stream queue and move forward wal inclusive confirm offset.
* When the {@code request.record.getStreamId()} is different, concurrent calls are allowed.
* When the {@code request.record.getStreamId()} is the same, concurrent calls are not allowed.
*
* @return popped sequence persisted request.
*/
public List<WalWriteRequest> after(WalWriteRequest request) {
request.persisted = true;
// move the WAL inclusive confirm offset.
// FIXME: requests in walRequests may not be in order.
for (; ; ) {
WalWriteRequest peek = walRequests.peek();
if (peek == null || !peek.persisted) {
Expand All @@ -548,19 +570,33 @@ public List<WalWriteRequest> after(WalWriteRequest request) {

// pop sequence success stream request.
long streamId = request.record.getStreamId();
Queue<WalWriteRequest> streamRequests = stream2requests.get(streamId);
BlockingQueue<WalWriteRequest> streamRequests = stream2requests.get(streamId);
WalWriteRequest peek = streamRequests.peek();
if (peek == null || peek.offset != request.offset) {
return Collections.emptyList();
}
List<WalWriteRequest> rst = new ArrayList<>();
rst.add(streamRequests.poll());
if (streamRequests.remove(request)) {
rst.add(request);
} else {
// Should not happen.
LOGGER.error("request was removed by other thread after it was persisted. streamId={}, offset={}",
streamId, request.offset);
assert false;
}
for (; ; ) {
peek = streamRequests.peek();
if (peek == null || !peek.persisted) {
break;
}
rst.add(streamRequests.poll());
if (streamRequests.remove(peek)) {
rst.add(peek);
} else {
// Should not happen.
LOGGER.error("request was removed by other thread after it was persisted. streamId={}, offset={}, peekOffset={}",
streamId, request.offset, peek.offset);
assert false;
}
}
return rst;
}
Expand Down
20 changes: 12 additions & 8 deletions s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCo
this(capacity, cacheBlockMaxSize, maxCacheBlockStreamCount, DEFAULT_BLOCK_FREE_LISTENER);
}

/**
* Put a record batch into the cache.
* record batched in the same stream should be put in order.
*/
public boolean put(StreamRecordBatch recordBatch) {
TimerUtil timerUtil = new TimerUtil();
tryRealFree();
Expand Down Expand Up @@ -196,7 +200,7 @@ public Optional<LogCacheBlock> archiveCurrentBlockIfContains(long streamId) {

Optional<LogCacheBlock> archiveCurrentBlockIfContains0(long streamId) {
if (streamId == MATCH_ALL_STREAMS) {
if (activeBlock.size > 0) {
if (activeBlock.size() > 0) {
return Optional.of(archiveCurrentBlock());
} else {
return Optional.empty();
Expand Down Expand Up @@ -228,7 +232,7 @@ private void tryRealFree() {
return false;
}
if (b.free) {
size.addAndGet(-b.size);
size.addAndGet(-b.size());
removed.add(b);
}
return b.free;
Expand All @@ -251,8 +255,9 @@ public int forceFree(int required) {
if (!block.free || freedBytes.get() >= required) {
return false;
}
size.addAndGet(-block.size);
freedBytes.addAndGet((int) block.size);
long blockSize = block.size();
size.addAndGet(-blockSize);
freedBytes.addAndGet((int) blockSize);
removed.add(block);
return true;
});
Expand Down Expand Up @@ -280,7 +285,7 @@ public static class LogCacheBlock {
private final long maxSize;
private final int maxStreamCount;
private final Map<Long, List<StreamRecordBatch>> map = new ConcurrentHashMap<>();
private long size = 0;
private final AtomicLong size = new AtomicLong();
private long confirmOffset;
volatile boolean free;

Expand Down Expand Up @@ -310,8 +315,7 @@ public boolean put(StreamRecordBatch recordBatch) {
return records;
});
int recordSize = recordBatch.size();
size += recordSize;
return size >= maxSize || map.size() >= maxStreamCount;
return size.addAndGet(recordSize) >= maxSize || map.size() >= maxStreamCount;
}

public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffset, int maxBytes) {
Expand Down Expand Up @@ -371,7 +375,7 @@ public void confirmOffset(long confirmOffset) {
}

public long size() {
return size;
return size.get();
}

public void free() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public enum S3Operation {
/* S3 storage operations start */
APPEND_STORAGE(S3MetricsType.S3Storage, "append"),
APPEND_STORAGE_WAL(S3MetricsType.S3Storage, "append_wal"),
APPEND_STORAGE_APPEND_CALLBACK(S3MetricsType.S3Storage, "append_callback"),
APPEND_STORAGE_WAL_FULL(S3MetricsType.S3Storage, "append_wal_full"),
APPEND_STORAGE_LOG_CACHE(S3MetricsType.S3Storage, "append_log_cache"),
APPEND_STORAGE_LOG_CACHE_FULL(S3MetricsType.S3Storage, "append_log_cache_full"),
Expand Down

0 comments on commit fba58e9

Please sign in to comment.