diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java index 176928e93..de7859f7e 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java @@ -102,7 +102,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import java.util.stream.IntStream; import com.google.gson.Gson; @@ -1742,37 +1741,53 @@ public CompletableFuture> listWALObjects() { } @Override - public CompletableFuture> listWALObjects(long streamId, - long startOffset, + public CompletableFuture> listWALObjects(long streamId, long startOffset, long endOffset, int limit) { CompletableFuture> future = new CompletableFuture<>(); try (SqlSession session = this.sessionFactory.openSession()) { - S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class); - List s3WalObjects = s3WalObjectMapper.list(this.config.nodeId(), null); + RangeMapper rangeMapper = session.getMapper(RangeMapper.class); + + List nodes = rangeMapper.listByStreamId(streamId) + .stream() + .filter(range -> range.getEndOffset() > startOffset && range.getStartOffset() < endOffset) + .mapToInt(Range::getNodeId) + .distinct() + .boxed() + .toList(); - List walObjects = s3WalObjects.stream() - .map(s3WALObject -> { - TypeToken> typeToken = new TypeToken<>() { - - }; - Map subStreams = gson.fromJson(new String(s3WALObject.getSubStreams().getBytes(StandardCharsets.UTF_8)), typeToken.getType()); - Map streamsRecords = new HashMap<>(); - if (!Objects.isNull(subStreams) && subStreams.containsKey(streamId)) { - SubStream subStream = subStreams.get(streamId); - if (subStream.getStartOffset() <= endOffset && subStream.getEndOffset() > startOffset) { - streamsRecords.put(streamId, subStream); + S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class); + List s3WALObjects = new ArrayList<>(); + for (int nodeId : nodes) { + List s3WalObjects = s3WalObjectMapper.list(nodeId, null); + s3WalObjects.stream() + .map(s3WalObject -> { + TypeToken> typeToken = new TypeToken<>() { + }; + Map subStreams = gson.fromJson(new String(s3WalObject.getSubStreams().getBytes(StandardCharsets.UTF_8)), typeToken.getType()); + Map streamsRecords = new HashMap<>(); + if (!Objects.isNull(subStreams) && subStreams.containsKey(streamId)) { + SubStream subStream = subStreams.get(streamId); + if (subStream.getStartOffset() <= endOffset && subStream.getEndOffset() > startOffset) { + streamsRecords.put(streamId, subStream); + } } - } - if (!streamsRecords.isEmpty()) { - return buildS3WALObject(s3WALObject, streamsRecords); - } - return null; + if (!streamsRecords.isEmpty()) { + return buildS3WALObject(s3WalObject, streamsRecords); + } + return null; + }) + .filter(Objects::nonNull) + .forEach(s3WALObjects::add); + } - }) - .filter(Objects::nonNull) - .limit(limit) - .collect(Collectors.toList()); - future.complete(walObjects); + // Sort by start-offset of the given stream + s3WALObjects.sort((l, r) -> { + long lhs = l.getSubStreamsMap().get(streamId).getStartOffset(); + long rhs = r.getSubStreamsMap().get(streamId).getStartOffset(); + return Long.compare(lhs, rhs); + }); + + future.complete(s3WALObjects.stream().limit(limit).toList()); } return future; } @@ -1891,7 +1906,8 @@ public CompletableFuture .build()) .toList(); - List walObjects = s3WalObjectMapper.list(config.nodeId(), null) + List walObjects = new ArrayList<>(); + s3WalObjectMapper.list(config.nodeId(), null) .parallelStream() .map(s3WalObject -> { TypeToken> typeToken = new TypeToken<>() { @@ -1905,9 +1921,18 @@ public CompletableFuture return streamsRecords.isEmpty() ? null : buildS3WALObject(s3WalObject, streamsRecords); }) .filter(Objects::nonNull) - .limit(limit - s3StreamObjects.size()) - .toList(); + .limit(limit) + .forEach(walObjects::add); + + if (!walObjects.isEmpty()) { + walObjects.sort((l, r) -> { + long lhs = l.getSubStreamsMap().get(streamId).getStartOffset(); + long rhs = r.getSubStreamsMap().get(streamId).getStartOffset(); + return Long.compare(lhs, rhs); + }); + } + // TODO: apply limit in whole. return new ImmutablePair<>(s3StreamObjects, walObjects); } }, asyncExecutorService); diff --git a/controller/src/main/resources/database/mapper/S3StreamObjectMapper.xml b/controller/src/main/resources/database/mapper/S3StreamObjectMapper.xml index c3949c51c..8879c28a2 100644 --- a/controller/src/main/resources/database/mapper/S3StreamObjectMapper.xml +++ b/controller/src/main/resources/database/mapper/S3StreamObjectMapper.xml @@ -113,7 +113,7 @@ AND end_offset > #{startOffset} - ORDER BY id + ORDER BY start_offset ASC LIMIT #{limit} diff --git a/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java b/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java index 06a1fd589..cd39b145e 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java @@ -598,17 +598,27 @@ public void testListWALObjects_WithPrams() throws IOException, ExecutionExceptio } }"""; try (SqlSession session = getSessionFactory().openSession()) { - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - S3WalObject s3WALObject = new S3WalObject(); - s3WALObject.setObjectId(123L); - s3WALObject.setNodeId(1); - s3WALObject.setObjectSize(22L); - s3WALObject.setSequenceId(999L); - s3WALObject.setSubStreams(subStreamsJson); + RangeMapper rangeMapper = session.getMapper(RangeMapper.class); - s3WALObject.setBaseDataTimestamp(System.currentTimeMillis()); + Range range = new Range(); + range.setStreamId(streamId); + range.setRangeId(0); + range.setStartOffset(startOffset); + range.setEndOffset(endOffset); + range.setEpoch(1L); + range.setNodeId(1); + rangeMapper.create(range); - s3WALObjectMapper.create(s3WALObject); + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + S3WalObject s3WalObject = new S3WalObject(); + s3WalObject.setObjectId(123L); + s3WalObject.setNodeId(1); + s3WalObject.setObjectSize(22L); + s3WalObject.setSequenceId(999L); + s3WalObject.setSubStreams(subStreamsJson); + s3WalObject.setBaseDataTimestamp(System.currentTimeMillis()); + + s3WALObjectMapper.create(s3WalObject); session.commit(); } String expectSubStream = """