From 90d61c947ceb80c72cc237b1c4d43cbc0c45ddd9 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Thu, 8 Feb 2024 21:21:46 +0800 Subject: [PATCH] feat(s3stream): support stream epoch when commit stream object (#926) Signed-off-by: Shichao Nie --- .../src/main/java/com/automq/stream/api/Stream.java | 5 +++++ .../src/main/java/com/automq/stream/s3/S3Stream.java | 5 +++++ .../com/automq/stream/s3/StreamObjectCompactor.java | 11 +++++++---- .../stream/s3/memory/MemoryMetadataManager.java | 3 +++ .../stream/s3/objects/CompactStreamObjectRequest.java | 11 +++++++++-- .../automq/stream/s3/StreamObjectCompactorTest.java | 2 +- .../automq/stream/s3/objects/ObjectManagerTest.java | 2 +- .../com/automq/rocketmq/store/S3ObjectManager.java | 1 + .../automq/rocketmq/store/S3ObjectManagerTest.java | 2 +- .../rocketmq/store/mock/MemoryStreamClient.java | 5 +++++ 10 files changed, 38 insertions(+), 9 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/api/Stream.java b/s3stream/src/main/java/com/automq/stream/api/Stream.java index 7a6e82071..ca8d41842 100644 --- a/s3stream/src/main/java/com/automq/stream/api/Stream.java +++ b/s3stream/src/main/java/com/automq/stream/api/Stream.java @@ -26,6 +26,11 @@ public interface Stream { */ long streamId(); + /** + * Get stream epoch. + */ + long streamEpoch(); + /** * Get stream start offset. */ diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index 9b138133f..cc9d22ac9 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -109,6 +109,11 @@ public long streamId() { return this.streamId; } + @Override + public long streamEpoch() { + return this.epoch; + } + @Override public long startOffset() { return this.startOffset; diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java b/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java index 19ac9e5c9..4f5b86221 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java +++ b/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java @@ -77,8 +77,8 @@ void compact0() throws ExecutionException, InterruptedException { continue; } long objectId = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(60)).get(); - Optional requestOpt = new StreamObjectGroupCompactor(streamId, startOffset, - objectGroup, objectId, dataBlockGroupSizeThreshold, s3Operator).compact(); + Optional requestOpt = new StreamObjectGroupCompactor(streamId, stream.streamEpoch(), + startOffset, objectGroup, objectId, dataBlockGroupSizeThreshold, s3Operator).compact(); if (requestOpt.isPresent()) { CompactStreamObjectRequest request = requestOpt.get(); objectManager.compactStreamObject(request).get(); @@ -97,15 +97,17 @@ List> group() throws ExecutionException, InterruptedExcep static class StreamObjectGroupCompactor { private final List objectGroup; private final long streamId; + private final long streamEpoch; private final long startOffset; // compact object group to the new object private final long objectId; private final S3Operator s3Operator; private final int dataBlockGroupSizeThreshold; - public StreamObjectGroupCompactor(long streamId, long startOffset, List objectGroup, + public StreamObjectGroupCompactor(long streamId, long streamEpoch, long startOffset, List objectGroup, long objectId, int dataBlockGroupSizeThreshold, S3Operator s3Operator) { this.streamId = streamId; + this.streamEpoch = streamEpoch; this.startOffset = startOffset; this.objectGroup = objectGroup; this.objectId = objectId; @@ -177,7 +179,8 @@ public Optional compact() throws ExecutionException, objectSize += indexBlockAndFooter.readableBytes(); writer.write(indexBlockAndFooter.duplicate()); writer.close().get(); - return Optional.of(new CompactStreamObjectRequest(objectId, objectSize, streamId, compactedStartOffset, compactedEndOffset, compactedObjectIds)); + return Optional.of(new CompactStreamObjectRequest(objectId, objectSize, streamId, streamEpoch, + compactedStartOffset, compactedEndOffset, compactedObjectIds)); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java index a08831555..911ae53b9 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java @@ -136,6 +136,9 @@ public synchronized CompletableFuture compactStreamObject(CompactStreamObj long streamId = request.getStreamId(); StreamMetadata stream = streams.get(streamId); assert stream != null; + if (stream.epoch() != request.getStreamEpoch()) { + throw new IllegalArgumentException("stream " + streamId + " epoch " + stream.epoch() + " is not equal to request " + request.getStreamEpoch()); + } if (stream.endOffset() < request.getEndOffset()) { throw new IllegalArgumentException("stream " + streamId + " end offset " + stream.endOffset() + " is lesser than request " + request.getEndOffset()); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/objects/CompactStreamObjectRequest.java b/s3stream/src/main/java/com/automq/stream/s3/objects/CompactStreamObjectRequest.java index 3eec31892..508a89d3f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/objects/CompactStreamObjectRequest.java +++ b/s3stream/src/main/java/com/automq/stream/s3/objects/CompactStreamObjectRequest.java @@ -19,18 +19,20 @@ public class CompactStreamObjectRequest { private long streamId; private long startOffset; private long endOffset; + private final long streamEpoch; /** * The source objects' id of the stream object. */ private List sourceObjectIds; - public CompactStreamObjectRequest(long objectId, long objectSize, long streamId, long startOffset, long endOffset, - List sourceObjectIds) { + public CompactStreamObjectRequest(long objectId, long objectSize, long streamId, long streamEpoch, long startOffset, + long endOffset, List sourceObjectIds) { this.objectId = objectId; this.objectSize = objectSize; this.streamId = streamId; this.startOffset = startOffset; this.endOffset = endOffset; + this.streamEpoch = streamEpoch; this.sourceObjectIds = sourceObjectIds; } @@ -74,6 +76,10 @@ public void setEndOffset(long endOffset) { this.endOffset = endOffset; } + public long getStreamEpoch() { + return streamEpoch; + } + public List getSourceObjectIds() { return sourceObjectIds; } @@ -90,6 +96,7 @@ public String toString() { ", streamId=" + streamId + ", startOffset=" + startOffset + ", endOffset=" + endOffset + + ", streamEpoch=" + streamEpoch + ", sourceObjectIds=" + sourceObjectIds + '}'; } diff --git a/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCompactorTest.java b/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCompactorTest.java index af8504158..c9a5f2aad 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCompactorTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCompactorTest.java @@ -211,7 +211,7 @@ public void testCompact() throws ExecutionException, InterruptedException { public void testCompact_groupBlocks() throws ExecutionException, InterruptedException { List objects = prepareData(); - CompactStreamObjectRequest req = new StreamObjectCompactor.StreamObjectGroupCompactor(streamId, 14L, + CompactStreamObjectRequest req = new StreamObjectCompactor.StreamObjectGroupCompactor(streamId, 0L, 14L, objects.subList(0, 2), 5, 5000, s3Operator).compact().get(); // verify compact request assertEquals(5, req.getObjectId()); diff --git a/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java index 3bbb0fd67..2638602d7 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java @@ -142,7 +142,7 @@ void testCommitAndCompact() { assertEquals(1, streamObjectMetadataList.size()); // Compact stream object. - objectManager.compactStreamObject(new CompactStreamObjectRequest(5, 2000, 2, 0, 20, List.of(1L, 4L))).join(); + objectManager.compactStreamObject(new CompactStreamObjectRequest(5, 2000, 2, 0L, 0, 20, List.of(1L, 4L))).join(); streamObjectMetadataList = objectManager.getStreamObjects(2, 0, 10, 100).join(); assertEquals(1, streamObjectMetadataList.size()); ranges = streamObjectMetadataList.get(0).getOffsetRanges(); diff --git a/store/src/main/java/com/automq/rocketmq/store/S3ObjectManager.java b/store/src/main/java/com/automq/rocketmq/store/S3ObjectManager.java index 06e4746d2..87e1a3f45 100644 --- a/store/src/main/java/com/automq/rocketmq/store/S3ObjectManager.java +++ b/store/src/main/java/com/automq/rocketmq/store/S3ObjectManager.java @@ -101,6 +101,7 @@ public CompletableFuture commitStreamSetObject(Co @Override public CompletableFuture compactStreamObject(CompactStreamObjectRequest request) { // Build S3StreamObject + //TODO: implement stream epoch verification S3StreamObject.Builder builder = S3StreamObject.newBuilder(); builder.setStreamId(request.getStreamId()); builder.setStartOffset(request.getStartOffset()); diff --git a/store/src/test/java/com/automq/rocketmq/store/S3ObjectManagerTest.java b/store/src/test/java/com/automq/rocketmq/store/S3ObjectManagerTest.java index 68b4e5fc3..27fb9af4e 100644 --- a/store/src/test/java/com/automq/rocketmq/store/S3ObjectManagerTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/S3ObjectManagerTest.java @@ -120,7 +120,7 @@ void commitStreamSetObject() { @Test void commitStreamObject() { - CompactStreamObjectRequest request = new CompactStreamObjectRequest(1L, 1000, 2000, 100, 1000, List.of(10L)); + CompactStreamObjectRequest request = new CompactStreamObjectRequest(1L, 1000, 2000, 0L, 100, 1000, List.of(10L)); when(metadataService.compactStreamObject(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); objectManager.compactStreamObject(request); diff --git a/store/src/test/java/com/automq/rocketmq/store/mock/MemoryStreamClient.java b/store/src/test/java/com/automq/rocketmq/store/mock/MemoryStreamClient.java index a6b72e5e9..eb59ba565 100644 --- a/store/src/test/java/com/automq/rocketmq/store/mock/MemoryStreamClient.java +++ b/store/src/test/java/com/automq/rocketmq/store/mock/MemoryStreamClient.java @@ -79,6 +79,11 @@ public long streamId() { return streamId; } + @Override + public long streamEpoch() { + return 0; + } + @Override public long startOffset() { return startOffset.get();