diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionAnalyzer.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionAnalyzer.java index 85014af4e..6e5df5622 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionAnalyzer.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionAnalyzer.java @@ -21,6 +21,7 @@ import com.automq.stream.s3.compact.objects.CompactedObject; import com.automq.stream.s3.compact.objects.CompactedObjectBuilder; import com.automq.stream.s3.compact.objects.CompactionType; +import com.automq.stream.s3.compact.utils.CompactionUtils; import com.automq.stream.utils.LogContext; import java.util.ArrayList; import java.util.Collection; @@ -31,7 +32,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -87,7 +87,7 @@ public List analyze(Map> streamDataB */ List groupObjectWithLimits(Map> streamDataBlockMap, Set excludedObjectIds) { - List sortedStreamDataBlocks = sortStreamRangePositions(streamDataBlockMap); + List sortedStreamDataBlocks = CompactionUtils.sortStreamRangePositions(streamDataBlockMap); List compactedObjectBuilders = new ArrayList<>(); CompactionStats stats = null; int streamNumInStreamSet = -1; @@ -354,24 +354,6 @@ private CompactedObjectBuilder splitObject(CompactedObjectBuilder builder, return builder; } - /** - * Sort stream data blocks by stream id and {@code startOffset). - * - * @param streamDataBlocksMap stream data blocks map, key: object id, value: stream data blocks - * @return sorted stream data blocks - */ - List sortStreamRangePositions(Map> streamDataBlocksMap) { - //TODO: use merge sort - Map> sortedStreamObjectMap = new TreeMap<>(); - for (List streamDataBlocks : streamDataBlocksMap.values()) { - streamDataBlocks.forEach(e -> sortedStreamObjectMap.computeIfAbsent(e.getStreamId(), k -> new ArrayList<>()).add(e)); - } - return sortedStreamObjectMap.values().stream().flatMap(list -> { - list.sort(StreamDataBlock.STREAM_OFFSET_COMPARATOR); - return list.stream(); - }).collect(Collectors.toList()); - } - private static abstract class AbstractCompactedObjectComparator implements Comparator { protected final Map objectStatsMap; diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index 3429b3af1..a62fe0fef 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -23,6 +23,8 @@ import com.automq.stream.s3.compact.objects.CompactionType; import com.automq.stream.s3.compact.operator.DataBlockReader; import com.automq.stream.s3.compact.operator.DataBlockWriter; +import com.automq.stream.s3.compact.utils.CompactionUtils; +import com.automq.stream.s3.compact.utils.GroupByOffsetPredicate; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.StreamMetadata; import com.automq.stream.s3.metadata.StreamOffsetRange; @@ -338,7 +340,7 @@ private Collection> splitStreamSetObject(List> groupAndSplitStreamDataBlocks(S3ObjectMetadata objectMetadata, List streamDataBlocks) { List, CompletableFuture>> groupedDataBlocks = new ArrayList<>(); - List> groupedStreamDataBlocks = CompactionUtils.groupStreamDataBlocks(streamDataBlocks); + List> groupedStreamDataBlocks = CompactionUtils.groupStreamDataBlocks(streamDataBlocks, new GroupByOffsetPredicate()); for (List group : groupedStreamDataBlocks) { groupedDataBlocks.add(new ImmutablePair<>(group, new CompletableFuture<>())); } @@ -611,7 +613,8 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List objectStreamRanges = CompactionUtils.buildObjectStreamRange(sortedStreamDataBlocks); + List objectStreamRanges = CompactionUtils.buildObjectStreamRangeFromGroup( + CompactionUtils.groupStreamDataBlocks(sortedStreamDataBlocks, new GroupByOffsetPredicate())); objectStreamRanges.forEach(request::addStreamRange); request.setObjectId(uploader.getStreamSetObjectId()); // set stream set object id to be the first object id of compacted objects diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java index 5177e8423..95ace3e7d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java @@ -22,6 +22,7 @@ import com.automq.stream.s3.compact.objects.CompactedObject; import com.automq.stream.s3.compact.objects.CompactionType; import com.automq.stream.s3.compact.operator.DataBlockWriter; +import com.automq.stream.s3.compact.utils.CompactionUtils; import com.automq.stream.s3.objects.ObjectManager; import com.automq.stream.s3.objects.StreamObject; import com.automq.stream.s3.operator.S3Operator; diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java index 9be47cb74..64beec00e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java @@ -20,6 +20,8 @@ import com.automq.stream.s3.DataBlockIndex; import com.automq.stream.s3.DirectByteBufAlloc; import com.automq.stream.s3.StreamDataBlock; +import com.automq.stream.s3.compact.utils.CompactionUtils; +import com.automq.stream.s3.compact.utils.GroupByLimitPredicate; import com.automq.stream.s3.metadata.ObjectUtils; import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.stats.CompactionStats; @@ -147,22 +149,19 @@ public long size() { } class IndexBlock { + private static final int DEFAULT_DATA_BLOCK_GROUP_SIZE_THRESHOLD = 1024 * 1024; // 1MiB private final ByteBuf buf; private final long position; public IndexBlock() { position = nextDataBlockPosition; - buf = DirectByteBufAlloc.byteBuffer(calculateIndexBlockSize(), "write_index_block"); - long nextPosition = 0; - for (StreamDataBlock block : completedBlocks) { - new DataBlockIndex(block.getStreamId(), block.getStartOffset(), (int) (block.getEndOffset() - block.getStartOffset()), - block.dataBlockIndex().recordCount(), nextPosition, block.getBlockSize()).encode(buf); - nextPosition += block.getBlockSize(); - } - } - private int calculateIndexBlockSize() { - return completedBlocks.size() * DataBlockIndex.BLOCK_INDEX_SIZE; + List dataBlockIndices = CompactionUtils.buildDataBlockIndicesFromGroup( + CompactionUtils.groupStreamDataBlocks(completedBlocks, new GroupByLimitPredicate(DEFAULT_DATA_BLOCK_GROUP_SIZE_THRESHOLD))); + buf = DirectByteBufAlloc.byteBuffer(dataBlockIndices.size() * DataBlockIndex.BLOCK_INDEX_SIZE, "write_index_block"); + for (DataBlockIndex dataBlockIndex : dataBlockIndices) { + dataBlockIndex.encode(buf); + } } public ByteBuf buffer() { diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java b/s3stream/src/main/java/com/automq/stream/s3/compact/utils/CompactionUtils.java similarity index 66% rename from s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java rename to s3stream/src/main/java/com/automq/stream/s3/compact/utils/CompactionUtils.java index 825a97421..7b633c84e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/utils/CompactionUtils.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package com.automq.stream.s3.compact; +package com.automq.stream.s3.compact.utils; +import com.automq.stream.s3.DataBlockIndex; import com.automq.stream.s3.StreamDataBlock; import com.automq.stream.s3.compact.objects.CompactedObjectBuilder; import com.automq.stream.s3.compact.operator.DataBlockReader; @@ -31,8 +32,10 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -110,26 +113,42 @@ public static Map> blockWaitObjectIndices(List> groupStreamDataBlocks(List streamDataBlocks) { + public static List sortStreamRangePositions(Map> streamDataBlocksMap) { + //TODO: use merge sort + Map> sortedStreamObjectMap = new TreeMap<>(); + for (List streamDataBlocks : streamDataBlocksMap.values()) { + streamDataBlocks.forEach(e -> sortedStreamObjectMap.computeIfAbsent(e.getStreamId(), k -> new ArrayList<>()).add(e)); + } + return sortedStreamObjectMap.values().stream().flatMap(list -> { + list.sort(StreamDataBlock.STREAM_OFFSET_COMPARATOR); + return list.stream(); + }).collect(Collectors.toList()); + } + + /** + * Group stream data blocks by certain conditions. + * + * @param streamDataBlocks stream data blocks to be grouped + * @param predicate the predicate to check whether a stream data block should be grouped with the previous one + * @return grouped stream data blocks + */ + public static List> groupStreamDataBlocks(List streamDataBlocks, + Predicate predicate) { List> groupedStreamDataBlocks = new ArrayList<>(); List currGroup = new ArrayList<>(); - StreamDataBlock currStreamDataBlock = null; for (StreamDataBlock streamDataBlock : streamDataBlocks) { - if (currGroup.isEmpty() || currStreamDataBlock == null) { + if (predicate.test(streamDataBlock)) { + currGroup.add(streamDataBlock); + } else if (!currGroup.isEmpty()) { + groupedStreamDataBlocks.add(currGroup); + currGroup = new ArrayList<>(); currGroup.add(streamDataBlock); - } else { - if (currStreamDataBlock.getStreamId() == streamDataBlock.getStreamId() - && currStreamDataBlock.getEndOffset() == streamDataBlock.getStartOffset()) { - currGroup.add(streamDataBlock); - } else { - groupedStreamDataBlocks.add(currGroup); - currGroup = new ArrayList<>(); - currGroup.add(streamDataBlock); - } } - currStreamDataBlock = streamDataBlock; } if (!currGroup.isEmpty()) { groupedStreamDataBlocks.add(currGroup); @@ -137,6 +156,45 @@ public static List> groupStreamDataBlocks(List buildObjectStreamRangeFromGroup(List> streamDataBlockGroup) { + List objectStreamRanges = new ArrayList<>(); + + for (List streamDataBlocks : streamDataBlockGroup) { + if (streamDataBlocks.isEmpty()) { + continue; + } + objectStreamRanges.add(new ObjectStreamRange( + streamDataBlocks.get(0).getStreamId(), + -1L, + streamDataBlocks.get(0).getStartOffset(), + streamDataBlocks.get(streamDataBlocks.size() - 1).getEndOffset(), + streamDataBlocks.stream().mapToInt(StreamDataBlock::getBlockSize).sum())); + } + + return objectStreamRanges; + } + + public static List buildDataBlockIndicesFromGroup(List> streamDataBlockGroup) { + List dataBlockIndices = new ArrayList<>(); + + long blockStartPosition = 0; + for (List streamDataBlocks : streamDataBlockGroup) { + if (streamDataBlocks.isEmpty()) { + continue; + } + dataBlockIndices.add(new DataBlockIndex( + streamDataBlocks.get(0).getStreamId(), + streamDataBlocks.get(0).getStartOffset(), + (int) (streamDataBlocks.get(streamDataBlocks.size() - 1).getEndOffset() - streamDataBlocks.get(0).getStartOffset()), + streamDataBlocks.stream().map(StreamDataBlock::dataBlockIndex).mapToInt(DataBlockIndex::recordCount).sum(), + blockStartPosition, + streamDataBlocks.stream().mapToInt(StreamDataBlock::getBlockSize).sum())); + blockStartPosition += streamDataBlocks.stream().mapToInt(StreamDataBlock::getBlockSize).sum(); + } + + return dataBlockIndices; + } + public static int getTotalObjectStats(CompactedObjectBuilder o, Map objectStatsMap) { int totalCompactedObjects = 0; for (Long objectId : o.uniqueObjectIds()) { diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/utils/GroupByLimitPredicate.java b/s3stream/src/main/java/com/automq/stream/s3/compact/utils/GroupByLimitPredicate.java new file mode 100644 index 000000000..a207edfd6 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/utils/GroupByLimitPredicate.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.s3.compact.utils; + +import com.automq.stream.s3.StreamDataBlock; +import java.util.function.Predicate; + +public class GroupByLimitPredicate implements Predicate { + private final long blockSizeThreshold; + private long streamId = -1; + private long startOffset = 0; + private long nextStartOffset = 0; + private int blockSize = 0; + private int recordCnt = 0; + + public GroupByLimitPredicate(long blockSizeThreshold) { + this.blockSizeThreshold = blockSizeThreshold; + } + + @Override + public boolean test(StreamDataBlock block) { + boolean flag = true; + if (streamId == -1 // first block + || block.getStreamId() != streamId // iterate to next stream + || block.getStartOffset() != nextStartOffset // block start offset is not continuous for same stream (unlikely to happen) + || (long) blockSize + block.getBlockSize() >= blockSizeThreshold // group size exceeds threshold + || (long) recordCnt + block.dataBlockIndex().recordCount() > Integer.MAX_VALUE // group record count exceeds int32 + || (block.getEndOffset() - startOffset) > Integer.MAX_VALUE) { // group delta offset exceeds int32 + + if (streamId != -1) { + flag = false; + } + + streamId = block.getStreamId(); + startOffset = block.getStartOffset(); + blockSize = 0; + recordCnt = 0; + } + + nextStartOffset = block.getEndOffset(); + blockSize += block.getBlockSize(); + recordCnt += block.dataBlockIndex().recordCount(); + return flag; + } +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/utils/GroupByOffsetPredicate.java b/s3stream/src/main/java/com/automq/stream/s3/compact/utils/GroupByOffsetPredicate.java new file mode 100644 index 000000000..1713c2ccf --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/utils/GroupByOffsetPredicate.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.s3.compact.utils; + +import com.automq.stream.s3.StreamDataBlock; +import java.util.function.Predicate; + +public class GroupByOffsetPredicate implements Predicate { + + private long currStreamId = -1; + private long nextStartOffset = 0; + + @Override + public boolean test(StreamDataBlock block) { + if (currStreamId == -1) { + currStreamId = block.getStreamId(); + nextStartOffset = block.getEndOffset(); + return true; + } else { + if (currStreamId == block.getStreamId() && nextStartOffset == block.getStartOffset()) { + nextStartOffset = block.getEndOffset(); + return true; + } else { + currStreamId = block.getStreamId(); + nextStartOffset = block.getEndOffset(); + return false; + } + } + } +} diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionAnalyzerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionAnalyzerTest.java index 061c00f99..6babd057c 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionAnalyzerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionAnalyzerTest.java @@ -21,6 +21,7 @@ import com.automq.stream.s3.compact.objects.CompactedObject; import com.automq.stream.s3.compact.objects.CompactedObjectBuilder; import com.automq.stream.s3.compact.objects.CompactionType; +import com.automq.stream.s3.compact.utils.CompactionUtils; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3ObjectType; import com.automq.stream.s3.metadata.StreamMetadata; @@ -152,7 +153,7 @@ public void testSortStreamRangePositions() { CompactionAnalyzer compactionAnalyzer = new CompactionAnalyzer(CACHE_SIZE, STREAM_SPLIT_SIZE, MAX_STREAM_NUM_IN_WAL, MAX_STREAM_OBJECT_NUM); List streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join(); Map> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, s3Operator); - List sortedStreamDataBlocks = compactionAnalyzer.sortStreamRangePositions(streamDataBlocksMap); + List sortedStreamDataBlocks = CompactionUtils.sortStreamRangePositions(streamDataBlocksMap); List expectedBlocks = List.of( new StreamDataBlock(STREAM_0, 0, 15, OBJECT_0, -1, -1, 1), new StreamDataBlock(STREAM_0, 15, 20, OBJECT_1, -1, -1, 1), diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java index 83f1d1167..a5b327257 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java @@ -23,6 +23,7 @@ import com.automq.stream.s3.StreamDataBlock; import com.automq.stream.s3.TestUtils; import com.automq.stream.s3.compact.operator.DataBlockReader; +import com.automq.stream.s3.compact.utils.CompactionUtils; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3ObjectType; import com.automq.stream.s3.metadata.S3StreamConstant; @@ -473,13 +474,37 @@ private boolean checkDataIntegrity(List streamMetadataList, List compactedObjectMap.get(objectId).objectSize(), S3ObjectType.STREAM_SET), s3Operator); reader.readBlocks(entry.getValue()); } - List expectedStreamDataBlocks = streamDataBlocks.values().stream().flatMap(Collection::stream).toList(); - List compactedStreamDataBlocks = compactedStreamDataBlocksMap.values().stream().flatMap(Collection::stream).toList(); - if (expectedStreamDataBlocks.size() != compactedStreamDataBlocks.size()) { - return false; - } + List expectedStreamDataBlocks = CompactionUtils.sortStreamRangePositions(streamDataBlocks); + List compactedStreamDataBlocks = CompactionUtils.sortStreamRangePositions(compactedStreamDataBlocksMap); + + int i = 0; for (StreamDataBlock compactedStreamDataBlock : compactedStreamDataBlocks) { - if (expectedStreamDataBlocks.stream().noneMatch(s -> compare(compactedStreamDataBlock, s))) { + long currStreamId = compactedStreamDataBlock.getStreamId(); + long startOffset = compactedStreamDataBlock.getStartOffset(); + if (i == expectedStreamDataBlocks.size()) { + return false; + } + List groupedStreamDataBlocks = new ArrayList<>(); + for (; i < expectedStreamDataBlocks.size(); i++) { + StreamDataBlock expectedBlock = expectedStreamDataBlocks.get(i); + + if (startOffset == compactedStreamDataBlock.getEndOffset()) { + break; + } + if (currStreamId != expectedBlock.getStreamId()) { + return false; + } + if (startOffset != expectedBlock.getStartOffset()) { + return false; + } + if (expectedBlock.getEndOffset() > compactedStreamDataBlock.getEndOffset()) { + return false; + } + startOffset = expectedBlock.getEndOffset(); + groupedStreamDataBlocks.add(expectedBlock); + } + List compactedGroupedStreamDataBlocks = mergeStreamDataBlocksForGroup(List.of(groupedStreamDataBlocks)); + if (!compare(compactedStreamDataBlock, compactedGroupedStreamDataBlocks.get(0))) { return false; } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java index ce368a3d5..aef549d14 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java @@ -18,11 +18,14 @@ package com.automq.stream.s3.compact; import com.automq.stream.s3.DataBlockIndex; +import com.automq.stream.s3.DirectByteBufAlloc; import com.automq.stream.s3.ObjectWriter; import com.automq.stream.s3.StreamDataBlock; import com.automq.stream.s3.TestUtils; import com.automq.stream.s3.compact.objects.CompactedObject; import com.automq.stream.s3.compact.objects.CompactedObjectBuilder; +import com.automq.stream.s3.compact.utils.CompactionUtils; +import com.automq.stream.s3.compact.utils.GroupByOffsetPredicate; import com.automq.stream.s3.memory.MemoryMetadataManager; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3ObjectType; @@ -32,7 +35,10 @@ import com.automq.stream.s3.model.StreamRecordBatch; import com.automq.stream.s3.operator.MemoryS3Operator; import com.automq.stream.s3.operator.S3Operator; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -203,10 +209,35 @@ protected boolean compare(CompactedObject compactedObject1, CompactedObject comp return compare(compactedObject1.streamDataBlocks(), compactedObject2.streamDataBlocks()); } - protected long calculateObjectSize(List streamDataBlocks) { - long bodySize = streamDataBlocks.stream().mapToLong(StreamDataBlock::getBlockSize).sum(); - int indexBlockSize = DataBlockIndex.BLOCK_INDEX_SIZE * streamDataBlocks.size(); + protected long calculateObjectSize(List streamDataBlocksGroups) { + long bodySize = streamDataBlocksGroups.stream().mapToLong(StreamDataBlock::getBlockSize).sum(); + int indexBlockSize = DataBlockIndex.BLOCK_INDEX_SIZE * streamDataBlocksGroups.size(); long tailSize = ObjectWriter.Footer.FOOTER_SIZE; return bodySize + indexBlockSize + tailSize; } + + protected List mergeStreamDataBlocksForGroup(List> streamDataBlockGroups) { + List mergedStreamDataBlocks = new ArrayList<>(); + for (List streamDataBlocks : streamDataBlockGroups) { + StreamDataBlock mergedBlock = new StreamDataBlock( + streamDataBlocks.get(0).getStreamId(), + streamDataBlocks.get(0).getStartOffset(), + streamDataBlocks.get(streamDataBlocks.size() - 1).getEndOffset(), + streamDataBlocks.get(0).getObjectId(), + streamDataBlocks.get(0).getBlockStartPosition(), + streamDataBlocks.stream().mapToInt(StreamDataBlock::getBlockSize).sum(), + streamDataBlocks.stream().map(StreamDataBlock::dataBlockIndex).mapToInt(DataBlockIndex::recordCount).sum()); + mergedBlock.getDataCf().complete(mergeStreamDataBlocksData(streamDataBlocks)); + mergedStreamDataBlocks.add(mergedBlock); + } + return mergedStreamDataBlocks; + } + + private ByteBuf mergeStreamDataBlocksData(List streamDataBlocks) { + CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer(); + for (StreamDataBlock block : streamDataBlocks) { + buf.addComponent(true, block.getDataCf().join()); + } + return buf; + } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java index 6d9f21939..1c7b729c4 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java @@ -23,6 +23,8 @@ import com.automq.stream.s3.compact.objects.CompactedObject; import com.automq.stream.s3.compact.objects.CompactionType; import com.automq.stream.s3.compact.operator.DataBlockReader; +import com.automq.stream.s3.compact.utils.CompactionUtils; +import com.automq.stream.s3.compact.utils.GroupByOffsetPredicate; import com.automq.stream.s3.memory.MemoryMetadataManager; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3ObjectType; @@ -74,19 +76,21 @@ public void testWriteWALObject() { uploader.forceUploadStreamSetObject().join(); long walObjectSize = uploader.complete(); System.out.printf("write size: %d%n", walObjectSize); - assertEquals(walObjectSize, calculateObjectSize(streamDataBlocks)); + + List group = mergeStreamDataBlocksForGroup(CompactionUtils.groupStreamDataBlocks(streamDataBlocks, new GroupByOffsetPredicate())); + assertEquals(walObjectSize, calculateObjectSize(group)); //check s3 object DataBlockReader reader = new DataBlockReader(new S3ObjectMetadata(OBJECT_0, walObjectSize, S3ObjectType.STREAM_SET), s3Operator); reader.parseDataBlockIndex(); List streamDataBlocksFromS3 = reader.getDataBlockIndex().join(); - assertEquals(streamDataBlocksFromS3.size(), streamDataBlocks.size()); + assertEquals(streamDataBlocksFromS3.size(), group.size()); reader.readBlocks(streamDataBlocksFromS3); long expectedBlockPosition = 0; - for (int i = 0; i < streamDataBlocks.size(); i++) { + for (int i = 0; i < group.size(); i++) { assertEquals(expectedBlockPosition, streamDataBlocksFromS3.get(i).getBlockStartPosition()); expectedBlockPosition += streamDataBlocksFromS3.get(i).getBlockSize(); - compare(streamDataBlocksFromS3.get(i), streamDataBlocks.get(i)); + compare(streamDataBlocksFromS3.get(i), group.get(i)); } } @@ -122,19 +126,20 @@ public void testWriteWALObject2() { List expectedDataBlocks = new ArrayList<>(streamDataBlocks1); expectedDataBlocks.addAll(streamDataBlocks2); - assertEquals(walObjectSize, calculateObjectSize(expectedDataBlocks)); + List group = mergeStreamDataBlocksForGroup(CompactionUtils.groupStreamDataBlocks(expectedDataBlocks, new GroupByOffsetPredicate())); + assertEquals(walObjectSize, calculateObjectSize(group)); //check s3 object DataBlockReader reader = new DataBlockReader(new S3ObjectMetadata(OBJECT_0, walObjectSize, S3ObjectType.STREAM_SET), s3Operator); reader.parseDataBlockIndex(); List streamDataBlocksFromS3 = reader.getDataBlockIndex().join(); - assertEquals(streamDataBlocksFromS3.size(), expectedDataBlocks.size()); + assertEquals(streamDataBlocksFromS3.size(), group.size()); reader.readBlocks(streamDataBlocksFromS3); long expectedBlockPosition = 0; - for (int i = 0; i < expectedDataBlocks.size(); i++) { + for (int i = 0; i < group.size(); i++) { assertEquals(expectedBlockPosition, streamDataBlocksFromS3.get(i).getBlockStartPosition()); expectedBlockPosition += streamDataBlocksFromS3.get(i).getBlockSize(); - compare(streamDataBlocksFromS3.get(i), expectedDataBlocks.get(i)); + compare(streamDataBlocksFromS3.get(i), group.get(i)); } } @@ -151,19 +156,20 @@ public void testWriteStreamObject() { streamDataBlock.getDataCf().complete(TestUtils.random((int) streamDataBlock.getStreamRangeSize())); } StreamObject streamObject = cf.join(); - assertEquals(streamObject.getObjectSize(), calculateObjectSize(streamDataBlocks)); + List group = mergeStreamDataBlocksForGroup(CompactionUtils.groupStreamDataBlocks(streamDataBlocks, new GroupByOffsetPredicate())); + assertEquals(streamObject.getObjectSize(), calculateObjectSize(group)); //check s3 object DataBlockReader reader = new DataBlockReader(new S3ObjectMetadata(OBJECT_0, streamObject.getObjectSize(), S3ObjectType.STREAM), s3Operator); reader.parseDataBlockIndex(); List streamDataBlocksFromS3 = reader.getDataBlockIndex().join(); - assertEquals(streamDataBlocksFromS3.size(), streamDataBlocks.size()); + assertEquals(streamDataBlocksFromS3.size(), group.size()); reader.readBlocks(streamDataBlocksFromS3); long expectedBlockPosition = 0; - for (int i = 0; i < streamDataBlocks.size(); i++) { + for (int i = 0; i < group.size(); i++) { assertEquals(expectedBlockPosition, streamDataBlocksFromS3.get(i).getBlockStartPosition()); expectedBlockPosition += streamDataBlocksFromS3.get(i).getBlockSize(); - compare(streamDataBlocksFromS3.get(i), streamDataBlocks.get(i)); + compare(streamDataBlocksFromS3.get(i), group.get(i)); } } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java index cf86392a1..ba81c7fc4 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java @@ -17,9 +17,13 @@ package com.automq.stream.s3.compact; +import com.automq.stream.s3.DataBlockIndex; import com.automq.stream.s3.StreamDataBlock; import com.automq.stream.s3.compact.objects.CompactedObject; import com.automq.stream.s3.compact.objects.CompactionType; +import com.automq.stream.s3.compact.utils.CompactionUtils; +import com.automq.stream.s3.compact.utils.GroupByLimitPredicate; +import com.automq.stream.s3.compact.utils.GroupByOffsetPredicate; import com.automq.stream.s3.objects.ObjectStreamRange; import java.util.List; import org.junit.jupiter.api.Assertions; @@ -33,6 +37,38 @@ @Tag("S3Unit") public class CompactionUtilTest extends CompactionTestBase { + @Test + public void testMergeStreamDataBlocks() { + List streamDataBlocks = List.of( + new StreamDataBlock(STREAM_0, 0, 15, 1, 0, 20, 1), + new StreamDataBlock(STREAM_0, 15, 30, 1, 20, 5, 1), + new StreamDataBlock(STREAM_0, 30, 100, 1, 25, 80, 1), + new StreamDataBlock(STREAM_2, 40, 100, 1, 105, 80, 1), + new StreamDataBlock(STREAM_2, 120, 150, 1, 185, 30, 1)); + List> result = CompactionUtils.groupStreamDataBlocks(streamDataBlocks, new GroupByOffsetPredicate()); + assertEquals(3, result.size()); + Assertions.assertEquals(List.of(streamDataBlocks.get(0), streamDataBlocks.get(1), streamDataBlocks.get(2)), result.get(0)); + Assertions.assertEquals(List.of(streamDataBlocks.get(3)), result.get(1)); + Assertions.assertEquals(List.of(streamDataBlocks.get(4)), result.get(2)); + } + + @Test + public void testMergeStreamDataBlocks2() { + List streamDataBlocks = List.of( + new StreamDataBlock(STREAM_0, 0, 15, 1, 0, 20, 1), + new StreamDataBlock(STREAM_0, 15, 30, 1, 20, 5, 1), + new StreamDataBlock(STREAM_0, 30, 100, 1, 25, 80, 1), + new StreamDataBlock(STREAM_2, 40, 100, 1, 105, 80, 1), + new StreamDataBlock(STREAM_2, 120, 150, 1, 185, 30, 1)); + + List> result = CompactionUtils.groupStreamDataBlocks(streamDataBlocks, new GroupByLimitPredicate(30)); + assertEquals(4, result.size()); + Assertions.assertEquals(List.of(streamDataBlocks.get(0), streamDataBlocks.get(1)), result.get(0)); + Assertions.assertEquals(List.of(streamDataBlocks.get(2)), result.get(1)); + Assertions.assertEquals(List.of(streamDataBlocks.get(3)), result.get(2)); + Assertions.assertEquals(List.of(streamDataBlocks.get(4)), result.get(3)); + } + @Test public void testBuildObjectStreamRanges() { List streamDataBlocks = List.of( @@ -41,7 +77,8 @@ public void testBuildObjectStreamRanges() { new StreamDataBlock(STREAM_2, 40, 120, 2, 25, 80, 1), new StreamDataBlock(STREAM_2, 120, 150, 3, 105, 30, 1)); CompactedObject compactedObject = new CompactedObject(CompactionType.COMPACT, streamDataBlocks); - List result = CompactionUtils.buildObjectStreamRange(compactedObject.streamDataBlocks()); + List result = CompactionUtils.buildObjectStreamRangeFromGroup( + CompactionUtils.groupStreamDataBlocks(compactedObject.streamDataBlocks(), new GroupByOffsetPredicate())); assertEquals(2, result.size()); assertEquals(STREAM_0, result.get(0).getStreamId()); assertEquals(0, result.get(0).getStartOffset()); @@ -52,17 +89,59 @@ public void testBuildObjectStreamRanges() { } @Test - public void testMergeStreamDataBlocks() { + public void testBuildDataIndices() { List streamDataBlocks = List.of( new StreamDataBlock(STREAM_0, 0, 15, 1, 0, 20, 1), - new StreamDataBlock(STREAM_0, 15, 30, 1, 20, 5, 1), - new StreamDataBlock(STREAM_0, 30, 100, 1, 25, 80, 1), - new StreamDataBlock(STREAM_2, 40, 100, 1, 105, 80, 1), - new StreamDataBlock(STREAM_2, 120, 150, 1, 185, 30, 1)); - List> result = CompactionUtils.groupStreamDataBlocks(streamDataBlocks); - assertEquals(3, result.size()); - Assertions.assertEquals(List.of(streamDataBlocks.get(0), streamDataBlocks.get(1), streamDataBlocks.get(2)), result.get(0)); - Assertions.assertEquals(List.of(streamDataBlocks.get(3)), result.get(1)); - Assertions.assertEquals(List.of(streamDataBlocks.get(4)), result.get(2)); + new StreamDataBlock(STREAM_0, 15, 30, 1, 20, 5, 2), + new StreamDataBlock(STREAM_0, 30, 100, 1, 25, 80, 3), + new StreamDataBlock(STREAM_2, 40, 100, 1, 105, 80, 4), + new StreamDataBlock(STREAM_2, 120, 150, 1, 185, 30, 5)); + CompactedObject compactedObject = new CompactedObject(CompactionType.COMPACT, streamDataBlocks); + List result = CompactionUtils.buildDataBlockIndicesFromGroup( + CompactionUtils.groupStreamDataBlocks(compactedObject.streamDataBlocks(), new GroupByLimitPredicate(30))); + + assertEquals(4, result.size()); + assertEquals(new DataBlockIndex(STREAM_0, 0, 30, 3, 0, 25), result.get(0)); + assertEquals(new DataBlockIndex(STREAM_0, 30, 70, 3, 25, 80), result.get(1)); + assertEquals(new DataBlockIndex(STREAM_2, 40, 60, 4, 105, 80), result.get(2)); + assertEquals(new DataBlockIndex(STREAM_2, 120, 30, 5, 185, 30), result.get(3)); + } + + @Test + public void testBuildDataIndices2() { + List streamDataBlocks = List.of( + new StreamDataBlock(STREAM_0, 0, 15, 1, 0, 20, 1), + new StreamDataBlock(STREAM_0, 15, 30, 1, 20, 5, 2), + new StreamDataBlock(STREAM_0, 30, (long) (Integer.MAX_VALUE) + 30, 1, 25, 80, 3), + new StreamDataBlock(STREAM_2, 40, 100, 1, 105, 80, 4), + new StreamDataBlock(STREAM_2, 120, 150, 1, 185, 30, 5)); + CompactedObject compactedObject = new CompactedObject(CompactionType.COMPACT, streamDataBlocks); + List result = CompactionUtils.buildDataBlockIndicesFromGroup( + CompactionUtils.groupStreamDataBlocks(compactedObject.streamDataBlocks(), new GroupByLimitPredicate(999))); + + assertEquals(4, result.size()); + assertEquals(new DataBlockIndex(STREAM_0, 0, 30, 3, 0, 25), result.get(0)); + assertEquals(new DataBlockIndex(STREAM_0, 30, Integer.MAX_VALUE, 3, 25, 80), result.get(1)); + assertEquals(new DataBlockIndex(STREAM_2, 40, 60, 4, 105, 80), result.get(2)); + assertEquals(new DataBlockIndex(STREAM_2, 120, 30, 5, 185, 30), result.get(3)); + } + + @Test + public void testBuildDataIndices3() { + List streamDataBlocks = List.of( + new StreamDataBlock(STREAM_0, 0, 15, 1, 0, 20, 1), + new StreamDataBlock(STREAM_0, 15, 30, 1, 20, 5, 2), + new StreamDataBlock(STREAM_0, 30, 100, 1, 25, 80, Integer.MAX_VALUE), + new StreamDataBlock(STREAM_2, 40, 100, 1, 105, 80, 4), + new StreamDataBlock(STREAM_2, 120, 150, 1, 185, 30, 5)); + CompactedObject compactedObject = new CompactedObject(CompactionType.COMPACT, streamDataBlocks); + List result = CompactionUtils.buildDataBlockIndicesFromGroup( + CompactionUtils.groupStreamDataBlocks(compactedObject.streamDataBlocks(), new GroupByLimitPredicate(999))); + + assertEquals(4, result.size()); + assertEquals(new DataBlockIndex(STREAM_0, 0, 30, 3, 0, 25), result.get(0)); + assertEquals(new DataBlockIndex(STREAM_0, 30, 70, Integer.MAX_VALUE, 25, 80), result.get(1)); + assertEquals(new DataBlockIndex(STREAM_2, 40, 60, 4, 105, 80), result.get(2)); + assertEquals(new DataBlockIndex(STREAM_2, 120, 30, 5, 185, 30), result.get(3)); } }