From c06d630987b70142bcca65663c08844df89bca80 Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Tue, 9 Jan 2024 14:33:07 +0800
Subject: [PATCH] feat(kafka_issues642): stream object compact group small data blocks

Signed-off-by: Robin Han
---
 .../com/automq/stream/s3/ObjectReader.java    |   8 +-
 .../com/automq/stream/s3/ObjectWriter.java    |   2 -
 .../stream/s3/StreamObjectCompactor.java      |  65 +++++++++---
 .../stream/s3/cache/DataBlockRecords.java     |   6 +-
 .../stream/s3/DeltaWALUploadTaskTest.java     |   8 +-
 .../automq/stream/s3/ObjectReaderTest.java    |   6 +-
 .../stream/s3/StreamObjectCompactorTest.java  | 100 +++++++++++++-----
 .../cache/DataBlockReadAccumulatorTest.java   |  12 +--
 .../stream/s3/cache/StreamReaderTest.java     |  30 +++---
 9 files changed, 160 insertions(+), 77 deletions(-)
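
Reviewer notes (below the '---' separator, so they stay out of the commit message):

This patch groups adjacent small data blocks under a single DataBlockIndex
entry while compacting stream objects. Block payloads are copy-written
unchanged; only the index shrinks. A group is sealed once adding the next
block would cross dataBlockGroupSizeThreshold (1MiB by default) or overflow
the int record-count or offset-delta fields of DataBlockIndex. A minimal
usage sketch of the new builder knob follows; the wiring (objectManager,
s3Operator, stream) is an illustrative placeholder from the caller's
context, not something prescribed by this patch:

    // Sketch: compact one stream, grouping small blocks up to the 1MiB default.
    StreamObjectCompactor compactor = StreamObjectCompactor.builder()
        .objectManager(objectManager) // assumed to exist in the caller's context
        .s3Operator(s3Operator)       // assumed to exist in the caller's context
        .stream(stream)               // the S3Stream to compact
        .maxStreamObjectSize(1024L * 1024 * 1024)
        .dataBlockGroupSizeThreshold(StreamObjectCompactor.DEFAULT_DATA_BLOCK_GROUP_SIZE_THRESHOLD)
        .build();
    compactor.compact();

The tests pin the threshold instead of using the default: testCompact() passes
dataBlockGroupSizeThreshold(1) so every block stays in its own group, while
testCompact_groupBlocks() passes 5000 to force small blocks into one group.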
diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
index 1bf97f78d..5e9001250 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
@@ -68,9 +68,9 @@ public CompletableFuture<FindIndexResult> find(long streamId, long startOffset,
         return basicObjectInfoCf.thenApply(basicObjectInfo -> basicObjectInfo.indexBlock().find(streamId, startOffset, endOffset, maxBytes));
     }
 
-    public CompletableFuture<DataBlock> read(DataBlockIndex block) {
+    public CompletableFuture<DataBlockGroup> read(DataBlockIndex block) {
         CompletableFuture<ByteBuf> rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition(), ThrottleStrategy.THROTTLE_1);
-        return rangeReadCf.thenApply(DataBlock::new);
+        return rangeReadCf.thenApply(DataBlockGroup::new);
     }
 
     void asyncGetBasicObjectInfo() {
@@ -275,11 +275,11 @@ public IndexBlockParseException(long indexBlockPosition) {
 
     }
 
-    public static class DataBlock implements AutoCloseable {
+    public static class DataBlockGroup implements AutoCloseable {
         private final ByteBuf buf;
         private final int recordCount;
 
-        public DataBlock(ByteBuf buf) {
+        public DataBlockGroup(ByteBuf buf) {
             this.buf = buf.duplicate();
             this.recordCount = check(buf);
         }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
index cd91eebe3..0634238e9 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
@@ -30,8 +30,6 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-// TODO: memory optimization
-
 /**
  * Write stream records to a single object.
  */
diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java b/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java
index 706687f95..0bf5e7cb3 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java
@@ -46,20 +46,23 @@ public class StreamObjectCompactor {
      */
    private static final int MAX_OBJECT_GROUP_COUNT = Math.min(5000, Writer.MAX_PART_COUNT / 2);
     private static final Logger LOGGER = LoggerFactory.getLogger(StreamObjectCompactor.class);
+    public static final int DEFAULT_DATA_BLOCK_GROUP_SIZE_THRESHOLD = 1024 * 1024; // 1MiB
     private final Logger s3ObjectLogger;
     private final long maxStreamObjectSize;
     private final S3Stream stream;
     private final ObjectManager objectManager;
     private final S3Operator s3Operator;
+    private final int dataBlockGroupSizeThreshold;
 
     private StreamObjectCompactor(ObjectManager objectManager, S3Operator s3Operator, S3Stream stream,
-        long maxStreamObjectSize) {
+        long maxStreamObjectSize, int dataBlockGroupSizeThreshold) {
         this.objectManager = objectManager;
         this.s3Operator = s3Operator;
         this.stream = stream;
         this.maxStreamObjectSize = Math.min(maxStreamObjectSize, Writer.MAX_OBJECT_SIZE);
         String logIdent = "[StreamObjectsCompactionTask streamId=" + stream.streamId() + "] ";
         this.s3ObjectLogger = S3ObjectLogger.logger(logIdent);
+        this.dataBlockGroupSizeThreshold = dataBlockGroupSizeThreshold;
     }
 
     public void compact() {
@@ -75,7 +78,13 @@ void compact0() throws ExecutionException, InterruptedException {
         long streamId = stream.streamId();
         long startOffset = stream.startOffset();
         for (List<S3ObjectMetadata> objectGroup : objectGroups) {
-            Optional<CompactStreamObjectRequest> requestOpt = new StreamObjectGroupCompactor(streamId, startOffset, objectGroup, objectManager, s3Operator).compact();
+            // The object group is a single object and no data blocks need to be removed.
+            if (objectGroup.size() == 1 && objectGroup.get(0).startOffset() >= startOffset) {
+                continue;
+            }
+            long objectId = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(60)).get();
+            Optional<CompactStreamObjectRequest> requestOpt = new StreamObjectGroupCompactor(streamId, startOffset,
+                objectGroup, objectId, dataBlockGroupSizeThreshold, s3Operator).compact();
             if (requestOpt.isPresent()) {
                 CompactStreamObjectRequest request = requestOpt.get();
                 objectManager.compactStreamObject(request).get();
@@ -95,24 +104,22 @@ static class StreamObjectGroupCompactor {
         private final List<S3ObjectMetadata> objectGroup;
         private final long streamId;
         private final long startOffset;
-        private final ObjectManager objectManager;
+        // the id of the new object that the object group is compacted into
+        private final long objectId;
         private final S3Operator s3Operator;
+        private final int dataBlockGroupSizeThreshold;
 
         public StreamObjectGroupCompactor(long streamId, long startOffset, List<S3ObjectMetadata> objectGroup,
-            ObjectManager objectManager, S3Operator s3Operator) {
+            long objectId, int dataBlockGroupSizeThreshold, S3Operator s3Operator) {
             this.streamId = streamId;
             this.startOffset = startOffset;
             this.objectGroup = objectGroup;
-            this.objectManager = objectManager;
+            this.objectId = objectId;
+            this.dataBlockGroupSizeThreshold = dataBlockGroupSizeThreshold;
             this.s3Operator = s3Operator;
         }
 
         public Optional<CompactStreamObjectRequest> compact() throws ExecutionException, InterruptedException {
-            // the object group is single object and there is no data block need to be removed.
-            if (objectGroup.size() == 1 && objectGroup.get(0).startOffset() >= startOffset) {
-                return Optional.empty();
-            }
-            long objectId = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(60)).get();
             long nextBlockPosition = 0;
             long objectSize = 0;
             long compactedStartOffset = objectGroup.get(0).startOffset();
@@ -120,6 +127,11 @@ public Optional<CompactStreamObjectRequest> compact() throws ExecutionException,
             List<Long> compactedObjectIds = new LinkedList<>();
             CompositeByteBuf indexes = DirectByteBufAlloc.compositeByteBuffer();
             Writer writer = s3Operator.writer(ObjectUtils.genKey(0, objectId), ThrottleStrategy.THROTTLE_2);
+            long groupStartOffset = -1L;
+            long groupStartPosition = -1L;
+            int groupSize = 0;
+            int groupRecordCount = 0;
+            DataBlockIndex lastIndex = null;
             for (S3ObjectMetadata object : objectGroup) {
                 try (ObjectReader reader = new ObjectReader(object, s3Operator)) {
                     ObjectReader.BasicObjectInfo basicObjectInfo = reader.basicObjectInfo().get();
@@ -133,9 +145,23 @@ public Optional<CompactStreamObjectRequest> compact() throws ExecutionException,
                         compactedStartOffset = dataBlock.endOffset();
                         continue;
                     }
-                    new DataBlockIndex(streamId, dataBlock.startOffset(), dataBlock.endOffsetDelta(),
-                        dataBlock.recordCount(), nextBlockPosition, dataBlock.size()).encode(subIndexes);
+                    if (groupSize == 0 // the first data block
+                        || (long) groupSize + dataBlock.size() > dataBlockGroupSizeThreshold
+                        || (long) groupRecordCount + dataBlock.recordCount() > Integer.MAX_VALUE
+                        || dataBlock.endOffset() - groupStartOffset > Integer.MAX_VALUE) {
+                        if (groupSize != 0) {
+                            new DataBlockIndex(streamId, groupStartOffset, (int) (lastIndex.endOffset() - groupStartOffset),
+                                groupRecordCount, groupStartPosition, groupSize).encode(subIndexes);
+                        }
+                        groupStartOffset = dataBlock.startOffset();
+                        groupStartPosition = nextBlockPosition;
+                        groupSize = 0;
+                        groupRecordCount = 0;
+                    }
+                    groupSize += dataBlock.size();
+                    groupRecordCount += dataBlock.recordCount();
                     nextBlockPosition += dataBlock.size();
+                    lastIndex = dataBlock;
                 }
                 writer.copyWrite(ObjectUtils.genKey(0, object.objectId()), validDataBlockStartPosition, basicObjectInfo.dataBlockSize());
                 objectSize += basicObjectInfo.dataBlockSize() - validDataBlockStartPosition;
@@ -143,6 +169,13 @@ public Optional<CompactStreamObjectRequest> compact() throws ExecutionException,
                 compactedObjectIds.add(object.objectId());
             }
         }
+        if (lastIndex != null) {
+            ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(DataBlockIndex.BLOCK_INDEX_SIZE);
+            new DataBlockIndex(streamId, groupStartOffset, (int) (lastIndex.endOffset() - groupStartOffset),
+                groupRecordCount, groupStartPosition, groupSize).encode(subIndexes);
+            indexes.addComponent(true, subIndexes);
+        }
+
         CompositeByteBuf indexBlockAndFooter = DirectByteBufAlloc.compositeByteBuffer();
         indexBlockAndFooter.addComponent(true, indexes);
         indexBlockAndFooter.addComponent(true, new ObjectWriter.Footer(nextBlockPosition, indexBlockAndFooter.readableBytes()).buffer());
@@ -205,6 +238,7 @@ public static class Builder {
         private S3Operator s3Operator;
         private S3Stream stream;
         private long maxStreamObjectSize;
+        private int dataBlockGroupSizeThreshold = DEFAULT_DATA_BLOCK_GROUP_SIZE_THRESHOLD;
 
         public Builder objectManager(ObjectManager objectManager) {
             this.objectManager = objectManager;
@@ -234,8 +268,13 @@ public Builder maxStreamObjectSize(long maxStreamObjectSize) {
             return this;
         }
 
+        public Builder dataBlockGroupSizeThreshold(int dataBlockGroupSizeThreshold) {
+            this.dataBlockGroupSizeThreshold = dataBlockGroupSizeThreshold;
+            return this;
+        }
+
         public StreamObjectCompactor build() {
-            return new StreamObjectCompactor(objectManager, s3Operator, stream, maxStreamObjectSize);
+            return new StreamObjectCompactor(objectManager, s3Operator, stream, maxStreamObjectSize, dataBlockGroupSizeThreshold);
         }
     }
 }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockRecords.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockRecords.java
index 6622c070e..5348878c8 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockRecords.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockRecords.java
@@ -40,10 +40,10 @@ public void registerListener(BiConsumer<DataBlockRecords, Throwable> listener) {
         listeners.add(listener);
     }
 
-    public void complete(ObjectReader.DataBlock dataBlock, Throwable ex) {
+    public void complete(ObjectReader.DataBlockGroup dataBlockGroup, Throwable ex) {
         if (ex == null) {
-            records = new ArrayList<>(dataBlock.recordCount());
-            try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
+            records = new ArrayList<>(dataBlockGroup.recordCount());
+            try (CloseableIterator<StreamRecordBatch> it = dataBlockGroup.iterator()) {
                 while (it.hasNext()) {
                     records.add(it.next());
                 }
diff --git a/s3stream/src/test/java/com/automq/stream/s3/DeltaWALUploadTaskTest.java b/s3stream/src/test/java/com/automq/stream/s3/DeltaWALUploadTaskTest.java
index 5adefdce2..1648d8e4a 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/DeltaWALUploadTaskTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/DeltaWALUploadTaskTest.java
@@ -119,8 +119,8 @@ public void testUpload() throws Exception {
             ObjectReader objectReader = new ObjectReader(s3ObjectMetadata, s3Operator);
             DataBlockIndex blockIndex = objectReader.find(234, 20, 24).get()
                 .streamDataBlocks().get(0).dataBlockIndex();
-            ObjectReader.DataBlock dataBlock = objectReader.read(blockIndex).get();
-            try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
+            ObjectReader.DataBlockGroup dataBlockGroup = objectReader.read(blockIndex).get();
+            try (CloseableIterator<StreamRecordBatch> it = dataBlockGroup.iterator()) {
                 StreamRecordBatch record = it.next();
                 assertEquals(20, record.getBaseOffset());
                 record = it.next();
@@ -134,8 +134,8 @@ record = it.next();
             ObjectReader objectReader = new ObjectReader(streamObjectMetadata, s3Operator);
             DataBlockIndex blockIndex = objectReader.find(233, 10, 16).get()
                 .streamDataBlocks().get(0).dataBlockIndex();
-            ObjectReader.DataBlock dataBlock = objectReader.read(blockIndex).get();
-            try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
+            ObjectReader.DataBlockGroup dataBlockGroup = objectReader.read(blockIndex).get();
+            try (CloseableIterator<StreamRecordBatch> it = dataBlockGroup.iterator()) {
                 StreamRecordBatch r1 = it.next();
                 assertEquals(10, r1.getBaseOffset());
                 r1.release();
diff --git a/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
index 666b8f752..e6c99978c 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
@@ -131,9 +131,9 @@ public void testReadBlockGroup() throws ExecutionException, InterruptedException
         try (ObjectReader reader = new ObjectReader(new S3ObjectMetadata(1L, objectSize, S3ObjectType.STREAM), s3Operator)) {
             ObjectReader.FindIndexResult rst = reader.find(233L, 10L, 14L, 1024).get();
             assertEquals(1, rst.streamDataBlocks().size());
-            try (ObjectReader.DataBlock dataBlock = reader.read(rst.streamDataBlocks().get(0).dataBlockIndex()).get()) {
-                assertEquals(3, dataBlock.recordCount());
-                Iterator<StreamRecordBatch> it = dataBlock.iterator();
+            try (ObjectReader.DataBlockGroup dataBlockGroup = reader.read(rst.streamDataBlocks().get(0).dataBlockIndex()).get()) {
+                assertEquals(3, dataBlockGroup.recordCount());
+                Iterator<StreamRecordBatch> it = dataBlockGroup.iterator();
                 assertEquals(10, it.next().getBaseOffset());
                 assertEquals(11, it.next().getBaseOffset());
                 assertEquals(13, it.next().getBaseOffset());
diff --git a/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCompactorTest.java b/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCompactorTest.java
index 74574193b..e4eb91c18 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCompactorTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCompactorTest.java
@@ -62,8 +62,7 @@ void setUp() {
         stream = Mockito.mock(S3Stream.class);
     }
 
-    @Test
-    public void testCompact() throws ExecutionException, InterruptedException {
+    List<S3ObjectMetadata> prepareData() throws ExecutionException, InterruptedException {
         // prepare object
         List<S3ObjectMetadata> objects = new LinkedList<>();
         {
@@ -119,6 +118,12 @@ public void testCompact() throws ExecutionException, InterruptedException {
             objects.add(new S3ObjectMetadata(4, S3ObjectType.STREAM, List.of(new StreamOffsetRange(streamId, 31, 33)),
                 System.currentTimeMillis(), System.currentTimeMillis(), writer.size(), 4));
         }
+        return objects;
+    }
+
+    @Test
+    public void testCompact() throws ExecutionException, InterruptedException {
+        List<S3ObjectMetadata> objects = prepareData();
         when(objectManager.getStreamObjects(eq(streamId), eq(14L), eq(32L), eq(Integer.MAX_VALUE)))
             .thenReturn(CompletableFuture.completedFuture(objects));
         AtomicLong nextObjectId = new AtomicLong(5);
@@ -129,7 +134,7 @@ public void testCompact() throws ExecutionException, InterruptedException {
         when(stream.confirmOffset()).thenReturn(32L);
 
         StreamObjectCompactor task = StreamObjectCompactor.builder().objectManager(objectManager).s3Operator(s3Operator)
-            .maxStreamObjectSize(1024 * 1024 * 1024).stream(stream).build();
+            .maxStreamObjectSize(1024 * 1024 * 1024).stream(stream).dataBlockGroupSizeThreshold(1).build();
         task.compact();
 
         ArgumentCaptor<CompactStreamObjectRequest> ac = ArgumentCaptor.forClass(CompactStreamObjectRequest.class);
@@ -157,25 +162,25 @@ public void testCompact() throws ExecutionException, InterruptedException {
             assertEquals(3, objectReader.basicObjectInfo().get().indexBlock().count());
             ObjectReader.FindIndexResult rst = objectReader.find(streamId, 13L, 18L).get();
             assertEquals(3, rst.streamDataBlocks().size());
-            ObjectReader.DataBlock dataBlock1 = objectReader.read(rst.streamDataBlocks().get(0).dataBlockIndex()).get();
-            try (dataBlock1) {
-                assertEquals(3, dataBlock1.recordCount());
-                Iterator<StreamRecordBatch> it = dataBlock1.iterator();
+            ObjectReader.DataBlockGroup dataBlockGroup1 = objectReader.read(rst.streamDataBlocks().get(0).dataBlockIndex()).get();
+            try (dataBlockGroup1) {
+                assertEquals(3, dataBlockGroup1.recordCount());
+                Iterator<StreamRecordBatch> it = dataBlockGroup1.iterator();
                 assertEquals(13L, it.next().getBaseOffset());
                 assertEquals(14L, it.next().getBaseOffset());
                 assertEquals(15L, it.next().getBaseOffset());
                 assertFalse(it.hasNext());
             }
-            ObjectReader.DataBlock dataBlock2 = objectReader.read(rst.streamDataBlocks().get(1).dataBlockIndex()).get();
-            try (dataBlock2) {
-                assertEquals(1, dataBlock2.recordCount());
-                Iterator<StreamRecordBatch> it = dataBlock2.iterator();
+            ObjectReader.DataBlockGroup dataBlockGroup2 = objectReader.read(rst.streamDataBlocks().get(1).dataBlockIndex()).get();
+            try (dataBlockGroup2) {
+                assertEquals(1, dataBlockGroup2.recordCount());
+                Iterator<StreamRecordBatch> it = dataBlockGroup2.iterator();
                 assertEquals(16L, it.next().getBaseOffset());
             }
-            ObjectReader.DataBlock dataBlock3 = objectReader.read(rst.streamDataBlocks().get(2).dataBlockIndex()).get();
-            try (dataBlock3) {
-                assertEquals(1, dataBlock3.recordCount());
-                Iterator<StreamRecordBatch> it = dataBlock3.iterator();
+            ObjectReader.DataBlockGroup dataBlockGroup3 = objectReader.read(rst.streamDataBlocks().get(2).dataBlockIndex()).get();
+            try (dataBlockGroup3) {
+                assertEquals(1, dataBlockGroup3.recordCount());
+                Iterator<StreamRecordBatch> it = dataBlockGroup3.iterator();
                 assertEquals(17L, it.next().getBaseOffset());
             }
             objectReader.close();
@@ -185,29 +190,70 @@ public void testCompact() throws ExecutionException, InterruptedException {
             assertEquals(3, objectReader.basicObjectInfo().get().indexBlock().count());
             ObjectReader.FindIndexResult rst = objectReader.find(streamId, 30L, 33L).get();
             assertEquals(3, rst.streamDataBlocks().size());
-            ObjectReader.DataBlock dataBlock1 = objectReader.read(rst.streamDataBlocks().get(0).dataBlockIndex()).get();
-            try (dataBlock1) {
-                assertEquals(1, dataBlock1.recordCount());
-                Iterator<StreamRecordBatch> it = dataBlock1.iterator();
+            ObjectReader.DataBlockGroup dataBlockGroup1 = objectReader.read(rst.streamDataBlocks().get(0).dataBlockIndex()).get();
+            try (dataBlockGroup1) {
+                assertEquals(1, dataBlockGroup1.recordCount());
+                Iterator<StreamRecordBatch> it = dataBlockGroup1.iterator();
                 assertEquals(30L, it.next().getBaseOffset());
                 assertFalse(it.hasNext());
             }
-            ObjectReader.DataBlock dataBlock2 = objectReader.read(rst.streamDataBlocks().get(1).dataBlockIndex()).get();
-            try (dataBlock2) {
-                assertEquals(1, dataBlock2.recordCount());
-                Iterator<StreamRecordBatch> it = dataBlock2.iterator();
+            ObjectReader.DataBlockGroup dataBlockGroup2 = objectReader.read(rst.streamDataBlocks().get(1).dataBlockIndex()).get();
+            try (dataBlockGroup2) {
+                assertEquals(1, dataBlockGroup2.recordCount());
+                Iterator<StreamRecordBatch> it = dataBlockGroup2.iterator();
                 assertEquals(31L, it.next().getBaseOffset());
             }
-            ObjectReader.DataBlock dataBlock3 = objectReader.read(rst.streamDataBlocks().get(2).dataBlockIndex()).get();
-            try (dataBlock3) {
-                assertEquals(1, dataBlock3.recordCount());
-                Iterator<StreamRecordBatch> it = dataBlock3.iterator();
+            ObjectReader.DataBlockGroup dataBlockGroup3 = objectReader.read(rst.streamDataBlocks().get(2).dataBlockIndex()).get();
+            try (dataBlockGroup3) {
+                assertEquals(1, dataBlockGroup3.recordCount());
+                Iterator<StreamRecordBatch> it = dataBlockGroup3.iterator();
                 assertEquals(32L, it.next().getBaseOffset());
             }
             objectReader.close();
         }
     }
 
+    @Test
+    public void testCompact_groupBlocks() throws ExecutionException, InterruptedException {
+        List<S3ObjectMetadata> objects = prepareData();
+
+        CompactStreamObjectRequest req = new StreamObjectCompactor.StreamObjectGroupCompactor(streamId, 14L,
+            objects.subList(0, 2), 5, 5000, s3Operator).compact().get();
+        // verify compact request
+        assertEquals(5, req.getObjectId());
+        assertEquals(233L, req.getStreamId());
+        assertEquals(13L, req.getStartOffset());
+        assertEquals(18L, req.getEndOffset());
+        assertEquals(List.of(1L, 2L), req.getSourceObjectIds());
+
+        // verify the compacted object records; expect [13, 16) + [16, 17) to be compacted into one data block group.
+        {
+            ObjectReader objectReader = new ObjectReader(new S3ObjectMetadata(5, req.getObjectSize(), S3ObjectType.STREAM), s3Operator);
+            assertEquals(2, objectReader.basicObjectInfo().get().indexBlock().count());
+            ObjectReader.FindIndexResult rst = objectReader.find(streamId, 13L, 18L).get();
+            assertEquals(2, rst.streamDataBlocks().size());
+            ObjectReader.DataBlockGroup dataBlockGroup1 = objectReader.read(rst.streamDataBlocks().get(0).dataBlockIndex()).get();
+            try (dataBlockGroup1) {
+                assertEquals(4, dataBlockGroup1.recordCount());
+                Iterator<StreamRecordBatch> it = dataBlockGroup1.iterator();
+                assertEquals(13L, it.next().getBaseOffset());
+                assertEquals(14L, it.next().getBaseOffset());
+                assertEquals(15L, it.next().getBaseOffset());
+                assertEquals(16L, it.next().getBaseOffset());
+                assertFalse(it.hasNext());
+            }
+            ObjectReader.DataBlockGroup dataBlockGroup2 = objectReader.read(rst.streamDataBlocks().get(1).dataBlockIndex()).get();
+            try (dataBlockGroup2) {
+                assertEquals(1, dataBlockGroup2.recordCount());
+                Iterator<StreamRecordBatch> it = dataBlockGroup2.iterator();
+                StreamRecordBatch record = it.next();
+                assertEquals(17L, record.getBaseOffset());
+                assertEquals(18L, record.getLastOffset());
+            }
+            objectReader.close();
+        }
+    }
+
     @Test
     public void testGroup() {
         List<S3ObjectMetadata> objects = List.of(
diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/DataBlockReadAccumulatorTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/DataBlockReadAccumulatorTest.java
index be9e4c02a..cf18c50b4 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/cache/DataBlockReadAccumulatorTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/cache/DataBlockReadAccumulatorTest.java
@@ -54,7 +54,7 @@ public void test() throws ExecutionException, InterruptedException, TimeoutExcep
         ObjectReader reader = mock(ObjectReader.class);
         DataBlockIndex dataBlockIndex = new DataBlockIndex(10, 0, 12, 2, 10, 100);
         StreamDataBlock streamDataBlock = new StreamDataBlock(1, dataBlockIndex);
-        CompletableFuture<ObjectReader.DataBlock> readerCf = new CompletableFuture<>();
+        CompletableFuture<ObjectReader.DataBlockGroup> readerCf = new CompletableFuture<>();
         when(reader.read(eq(dataBlockIndex))).thenReturn(readerCf);
 
         List<DataBlockReadAccumulator.ReserveResult> reserveResults = accumulator.reserveDataBlock(List.of(new ImmutablePair<>(reader, streamDataBlock)));
@@ -67,13 +67,13 @@
 
         accumulator.readDataBlock(reader, dataBlockIndex);
 
-        ObjectReader.DataBlock dataBlock = mock(ObjectReader.DataBlock.class);
+        ObjectReader.DataBlockGroup dataBlockGroup = mock(ObjectReader.DataBlockGroup.class);
         List<StreamRecordBatch> records = List.of(
             newRecord(10, 10, 2, 1),
             newRecord(10, 12, 2, 1)
         );
-        when(dataBlock.recordCount()).thenReturn(2);
-        when(dataBlock.iterator()).thenAnswer(args -> {
+        when(dataBlockGroup.recordCount()).thenReturn(2);
+        when(dataBlockGroup.iterator()).thenAnswer(args -> {
            Iterator<StreamRecordBatch> it = records.iterator();
             return new CloseableIterator<StreamRecordBatch>() {
 
@@ -93,8 +93,8 @@ public void close() {
                 }
             };
         });
-        when(dataBlock.recordCount()).thenReturn(2);
-        readerCf.complete(dataBlock);
+        when(dataBlockGroup.recordCount()).thenReturn(2);
+        readerCf.complete(dataBlockGroup);
 
         verify(reader, times(1)).read(any());
 
diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/StreamReaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/StreamReaderTest.java
index 01df04771..d9293e828 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/cache/StreamReaderTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/cache/StreamReaderTest.java
@@ -143,7 +143,7 @@ public void testSyncReadAhead() {
                 new StreamDataBlock(1, index1))));
 
         ObjectReader reader = Mockito.mock(ObjectReader.class);
-        ObjectReader.DataBlock dataBlock1 = Mockito.mock(ObjectReader.DataBlock.class);
+        ObjectReader.DataBlockGroup dataBlockGroup1 = Mockito.mock(ObjectReader.DataBlockGroup.class);
         StreamRecordBatch record1 = new StreamRecordBatch(233L, 0, 0, 64, TestUtils.random(128));
         record1.release();
         StreamRecordBatch record2 = new StreamRecordBatch(233L, 0, 64, 64, TestUtils.random(128));
@@ -152,7 +152,7 @@ public void testSyncReadAhead() {
         AtomicInteger remaining = new AtomicInteger(0);
         Assertions.assertEquals(1, record1.getPayload().refCnt());
         Assertions.assertEquals(1, record2.getPayload().refCnt());
-        Mockito.when(dataBlock1.iterator()).thenReturn(new CloseableIterator<>() {
+        Mockito.when(dataBlockGroup1.iterator()).thenReturn(new CloseableIterator<>() {
             @Override
             public void close() {
             }
@@ -170,7 +170,7 @@ public StreamRecordBatch next() {
                 return records.get(remaining.getAndIncrement());
             }
         });
-        Mockito.when(reader.read(index1)).thenReturn(CompletableFuture.completedFuture(dataBlock1));
+        Mockito.when(reader.read(index1)).thenReturn(CompletableFuture.completedFuture(dataBlockGroup1));
         context.objectReaderMap = new HashMap<>(Map.of(1L, reader));
         CompletableFuture<List<StreamRecordBatch>> cf = streamReader.handleSyncReadAhead(TraceContext.DEFAULT, 233L, 0,
             999, 64, Mockito.mock(ReadAheadAgent.class), UUID.randomUUID(), new TimerUtil(), context);
@@ -202,7 +202,7 @@ public void testSyncReadAheadNotAlign() {
                 new StreamDataBlock(1, index1))));
 
         ObjectReader reader = Mockito.mock(ObjectReader.class);
-        ObjectReader.DataBlock dataBlock1 = Mockito.mock(ObjectReader.DataBlock.class);
+        ObjectReader.DataBlockGroup dataBlockGroup1 = Mockito.mock(ObjectReader.DataBlockGroup.class);
         StreamRecordBatch record1 = new StreamRecordBatch(233L, 0, 0, 64, TestUtils.random(128));
         record1.release();
         StreamRecordBatch record2 = new StreamRecordBatch(233L, 0, 64, 64, TestUtils.random(128));
@@ -211,7 +211,7 @@ public void testSyncReadAheadNotAlign() {
         AtomicInteger remaining = new AtomicInteger(0);
         Assertions.assertEquals(1, record1.getPayload().refCnt());
         Assertions.assertEquals(1, record2.getPayload().refCnt());
-        Mockito.when(dataBlock1.iterator()).thenReturn(new CloseableIterator<>() {
+        Mockito.when(dataBlockGroup1.iterator()).thenReturn(new CloseableIterator<>() {
             @Override
             public void close() {
             }
@@ -229,7 +229,7 @@ public StreamRecordBatch next() {
                 return records.get(remaining.getAndIncrement());
             }
         });
-        Mockito.when(reader.read(index1)).thenReturn(CompletableFuture.completedFuture(dataBlock1));
+        Mockito.when(reader.read(index1)).thenReturn(CompletableFuture.completedFuture(dataBlockGroup1));
         context.objectReaderMap = new HashMap<>(Map.of(1L, reader));
         ReadAheadTaskKey key = new ReadAheadTaskKey(233L, startOffset);
         context.taskKeySet.add(key);
@@ -265,7 +265,7 @@ public void testSyncReadAheadException() {
                 new StreamDataBlock(1, index2))));
 
         ObjectReader reader = Mockito.mock(ObjectReader.class);
-        ObjectReader.DataBlock dataBlock1 = Mockito.mock(ObjectReader.DataBlock.class);
+        ObjectReader.DataBlockGroup dataBlockGroup1 = Mockito.mock(ObjectReader.DataBlockGroup.class);
         StreamRecordBatch record1 = new StreamRecordBatch(233L, 0, 0, 64, TestUtils.random(128));
         record1.release();
         StreamRecordBatch record2 = new StreamRecordBatch(233L, 0, 0, 64, TestUtils.random(128));
@@ -274,7 +274,7 @@ public void testSyncReadAheadException() {
         AtomicInteger remaining = new AtomicInteger(records.size());
         Assertions.assertEquals(1, record1.getPayload().refCnt());
         Assertions.assertEquals(1, record2.getPayload().refCnt());
-        Mockito.when(dataBlock1.iterator()).thenReturn(new CloseableIterator<>() {
+        Mockito.when(dataBlockGroup1.iterator()).thenReturn(new CloseableIterator<>() {
             @Override
             public void close() {
             }
@@ -292,7 +292,7 @@ public StreamRecordBatch next() {
                 return records.get(remaining.get());
             }
         });
-        Mockito.when(reader.read(index1)).thenReturn(CompletableFuture.completedFuture(dataBlock1));
+        Mockito.when(reader.read(index1)).thenReturn(CompletableFuture.completedFuture(dataBlockGroup1));
         Mockito.when(reader.read(index2)).thenReturn(CompletableFuture.failedFuture(new RuntimeException("exception")));
         context.objectReaderMap = new HashMap<>(Map.of(1L, reader));
         CompletableFuture<List<StreamRecordBatch>> cf = streamReader.handleSyncReadAhead(TraceContext.DEFAULT, 233L, 0,
@@ -330,7 +330,7 @@ public void testAsyncReadAhead() {
                 new StreamDataBlock(1, index1))));
 
         ObjectReader reader = Mockito.mock(ObjectReader.class);
-        ObjectReader.DataBlock dataBlock1 = Mockito.mock(ObjectReader.DataBlock.class);
+        ObjectReader.DataBlockGroup dataBlockGroup1 = Mockito.mock(ObjectReader.DataBlockGroup.class);
         StreamRecordBatch record1 = new StreamRecordBatch(233L, 0, 0, 64, TestUtils.random(128));
         record1.release();
         StreamRecordBatch record2 = new StreamRecordBatch(233L, 0, 64, 64, TestUtils.random(128));
@@ -339,7 +339,7 @@ public void testAsyncReadAhead() {
         AtomicInteger remaining = new AtomicInteger(0);
         Assertions.assertEquals(1, record1.getPayload().refCnt());
         Assertions.assertEquals(1, record2.getPayload().refCnt());
-        Mockito.when(dataBlock1.iterator()).thenReturn(new CloseableIterator<>() {
+        Mockito.when(dataBlockGroup1.iterator()).thenReturn(new CloseableIterator<>() {
             @Override
             public void close() {
             }
@@ -357,7 +357,7 @@ public StreamRecordBatch next() {
                 return records.get(remaining.getAndIncrement());
             }
         });
-        Mockito.when(reader.read(index1)).thenReturn(CompletableFuture.completedFuture(dataBlock1));
+        Mockito.when(reader.read(index1)).thenReturn(CompletableFuture.completedFuture(dataBlockGroup1));
         context.objectReaderMap = new HashMap<>(Map.of(1L, reader));
         CompletableFuture<Void> cf = streamReader.handleAsyncReadAhead(233L, 0, 999, 1024, Mockito.mock(ReadAheadAgent.class),
             new TimerUtil(), context);
@@ -387,7 +387,7 @@ public void testAsyncReadAheadException() {
                 new StreamDataBlock(1, index2))));
 
         ObjectReader reader = Mockito.mock(ObjectReader.class);
-        ObjectReader.DataBlock dataBlock1 = Mockito.mock(ObjectReader.DataBlock.class);
+        ObjectReader.DataBlockGroup dataBlockGroup1 = Mockito.mock(ObjectReader.DataBlockGroup.class);
         StreamRecordBatch record1 = new StreamRecordBatch(233L, 0, 0, 64, TestUtils.random(128));
         record1.release();
         StreamRecordBatch record2 = new StreamRecordBatch(233L, 0, 64, 64, TestUtils.random(128));
@@ -396,7 +396,7 @@ public void testAsyncReadAheadException() {
         AtomicInteger remaining = new AtomicInteger(0);
         Assertions.assertEquals(1, record1.getPayload().refCnt());
         Assertions.assertEquals(1, record2.getPayload().refCnt());
-        Mockito.when(dataBlock1.iterator()).thenReturn(new CloseableIterator<>() {
+        Mockito.when(dataBlockGroup1.iterator()).thenReturn(new CloseableIterator<>() {
             @Override
             public void close() {
             }
@@ -414,7 +414,7 @@ public StreamRecordBatch next() {
                 return records.get(remaining.getAndIncrement());
             }
         });
-        Mockito.when(reader.read(index1)).thenReturn(CompletableFuture.completedFuture(dataBlock1));
+        Mockito.when(reader.read(index1)).thenReturn(CompletableFuture.completedFuture(dataBlockGroup1));
         Mockito.when(reader.read(index2)).thenReturn(CompletableFuture.failedFuture(new RuntimeException("exception")));
         context.objectReaderMap = new HashMap<>(Map.of(1L, reader));