Skip to content

Commit

Permalink
fix(s3stream): merge continuous stream data block on force split
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored and daniel-y committed Oct 10, 2023
1 parent 91ba565 commit 012d299
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ List<CompletableFuture<StreamObject>> splitWALObjects(List<S3ObjectMetadata> obj
List<CompletableFuture<StreamObject>> splitFutureList = new ArrayList<>();
for (Map.Entry<Long, List<StreamDataBlock>> entry : streamDataBlocksMap.entrySet()) {
List<StreamDataBlock> streamDataBlocks = entry.getValue();
for (StreamDataBlock streamDataBlock : streamDataBlocks) {
List<StreamDataBlock> mergedStreamDataBlocks = CompactionUtils.mergeStreamDataBlocks(streamDataBlocks);
for (StreamDataBlock streamDataBlock : mergedStreamDataBlocks) {
CompletableFuture<StreamObject> streamObjectCf = new CompletableFuture<>();
splitFutureList.add(streamObjectCf);
objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,28 @@ public static Map<Long, List<StreamDataBlock>> blockWaitObjectIndices(List<S3Obj
.filter(Objects::nonNull)
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
}

/**
 * Merges neighbouring stream data blocks that continue each other into single blocks.
 *
 * <p>Two consecutive list entries are merged when they share the same stream id and the end offset of
 * the earlier one equals the start offset of the later one. The merged block keeps the earlier block's
 * stream id, start offset, block id, object id and block start position, takes the later block's end
 * offset, and sums the block sizes and record counts. Blocks that do not continue their predecessor
 * are passed through unchanged, and the input order is preserved.
 *
 * <p>NOTE(review): only logical (offset) continuity is checked. The merge presumably relies on the
 * caller passing blocks of one object in physical order, so that summing the sizes describes a
 * contiguous byte range — confirm against callers.
 *
 * @param streamDataBlocks blocks to merge, in the order they should be compared
 * @return a new list with continuous blocks merged; empty when {@code streamDataBlocks} is empty
 */
public static List<StreamDataBlock> mergeStreamDataBlocks(List<StreamDataBlock> streamDataBlocks) {
    List<StreamDataBlock> mergedStreamDataBlocks = new ArrayList<>();
    StreamDataBlock currStreamDataBlock = null;
    for (StreamDataBlock streamDataBlock : streamDataBlocks) {
        if (currStreamDataBlock == null) {
            // First block: nothing to compare against yet.
            currStreamDataBlock = streamDataBlock;
            continue;
        }
        boolean continuous = currStreamDataBlock.getStreamId() == streamDataBlock.getStreamId()
            && currStreamDataBlock.getEndOffset() == streamDataBlock.getStartOffset();
        if (continuous) {
            // Extend the pending block so it also covers the current one.
            currStreamDataBlock = new StreamDataBlock(currStreamDataBlock.getStreamId(),
                currStreamDataBlock.getStartOffset(), streamDataBlock.getEndOffset(),
                currStreamDataBlock.getBlockId(), currStreamDataBlock.getObjectId(),
                currStreamDataBlock.getBlockStartPosition(), currStreamDataBlock.getBlockSize() + streamDataBlock.getBlockSize(),
                currStreamDataBlock.getRecordCount() + streamDataBlock.getRecordCount());
        } else {
            // Gap or different stream: flush the pending block and start a new run.
            mergedStreamDataBlocks.add(currStreamDataBlock);
            currStreamDataBlock = streamDataBlock;
        }
    }
    // Flush the last pending block. For an empty input there is none, and adding it
    // unconditionally would put a null element into the result.
    if (currStreamDataBlock != null) {
        mergedStreamDataBlocks.add(currStreamDataBlock);
    }
    return mergedStreamDataBlocks;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,25 +95,11 @@ public void testFilterS3Object() {
/**
 * Force-split test: fetches the server objects from {@code objectManager}, runs
 * {@code CompactionManager#splitWALObjects} on them and inspects the resulting stream objects.
 *
 * NOTE(review): this span appears to be a rendered diff hunk whose +/- markers were dropped, so lines
 * of the previous and the new version of the test are interleaved (e.g. {@code streamObjects} is
 * declared twice). Confirm against the repository which lines belong to the current version before
 * relying on this text.
 */
@Test
public void testForceSplit() {
List<S3ObjectMetadata> s3ObjectMetadata = this.objectManager.getServerObjects().join();
List<StreamDataBlock> streamDataBlocks = CompactionUtils.blockWaitObjectIndices(s3ObjectMetadata, s3Operator)
.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
compactionManager = new CompactionManager(config, objectManager, s3Operator);
List<CompletableFuture<StreamObject>> cfList = compactionManager.splitWALObjects(s3ObjectMetadata);
List<StreamObject> streamObjects = cfList.stream().map(CompletableFuture::join).collect(Collectors.toList());
List<StreamObject> streamObjects = cfList.stream().map(CompletableFuture::join).toList();

for (StreamObject streamObject : streamObjects) {
StreamDataBlock streamDataBlock = get(streamDataBlocks, streamObject);
Assertions.assertNotNull(streamDataBlock);
assertEquals(calculateObjectSize(List.of(streamDataBlock)), streamObject.getObjectSize());

DataBlockReader reader = new DataBlockReader(new S3ObjectMetadata(streamObject.getObjectId(),
streamObject.getObjectSize(), S3ObjectType.STREAM), s3Operator);
reader.parseDataBlockIndex();
List<StreamDataBlock> streamDataBlocksFromS3 = reader.getDataBlockIndex().join();
assertEquals(1, streamDataBlocksFromS3.size());
assertEquals(0, streamDataBlocksFromS3.get(0).getBlockStartPosition());
assertTrue(contains(streamDataBlocks, streamDataBlocksFromS3.get(0)));
}
Assertions.assertEquals(7, streamObjects.size());
}

private StreamDataBlock get(List<StreamDataBlock> streamDataBlocks, StreamObject streamObject) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,30 @@ public void testBuildObjectStreamRanges() {
assertEquals(40, result.get(1).getStartOffset());
assertEquals(150, result.get(1).getEndOffset());
}

/**
 * Checks that {@code CompactionUtils.mergeStreamDataBlocks} collapses offset-continuous blocks of the
 * same stream into one block and leaves non-continuous blocks as they are.
 */
@Test
public void testMergeStreamDataBlocks() {
    // STREAM_0: [0, 15) followed directly by [15, 30) -> expected to be merged.
    StreamDataBlock stream0Head = new StreamDataBlock(STREAM_0, 0, 15, 0, 1, 0, 20, 1);
    StreamDataBlock stream0Tail = new StreamDataBlock(STREAM_0, 15, 30, 1, 1, 20, 5, 1);
    // STREAM_2: [40, 100) and [120, 150) leave an offset gap -> expected to stay separate.
    StreamDataBlock stream2Head = new StreamDataBlock(STREAM_2, 40, 100, 2, 1, 25, 80, 1);
    StreamDataBlock stream2Tail = new StreamDataBlock(STREAM_2, 120, 150, 3, 1, 105, 30, 1);
    List<StreamDataBlock> input = List.of(stream0Head, stream0Tail, stream2Head, stream2Tail);

    List<StreamDataBlock> merged = CompactionUtils.mergeStreamDataBlocks(input);
    assertEquals(3, merged.size());

    // Merged STREAM_0 block spans both inputs, logically and physically.
    StreamDataBlock first = merged.get(0);
    assertEquals(STREAM_0, first.getStreamId());
    assertEquals(0, first.getStartOffset());
    assertEquals(30, first.getEndOffset());
    assertEquals(0, first.getBlockStartPosition());
    assertEquals(25, first.getBlockEndPosition());

    // First STREAM_2 block is passed through unchanged.
    StreamDataBlock second = merged.get(1);
    assertEquals(STREAM_2, second.getStreamId());
    assertEquals(40, second.getStartOffset());
    assertEquals(100, second.getEndOffset());
    assertEquals(25, second.getBlockStartPosition());
    assertEquals(105, second.getBlockEndPosition());

    // Second STREAM_2 block is passed through unchanged.
    StreamDataBlock third = merged.get(2);
    assertEquals(STREAM_2, third.getStreamId());
    assertEquals(120, third.getStartOffset());
    assertEquals(150, third.getEndOffset());
    assertEquals(105, third.getBlockStartPosition());
    assertEquals(135, third.getBlockEndPosition());
}
}

0 comments on commit 012d299

Please sign in to comment.