diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 11ab16d80..966623d4e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -545,7 +545,9 @@ private void handleAppendCallback0(WalWriteRequest request) { waitingAckRequests = callbackSequencer.after(request); waitingAckRequests.forEach(r -> r.record.retain()); for (WalWriteRequest waitingAckRequest : waitingAckRequests) { - if (deltaWALCache.put(waitingAckRequest.record)) { + boolean full = deltaWALCache.put(waitingAckRequest.record); + waitingAckRequest.confirmed = true; + if (full) { // cache block is full, trigger WAL upload. uploadDeltaWAL(); } @@ -766,7 +768,7 @@ synchronized private long calculate() { } WalWriteRequest request = wrapper.request; assert request.offset != NOOP_OFFSET; - if (!request.persisted) { + if (!request.confirmed) { minUnconfirmedOffset = Math.min(minUnconfirmedOffset, request.offset); } } @@ -783,7 +785,7 @@ synchronized private long calculate() { break; } WalWriteRequest request = wrapper.request; - if (request.persisted && request.offset < minUnconfirmedOffset) { + if (request.confirmed && request.offset < minUnconfirmedOffset) { confirmedOffset = Math.max(confirmedOffset, request.offset); iterator.remove(); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java b/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java index 01919c66f..c843e095f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java +++ b/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java @@ -11,8 +11,10 @@ package com.automq.stream.s3; +import com.automq.stream.s3.cache.LogCache; import com.automq.stream.s3.context.AppendContext; import com.automq.stream.s3.model.StreamRecordBatch; +import com.automq.stream.s3.wal.WriteAheadLog; import java.util.concurrent.CompletableFuture; public class WalWriteRequest implements Comparable { @@ -20,8 +22,22 @@ public class WalWriteRequest implements Comparable { final AppendContext context; final CompletableFuture cf; long offset; + /** + * Whether the record has been persisted to the {@link WriteAheadLog} + * When a continuous series of records IN A STREAM have been persisted to the WAL, they can be uploaded to S3. + * + * @see S3Storage.WALCallbackSequencer + */ boolean persisted; + /** + * Whether the record has been put to the {@link LogCache} + * When a continuous series of records have been persisted to the WAL and uploaded to S3, they can be trimmed. + * + * @see S3Storage.WALConfirmOffsetCalculator + */ + boolean confirmed; + public WalWriteRequest(StreamRecordBatch record, long offset, CompletableFuture cf) { this(record, offset, cf, AppendContext.DEFAULT); } @@ -44,6 +60,7 @@ public String toString() { "record=" + record + ", offset=" + offset + ", persisted=" + persisted + + ", confirmed=" + confirmed + '}'; } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java index 3413b65c9..f7b8f4a8f 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java @@ -135,19 +135,19 @@ public void testWALConfirmOffsetCalculator() { calc.update(); assertEquals(-1L, calc.get()); - r0.persisted = true; + r0.confirmed = true; calc.update(); assertEquals(0L, calc.get()); - r3.persisted = true; + r3.confirmed = true; calc.update(); assertEquals(0L, calc.get()); - r1.persisted = true; + r1.confirmed = true; calc.update(); assertEquals(1L, calc.get()); - r2.persisted = true; + r2.confirmed = true; calc.update(); assertEquals(3L, calc.get()); }