diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index a62fe0fef..ee4913a52 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -312,29 +312,30 @@ public CompletableFuture forceSplitAll() { * * @param streamMetadataList metadata of opened streams * @param objectMetadata stream set object to split - * @return List of CompletableFuture of StreamObject + * @param cfs List of CompletableFuture of StreamObject + * @return true if split succeed, false otherwise */ - private Collection> splitStreamSetObject(List streamMetadataList, - S3ObjectMetadata objectMetadata) { + private boolean splitStreamSetObject(List streamMetadataList, + S3ObjectMetadata objectMetadata, Collection> cfs) { if (objectMetadata == null) { - return new ArrayList<>(); + return false; } Map> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, Collections.singletonList(objectMetadata), s3Operator, logger); if (streamDataBlocksMap.isEmpty()) { - // object not exist, metadata is out of date - logger.warn("Object {} not exist, metadata is out of date", objectMetadata.objectId()); - return new ArrayList<>(); + logger.warn("Read index for object {} failed", objectMetadata.objectId()); + return false; } List streamDataBlocks = streamDataBlocksMap.get(objectMetadata.objectId()); if (streamDataBlocks.isEmpty()) { // object is empty, metadata is out of date - logger.warn("Object {} is empty, metadata is out of date", objectMetadata.objectId()); - return new ArrayList<>(); + logger.info("Object {} is out of date, will be deleted after compaction", objectMetadata.objectId()); + return true; } - return groupAndSplitStreamDataBlocks(objectMetadata, streamDataBlocks); + cfs.addAll(groupAndSplitStreamDataBlocks(objectMetadata, streamDataBlocks)); + return true; } Collection> groupAndSplitStreamDataBlocks(S3ObjectMetadata objectMetadata, @@ -414,8 +415,9 @@ Collection> groupAndSplitStreamDataBlocks(S3Obje CommitStreamSetObjectRequest buildSplitRequest(List streamMetadataList, S3ObjectMetadata objectToSplit) throws CompletionException { - Collection> cfs = splitStreamSetObject(streamMetadataList, objectToSplit); - if (cfs.isEmpty()) { + List> cfs = new ArrayList<>(); + boolean status = splitStreamSetObject(streamMetadataList, objectToSplit, cfs); + if (!status) { logger.error("Force split object {} failed, no stream object generated", objectToSplit.objectId()); return null; } @@ -468,6 +470,12 @@ CommitStreamSetObjectRequest buildCompactRequest(List streamMeta executeCompactionPlans(request, compactionPlans, objectsToCompact); compactionPlans.forEach(c -> c.streamDataBlocksMap().values().forEach(v -> v.forEach(b -> compactedObjectIds.add(b.getObjectId())))); + // compact out-dated objects directly + streamDataBlockMap.entrySet().stream().filter(e -> e.getValue().isEmpty()).forEach(e -> { + logger.info("Object {} is out of date, will be deleted after compaction", e.getKey()); + compactedObjectIds.add(e.getKey()); + }); + request.setCompactedObjectIds(new ArrayList<>(compactedObjectIds)); List compactedObjectMetadata = objectsToCompact.stream() .filter(e -> compactedObjectIds.contains(e.objectId())).toList(); diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java index 6d900a990..9045c956a 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java @@ -152,6 +152,37 @@ public void testForceSplit() { Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(2)), request)); } + @Test + public void testForceSplitWithOutDatedObject() { + when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture( + List.of(new StreamMetadata(STREAM_0, 0, 999, 9999, StreamState.OPENED), + new StreamMetadata(STREAM_1, 0, 999, 9999, StreamState.OPENED), + new StreamMetadata(STREAM_2, 0, 999, 9999, StreamState.OPENED)))); + + List streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join(); + List s3ObjectMetadata = this.objectManager.getServerObjects().join(); + when(config.streamSetObjectCompactionForceSplitPeriod()).thenReturn(0); + compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator); + + CommitStreamSetObjectRequest request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(0)); + Assertions.assertEquals(-1, request.getObjectId()); + Assertions.assertEquals(List.of(OBJECT_0), request.getCompactedObjectIds()); + Assertions.assertTrue(request.getStreamObjects().isEmpty()); + Assertions.assertTrue(request.getStreamRanges().isEmpty()); + + request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(1)); + Assertions.assertEquals(-1, request.getObjectId()); + Assertions.assertEquals(List.of(OBJECT_1), request.getCompactedObjectIds()); + Assertions.assertTrue(request.getStreamObjects().isEmpty()); + Assertions.assertTrue(request.getStreamRanges().isEmpty()); + + request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(2)); + Assertions.assertEquals(-1, request.getObjectId()); + Assertions.assertEquals(List.of(OBJECT_2), request.getCompactedObjectIds()); + Assertions.assertTrue(request.getStreamObjects().isEmpty()); + Assertions.assertTrue(request.getStreamRanges().isEmpty()); + } + @Test public void testForceSplitWithException() { S3AsyncClient s3AsyncClient = Mockito.mock(S3AsyncClient.class); @@ -320,6 +351,26 @@ public void testCompactionWithDataTrimmed4() { Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request)); } + @Test + public void testCompactWithOutdatedObject() { + when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture( + List.of(new StreamMetadata(STREAM_0, 0, 15, 20, StreamState.OPENED), + new StreamMetadata(STREAM_1, 0, 60, 500, StreamState.OPENED), + new StreamMetadata(STREAM_2, 0, 60, 270, StreamState.OPENED)))); + compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator); + List streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join(); + CommitStreamSetObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST); + + assertEquals(List.of(OBJECT_0, OBJECT_1, OBJECT_2), request.getCompactedObjectIds()); + assertEquals(OBJECT_0, request.getOrderId()); + assertTrue(request.getObjectId() > OBJECT_2); + request.getStreamObjects().forEach(s -> assertTrue(s.getObjectId() > OBJECT_2)); + assertEquals(2, request.getStreamObjects().size()); + assertEquals(2, request.getStreamRanges().size()); + + Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request)); + } + @Test public void testCompactWithNonExistStream() { when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture(