Skip to content

Commit

Permalink
feat(s3stream): 0.5.4 (#712)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Nov 24, 2023
1 parent 1634d37 commit 670b671
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.5.3-SNAPSHOT</s3stream.version>
<s3stream.version>0.5.4-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.5.3-SNAPSHOT</version>
<version>0.5.4-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
44 changes: 30 additions & 14 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ public class S3Storage implements Storage {
private final long maxDeltaWALCacheSize;
private final Config config;
private final WriteAheadLog deltaWAL;
/** WAL log cache */
/**
* WAL log cache
*/
private final LogCache deltaWALCache;
/**
* WAL out of order callback sequencer. Single thread mainWriteExecutor will ensure the memory safety.
Expand Down Expand Up @@ -363,16 +365,12 @@ private void continuousCheck(List<StreamRecordBatch> records) {
* Force upload stream WAL cache to S3. Use group upload to avoid generate too many S3 objects when broker shutdown.
*/
@Override
public synchronized CompletableFuture<Void> forceUpload(long streamId) {
public CompletableFuture<Void> forceUpload(long streamId) {
CompletableFuture<Void> cf = new CompletableFuture<>();
List<CompletableFuture<Void>> inflightWALUploadTasks = new ArrayList<>(this.inflightWALUploadTasks);
// await inflight stream set object upload tasks to group force upload tasks.
CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenCompleteAsync((nil, ex) -> {
deltaWALCache.setConfirmOffset(callbackSequencer.getWALConfirmOffset());
Optional<LogCache.LogCacheBlock> blockOpt = deltaWALCache.archiveCurrentBlockIfContains(streamId);
if (blockOpt.isPresent()) {
blockOpt.ifPresent(this::uploadDeltaWAL);
}
uploadDeltaWAL(streamId);
FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf);
mainWriteExecutor.execute(() -> callbackSequencer.tryFree(streamId));
});
Expand All @@ -389,30 +387,48 @@ private void handleAppendCallback(WalWriteRequest request) {

private void handleAppendCallback0(WalWriteRequest request) {
List<WalWriteRequest> waitingAckRequests = callbackSequencer.after(request);
long walConfirmOffset = callbackSequencer.getWALConfirmOffset();
waitingAckRequests.forEach(r -> r.record.retain());
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
if (deltaWALCache.put(waitingAckRequest.record)) {
// cache block is full, trigger WAL upload.
deltaWALCache.setConfirmOffset(walConfirmOffset);
LogCache.LogCacheBlock logCacheBlock = deltaWALCache.archiveCurrentBlock();
uploadDeltaWAL(logCacheBlock);
uploadDeltaWAL();
}
}
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
waitingAckRequest.cf.complete(null);
}
}

/**
* Upload cache block to S3. The earlier cache block will have smaller objectId and commit first.
*/
@SuppressWarnings("UnusedReturnValue")
CompletableFuture<Void> uploadDeltaWAL() {
return uploadDeltaWAL(LogCache.MATCH_ALL_STREAMS);
}

CompletableFuture<Void> uploadDeltaWAL(long streamId) {
synchronized (deltaWALCache) {
deltaWALCache.setConfirmOffset(callbackSequencer.getWALConfirmOffset());
Optional<LogCache.LogCacheBlock> blockOpt = deltaWALCache.archiveCurrentBlockIfContains(streamId);
if (blockOpt.isPresent()) {
LogCache.LogCacheBlock logCacheBlock = blockOpt.get();
DeltaWALUploadTaskContext context = new DeltaWALUploadTaskContext(logCacheBlock);
context.objectManager = this.objectManager;
return uploadDeltaWAL(context);
} else {
return CompletableFuture.completedFuture(null);
}
}
}

// only for test
CompletableFuture<Void> uploadDeltaWAL(LogCache.LogCacheBlock logCacheBlock) {
DeltaWALUploadTaskContext context = new DeltaWALUploadTaskContext(logCacheBlock);
context.objectManager = this.objectManager;
return uploadDeltaWAL(context);
}

/**
* Upload cache block to S3. The earlier cache block will have smaller objectId and commit first.
*/
CompletableFuture<Void> uploadDeltaWAL(DeltaWALUploadTaskContext context) {
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<Void> cf = new CompletableFuture<>();
Expand Down
10 changes: 10 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,17 @@ public LogCacheBlock archiveCurrentBlock() {
}
}


public Optional<LogCacheBlock> archiveCurrentBlockIfContains(long streamId) {
writeLock.lock();
try {
return archiveCurrentBlockIfContains0(streamId);
} finally {
writeLock.unlock();
}
}

Optional<LogCacheBlock> archiveCurrentBlockIfContains0(long streamId) {
if (streamId == MATCH_ALL_STREAMS) {
if (activeBlock.size > 0) {
return Optional.of(archiveCurrentBlock());
Expand Down

0 comments on commit 670b671

Please sign in to comment.