Skip to content

Commit

Permalink
fix(s3stream/storage): calculate the confirm offset from unordered re…
Browse files Browse the repository at this point in the history
…quests

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Nov 27, 2023
1 parent 2b3fb24 commit 8c00377
Showing 1 changed file with 134 additions and 16 deletions.
150 changes: 134 additions & 16 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,23 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.automq.stream.s3.S3Storage.WALCallbackSequencer.NOOP_OFFSET;


public class S3Storage implements Storage {
private static final Logger LOGGER = LoggerFactory.getLogger(S3Storage.class);
Expand All @@ -76,9 +82,10 @@ public class S3Storage implements Storage {
*/
private final LogCache deltaWALCache;
/**
* WAL out of order callback sequencer. Single thread mainWriteExecutor will ensure the memory safety.
* WAL out of order callback sequencer. {@link #streamCallbackLocks} will ensure the memory safety.
*/
private final WALCallbackSequencer callbackSequencer = new WALCallbackSequencer();
private final WALConfirmOffsetCalculator confirmOffsetCalculator = new WALConfirmOffsetCalculator();
private final Queue<DeltaWALUploadTaskContext> walPrepareQueue = new LinkedList<>();
private final Queue<DeltaWALUploadTaskContext> walCommitQueue = new LinkedList<>();
private final List<CompletableFuture<Void>> inflightWALUploadTasks = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -277,23 +284,31 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
return true;
}
WriteAheadLog.AppendResult appendResult;
Lock lock = confirmOffsetCalculator.addLock();
lock.lock();
try {
StreamRecordBatch streamRecord = request.record;
streamRecord.retain();
appendResult = deltaWAL.append(streamRecord.encoded());
} catch (WriteAheadLog.OverCapacityException e) {
// the WAL write data align with block, 'WAL is full but LogCacheBlock is not full' may happen.
forceUpload(LogCache.MATCH_ALL_STREAMS);
if (!fromBackoff) {
backoffRecords.offer(request);
}
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log over capacity", e);
lastLogTimestamp = System.currentTimeMillis();
try {
StreamRecordBatch streamRecord = request.record;
streamRecord.retain();
appendResult = deltaWAL.append(streamRecord.encoded());
} catch (WriteAheadLog.OverCapacityException e) {
// the WAL write data align with block, 'WAL is full but LogCacheBlock is not full' may happen.
confirmOffsetCalculator.update();
forceUpload(LogCache.MATCH_ALL_STREAMS);
if (!fromBackoff) {
backoffRecords.offer(request);
}
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log over capacity", e);
lastLogTimestamp = System.currentTimeMillis();
}
return true;
}
return true;
request.offset = appendResult.recordOffset();
confirmOffsetCalculator.add(request);
} finally {
lock.unlock();
}
request.offset = appendResult.recordOffset();
appendResult.future().thenAccept(nil -> handleAppendCallback(request));
return false;
}
Expand Down Expand Up @@ -433,7 +448,7 @@ CompletableFuture<Void> uploadDeltaWAL() {

CompletableFuture<Void> uploadDeltaWAL(long streamId) {
synchronized (deltaWALCache) {
deltaWALCache.setConfirmOffset(callbackSequencer.getWALConfirmOffset());
deltaWALCache.setConfirmOffset(confirmOffsetCalculator.get());
Optional<LogCache.LogCacheBlock> blockOpt = deltaWALCache.archiveCurrentBlockIfContains(streamId);
if (blockOpt.isPresent()) {
LogCache.LogCacheBlock logCacheBlock = blockOpt.get();
Expand Down Expand Up @@ -534,12 +549,115 @@ private void freeCache(LogCache.LogCacheBlock cacheBlock) {
deltaWALCache.markFree(cacheBlock);
}

static class WALConfirmOffsetCalculator {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Queue<WalWriteRequestWrapper> queue = new ConcurrentLinkedQueue<>();
private final AtomicLong confirmOffset = new AtomicLong(NOOP_OFFSET);

public WALConfirmOffsetCalculator() {
Threads.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("wal-callback-sequencer-update-confirm-offset", true), LOGGER)
.scheduleAtFixedRate(this::update, 100, 100, TimeUnit.MILLISECONDS);
}

public Lock addLock() {
return rwLock.readLock();
}

public void add(WalWriteRequest request) {
queue.add(new WalWriteRequestWrapper(request));
}

/**
* Return the offset before and including which all records have been persisted.
* Note: The returned offset may be stale.
*/
public Long get() {
return confirmOffset.get();
}

public void update() {
long offset = calculate();
if (offset != NOOP_OFFSET) {
confirmOffset.set(offset);
}
}

/**
* Calculate the offset before and including which all records have been persisted.
* It returns {@link WALCallbackSequencer#NOOP_OFFSET} if the first record is not persisted yet.
*/
synchronized private long calculate() {
Lock lock = rwLock.writeLock();
try {
// Insert a flag.
queue.add(new WalWriteRequestWrapper(null));
} finally {
lock.unlock();
}

Long minUnconfirmedOffset = null;
long maxPersistedOffset = NOOP_OFFSET;
boolean reachFlag = false;
for (WalWriteRequestWrapper wrapper : queue) {
// Iterate the queue to find the min unconfirmed offset.
if (wrapper.request == null) {
// Reach the flag.
reachFlag = true;
break;
}
WalWriteRequest request = wrapper.request;
assert request.offset != NOOP_OFFSET;
if (request.persisted) {
maxPersistedOffset = Math.max(maxPersistedOffset, request.offset);
} else {
minUnconfirmedOffset = minUnconfirmedOffset == null ? request.offset : Math.min(minUnconfirmedOffset, request.offset);
}
}
assert reachFlag;

if (null == minUnconfirmedOffset) {
// All offsets are confirmed.
return maxPersistedOffset;
}

Long confirmedOffset = null;
while (true) {
// Iterate and poll the queue to find the max offset less than minUnconfirmedOffset.
WalWriteRequestWrapper wrapper = queue.poll();
assert wrapper != null;
if (wrapper.request == null) {
// Reach the flag.
break;
}
WalWriteRequest request = wrapper.request;
if (request.persisted && request.offset < minUnconfirmedOffset) {
confirmedOffset = confirmedOffset == null ? request.offset : Math.max(confirmedOffset, request.offset);
}
}

if (confirmedOffset == null) {
// The first record has not been persisted yet.
return NOOP_OFFSET;
} else {
return confirmedOffset;
}
}

/**
* Wrapper of {@link WalWriteRequest}.
* When the {@code request} is null, it is used as a flag.
*/
record WalWriteRequestWrapper(WalWriteRequest request) {
}
}

/**
* WALCallbackSequencer is used to sequence the unordered returned persistent data.
*/
static class WALCallbackSequencer {
public static final long NOOP_OFFSET = -1L;
private final Map<Long, BlockingQueue<WalWriteRequest>> stream2requests = new ConcurrentHashMap<>();
@Deprecated
private final BlockingQueue<WalWriteRequest> walRequests = new LinkedBlockingQueue<>();
private long walConfirmOffset = NOOP_OFFSET;

Expand Down

0 comments on commit 8c00377

Please sign in to comment.