Skip to content

Commit

Permalink
fix: List WAL objects of the given stream, limit top N by start_offset (
Browse files Browse the repository at this point in the history
#250)

Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Oct 11, 2023
1 parent 8f5bdfa commit 88383bc
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.google.gson.Gson;
Expand Down Expand Up @@ -1742,37 +1741,53 @@ public CompletableFuture<List<S3WALObject>> listWALObjects() {
}

@Override
public CompletableFuture<List<S3WALObject>> listWALObjects(long streamId,
long startOffset,
public CompletableFuture<List<S3WALObject>> listWALObjects(long streamId, long startOffset,
long endOffset, int limit) {
CompletableFuture<List<S3WALObject>> future = new CompletableFuture<>();
try (SqlSession session = this.sessionFactory.openSession()) {
S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class);
List<S3WalObject> s3WalObjects = s3WalObjectMapper.list(this.config.nodeId(), null);
RangeMapper rangeMapper = session.getMapper(RangeMapper.class);

List<Integer> nodes = rangeMapper.listByStreamId(streamId)
.stream()
.filter(range -> range.getEndOffset() > startOffset && range.getStartOffset() < endOffset)
.mapToInt(Range::getNodeId)
.distinct()
.boxed()
.toList();

List<S3WALObject> walObjects = s3WalObjects.stream()
.map(s3WALObject -> {
TypeToken<Map<Long, SubStream>> typeToken = new TypeToken<>() {

};
Map<Long, SubStream> subStreams = gson.fromJson(new String(s3WALObject.getSubStreams().getBytes(StandardCharsets.UTF_8)), typeToken.getType());
Map<Long, SubStream> streamsRecords = new HashMap<>();
if (!Objects.isNull(subStreams) && subStreams.containsKey(streamId)) {
SubStream subStream = subStreams.get(streamId);
if (subStream.getStartOffset() <= endOffset && subStream.getEndOffset() > startOffset) {
streamsRecords.put(streamId, subStream);
S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class);
List<S3WALObject> s3WALObjects = new ArrayList<>();
for (int nodeId : nodes) {
List<S3WalObject> s3WalObjects = s3WalObjectMapper.list(nodeId, null);
s3WalObjects.stream()
.map(s3WalObject -> {
TypeToken<Map<Long, SubStream>> typeToken = new TypeToken<>() {
};
Map<Long, SubStream> subStreams = gson.fromJson(new String(s3WalObject.getSubStreams().getBytes(StandardCharsets.UTF_8)), typeToken.getType());
Map<Long, SubStream> streamsRecords = new HashMap<>();
if (!Objects.isNull(subStreams) && subStreams.containsKey(streamId)) {
SubStream subStream = subStreams.get(streamId);
if (subStream.getStartOffset() <= endOffset && subStream.getEndOffset() > startOffset) {
streamsRecords.put(streamId, subStream);
}
}
}
if (!streamsRecords.isEmpty()) {
return buildS3WALObject(s3WALObject, streamsRecords);
}
return null;
if (!streamsRecords.isEmpty()) {
return buildS3WALObject(s3WalObject, streamsRecords);
}
return null;
})
.filter(Objects::nonNull)
.forEach(s3WALObjects::add);
}

})
.filter(Objects::nonNull)
.limit(limit)
.collect(Collectors.toList());
future.complete(walObjects);
// Sort by start-offset of the given stream
s3WALObjects.sort((l, r) -> {
long lhs = l.getSubStreamsMap().get(streamId).getStartOffset();
long rhs = r.getSubStreamsMap().get(streamId).getStartOffset();
return Long.compare(lhs, rhs);
});

future.complete(s3WALObjects.stream().limit(limit).toList());
}
return future;
}
Expand Down Expand Up @@ -1891,7 +1906,8 @@ public CompletableFuture<Pair<List<apache.rocketmq.controller.v1.S3StreamObject>
.build())
.toList();

List<S3WALObject> walObjects = s3WalObjectMapper.list(config.nodeId(), null)
List<S3WALObject> walObjects = new ArrayList<>();
s3WalObjectMapper.list(config.nodeId(), null)
.parallelStream()
.map(s3WalObject -> {
TypeToken<Map<Long, SubStream>> typeToken = new TypeToken<>() {
Expand All @@ -1905,9 +1921,18 @@ public CompletableFuture<Pair<List<apache.rocketmq.controller.v1.S3StreamObject>
return streamsRecords.isEmpty() ? null : buildS3WALObject(s3WalObject, streamsRecords);
})
.filter(Objects::nonNull)
.limit(limit - s3StreamObjects.size())
.toList();
.limit(limit)
.forEach(walObjects::add);

if (!walObjects.isEmpty()) {
walObjects.sort((l, r) -> {
long lhs = l.getSubStreamsMap().get(streamId).getStartOffset();
long rhs = r.getSubStreamsMap().get(streamId).getStartOffset();
return Long.compare(lhs, rhs);
});
}

// TODO: apply limit in whole.
return new ImmutablePair<>(s3StreamObjects, walObjects);
}
}, asyncExecutorService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
<if test="null != endOffset">
AND end_offset &gt; #{startOffset}
</if>
ORDER BY id
ORDER BY start_offset ASC
<if test="null != limit">
LIMIT #{limit}
</if>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,17 +598,27 @@ public void testListWALObjects_WithPrams() throws IOException, ExecutionExceptio
}
}""";
try (SqlSession session = getSessionFactory().openSession()) {
S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class);
S3WalObject s3WALObject = new S3WalObject();
s3WALObject.setObjectId(123L);
s3WALObject.setNodeId(1);
s3WALObject.setObjectSize(22L);
s3WALObject.setSequenceId(999L);
s3WALObject.setSubStreams(subStreamsJson);
RangeMapper rangeMapper = session.getMapper(RangeMapper.class);

s3WALObject.setBaseDataTimestamp(System.currentTimeMillis());
Range range = new Range();
range.setStreamId(streamId);
range.setRangeId(0);
range.setStartOffset(startOffset);
range.setEndOffset(endOffset);
range.setEpoch(1L);
range.setNodeId(1);
rangeMapper.create(range);

s3WALObjectMapper.create(s3WALObject);
S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class);
S3WalObject s3WalObject = new S3WalObject();
s3WalObject.setObjectId(123L);
s3WalObject.setNodeId(1);
s3WalObject.setObjectSize(22L);
s3WalObject.setSequenceId(999L);
s3WalObject.setSubStreams(subStreamsJson);
s3WalObject.setBaseDataTimestamp(System.currentTimeMillis());

s3WALObjectMapper.create(s3WalObject);
session.commit();
}
String expectSubStream = """
Expand Down

0 comments on commit 88383bc

Please sign in to comment.