diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index 2da713784..2b0073a49 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -217,7 +217,8 @@ List> splitWALObjects(List obj List> splitFutureList = new ArrayList<>(); for (Map.Entry> entry : streamDataBlocksMap.entrySet()) { List streamDataBlocks = entry.getValue(); - for (StreamDataBlock streamDataBlock : streamDataBlocks) { + List mergedStreamDataBlocks = CompactionUtils.mergeStreamDataBlocks(streamDataBlocks); + for (StreamDataBlock streamDataBlock : mergedStreamDataBlocks) { CompletableFuture streamObjectCf = new CompletableFuture<>(); splitFutureList.add(streamObjectCf); objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)) diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java index fb0c45abc..b2272b420 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java @@ -84,4 +84,28 @@ public static Map> blockWaitObjectIndices(List mergeStreamDataBlocks(List streamDataBlocks) { + List mergedStreamDataBlocks = new ArrayList<>(); + StreamDataBlock currStreamDataBlock = null; + for (StreamDataBlock streamDataBlock : streamDataBlocks) { + if (currStreamDataBlock == null) { + currStreamDataBlock = streamDataBlock; + } else { + if (currStreamDataBlock.getStreamId() == streamDataBlock.getStreamId() + && currStreamDataBlock.getEndOffset() == streamDataBlock.getStartOffset()) { + currStreamDataBlock = new StreamDataBlock(currStreamDataBlock.getStreamId(), + currStreamDataBlock.getStartOffset(), streamDataBlock.getEndOffset(), + currStreamDataBlock.getBlockId(), currStreamDataBlock.getObjectId(), + currStreamDataBlock.getBlockStartPosition(), currStreamDataBlock.getBlockSize() + streamDataBlock.getBlockSize(), + currStreamDataBlock.getRecordCount() + streamDataBlock.getRecordCount()); + } else { + mergedStreamDataBlocks.add(currStreamDataBlock); + currStreamDataBlock = streamDataBlock; + } + } + } + mergedStreamDataBlocks.add(currStreamDataBlock); + return mergedStreamDataBlocks; + } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java index cb2ef20c3..a543a4325 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java @@ -95,25 +95,11 @@ public void testFilterS3Object() { @Test public void testForceSplit() { List s3ObjectMetadata = this.objectManager.getServerObjects().join(); - List streamDataBlocks = CompactionUtils.blockWaitObjectIndices(s3ObjectMetadata, s3Operator) - .values().stream().flatMap(Collection::stream).collect(Collectors.toList()); compactionManager = new CompactionManager(config, objectManager, s3Operator); List> cfList = compactionManager.splitWALObjects(s3ObjectMetadata); - List streamObjects = cfList.stream().map(CompletableFuture::join).collect(Collectors.toList()); + List streamObjects = cfList.stream().map(CompletableFuture::join).toList(); - for (StreamObject streamObject : streamObjects) { - StreamDataBlock streamDataBlock = get(streamDataBlocks, streamObject); - Assertions.assertNotNull(streamDataBlock); - assertEquals(calculateObjectSize(List.of(streamDataBlock)), streamObject.getObjectSize()); - - DataBlockReader reader = new DataBlockReader(new S3ObjectMetadata(streamObject.getObjectId(), - streamObject.getObjectSize(), S3ObjectType.STREAM), s3Operator); - reader.parseDataBlockIndex(); - List streamDataBlocksFromS3 = reader.getDataBlockIndex().join(); - assertEquals(1, streamDataBlocksFromS3.size()); - assertEquals(0, streamDataBlocksFromS3.get(0).getBlockStartPosition()); - assertTrue(contains(streamDataBlocks, streamDataBlocksFromS3.get(0))); - } + Assertions.assertEquals(7, streamObjects.size()); } private StreamDataBlock get(List streamDataBlocks, StreamObject streamObject) { diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java index 9f4d6effc..cc3e5f971 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java @@ -50,4 +50,30 @@ public void testBuildObjectStreamRanges() { assertEquals(40, result.get(1).getStartOffset()); assertEquals(150, result.get(1).getEndOffset()); } + + @Test + public void testMergeStreamDataBlocks() { + List streamDataBlocks = List.of( + new StreamDataBlock(STREAM_0, 0, 15, 0, 1, 0, 20, 1), + new StreamDataBlock(STREAM_0, 15, 30, 1, 1, 20, 5, 1), + new StreamDataBlock(STREAM_2, 40, 100, 2, 1, 25, 80, 1), + new StreamDataBlock(STREAM_2, 120, 150, 3, 1, 105, 30, 1)); + List result = CompactionUtils.mergeStreamDataBlocks(streamDataBlocks); + assertEquals(3, result.size()); + assertEquals(STREAM_0, result.get(0).getStreamId()); + assertEquals(0, result.get(0).getStartOffset()); + assertEquals(30, result.get(0).getEndOffset()); + assertEquals(0, result.get(0).getBlockStartPosition()); + assertEquals(25, result.get(0).getBlockEndPosition()); + assertEquals(STREAM_2, result.get(1).getStreamId()); + assertEquals(40, result.get(1).getStartOffset()); + assertEquals(100, result.get(1).getEndOffset()); + assertEquals(25, result.get(1).getBlockStartPosition()); + assertEquals(105, result.get(1).getBlockEndPosition()); + assertEquals(STREAM_2, result.get(2).getStreamId()); + assertEquals(120, result.get(2).getStartOffset()); + assertEquals(150, result.get(2).getEndOffset()); + assertEquals(105, result.get(2).getBlockStartPosition()); + assertEquals(135, result.get(2).getBlockEndPosition()); + } }