Skip to content

Commit

Permalink
Merge branch 'main' into optimzie_stream_record_batch_list
Browse files Browse the repository at this point in the history
  • Loading branch information
lifepuzzlefun authored Jan 29, 2024
2 parents ce15556 + 5cbd19c commit 004b110
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 21 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/s3-stream-e2e.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: E2E-TEST for AutoMQ S3Stream
on:
pull_request:
pull_request_target:
types:
- opened
- reopened
Expand Down Expand Up @@ -47,4 +47,4 @@ jobs:
report_paths: '**/surefire-reports/TEST-*.xml'
annotate_only: true
include_passed: true
detailed_summary: true
detailed_summary: true
8 changes: 3 additions & 5 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -456,11 +456,9 @@ public CompletableFuture<Void> forceUpload(long streamId) {
StorageOperationStats.getInstance().forceUploadWALAwaitStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
uploadDeltaWAL(streamId, true);
// Wait for all tasks contains streamId complete.
List<CompletableFuture<Void>> tasksContainsStream = this.inflightWALUploadTasks.stream()
.filter(it -> it.cache.containsStream(streamId))
.map(it -> it.cf)
.toList();
FutureUtil.propagate(CompletableFuture.allOf(tasksContainsStream.toArray(new CompletableFuture[0])), cf);
FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.stream()
.filter(it -> it.cache.containsStream(streamId))
.map(it -> it.cf).toArray(CompletableFuture[]::new)), cf);
if (LogCache.MATCH_ALL_STREAMS != streamId) {
callbackSequencer.tryFree(streamId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -146,7 +148,7 @@ public void shutdown() {
openedStreams.forEach((streamId, stream) -> streamCloseFutures.put(streamId, stream.close()));
for (; ; ) {
Threads.sleep(1000);
List<Long> closingStreams = streamCloseFutures.entrySet().stream().filter(e -> !e.getValue().isDone()).map(Map.Entry::getKey).toList();
List<Long> closingStreams = streamCloseFutures.entrySet().stream().filter(e -> !e.getValue().isDone()).map(Map.Entry::getKey).collect(Collectors.toList());
LOGGER.info("waiting streams close, closed {} / all {}, closing[{}]", streamCloseFutures.size() - closingStreams.size(), streamCloseFutures.size(), closingStreams);
if (closingStreams.isEmpty()) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void shutdown() {
public CompletableFuture<Void> compact() {
return this.objectManager.getServerObjects().thenComposeAsync(objectMetadataList -> {
List<Long> streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream())
.map(StreamOffsetRange::streamId).distinct().toList();
.map(StreamOffsetRange::streamId).distinct().collect(Collectors.toList());
return this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList ->
this.compact(streamMetadataList, objectMetadataList), compactThreadPool);
}, compactThreadPool);
Expand Down Expand Up @@ -289,7 +289,7 @@ public CompletableFuture<Void> forceSplitAll() {
//TODO: deal with metadata delay
this.compactScheduledExecutor.execute(() -> this.objectManager.getServerObjects().thenAcceptAsync(objectMetadataList -> {
List<Long> streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream())
.map(StreamOffsetRange::streamId).distinct().toList();
.map(StreamOffsetRange::streamId).distinct().collect(Collectors.toList());
this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> {
if (objectMetadataList.isEmpty()) {
logger.info("No stream set objects to force split");
Expand Down Expand Up @@ -370,7 +370,7 @@ Collection<CompletableFuture<StreamObject>> groupAndSplitStreamDataBlocks(S3Obje
// prepare N stream objects at one time
objectManager.prepareObject(batchGroup.size(), TimeUnit.MINUTES.toMillis(CompactionConstants.S3_OBJECT_TTL_MINUTES))
.thenComposeAsync(objectId -> {
List<StreamDataBlock> blocksToRead = batchGroup.stream().flatMap(p -> p.getLeft().stream()).toList();
List<StreamDataBlock> blocksToRead = batchGroup.stream().flatMap(p -> p.getLeft().stream()).collect(Collectors.toList());
DataBlockReader reader = new DataBlockReader(objectMetadata, s3Operator, compactionBucket, bucketCallbackScheduledExecutor);
// batch read
reader.readBlocks(blocksToRead, Math.min(CompactionConstants.S3_OBJECT_MAX_READ_BATCH, networkBandwidth));
Expand Down Expand Up @@ -478,7 +478,7 @@ CommitStreamSetObjectRequest buildCompactRequest(List<StreamMetadata> streamMeta

request.setCompactedObjectIds(new ArrayList<>(compactedObjectIds));
List<S3ObjectMetadata> compactedObjectMetadata = objectsToCompact.stream()
.filter(e -> compactedObjectIds.contains(e.objectId())).toList();
.filter(e -> compactedObjectIds.contains(e.objectId())).collect(Collectors.toList());
if (isSanityCheckFailed(streamMetadataList, compactedObjectMetadata, request)) {
logger.error("Sanity check failed, compaction result is illegal");
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,11 @@ public synchronized CompletableFuture<List<S3ObjectMetadata>> getObjects(long st
.stream()
.map(Pair::getRight)
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1)))
.toList();
.collect(Collectors.toList());
List<S3ObjectMetadata> streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>())
.stream()
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1)))
.toList();
.collect(Collectors.toList());

List<S3ObjectMetadata> result = new ArrayList<>();
result.addAll(streamSetObjectList);
Expand All @@ -182,15 +182,15 @@ public synchronized CompletableFuture<List<S3ObjectMetadata>> getObjects(long st
return Long.compare(startOffset1, startOffset2);
});

return CompletableFuture.completedFuture(result.stream().limit(limit).toList());
return CompletableFuture.completedFuture(result.stream().limit(limit).collect(Collectors.toList()));
}

@Override
public synchronized CompletableFuture<List<S3ObjectMetadata>> getServerObjects() {
List<S3ObjectMetadata> result = streamSetObjects.values()
.stream()
.filter(pair -> pair.getLeft() == NODE_ID_ALLOC.get())
.map(Pair::getRight).toList();
.map(Pair::getRight).collect(Collectors.toList());
return CompletableFuture.completedFuture(result);
}

Expand All @@ -201,18 +201,18 @@ public synchronized CompletableFuture<List<S3ObjectMetadata>> getStreamObjects(l
.stream()
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1)))
.limit(limit)
.toList();
.collect(Collectors.toList());
return CompletableFuture.completedFuture(streamObjectList);
}

@Override
public synchronized CompletableFuture<List<StreamMetadata>> getOpeningStreams() {
return CompletableFuture.completedFuture(streams.values().stream().filter(stream -> stream.state() == StreamState.OPENED).toList());
return CompletableFuture.completedFuture(streams.values().stream().filter(stream -> stream.state() == StreamState.OPENED).collect(Collectors.toList()));
}

@Override
public CompletableFuture<List<StreamMetadata>> getStreams(List<Long> streamIds) {
return CompletableFuture.completedFuture(streamIds.stream().map(streams::get).filter(Objects::nonNull).toList());
return CompletableFuture.completedFuture(streamIds.stream().map(streams::get).filter(Objects::nonNull).collect(Collectors.toList()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ public void testCompactWithLimit() {
assertEquals(1, request.getStreamRanges().size());

Set<Long> compactedObjectIds = new HashSet<>(request.getCompactedObjectIds());
s3ObjectMetadata = s3ObjectMetadata.stream().filter(s -> compactedObjectIds.contains(s.objectId())).toList();
s3ObjectMetadata = s3ObjectMetadata.stream().filter(s -> compactedObjectIds.contains(s.objectId())).collect(Collectors.toList());
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, s3ObjectMetadata, request));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
Expand Down Expand Up @@ -70,7 +72,7 @@ void testDeleteObjectsSuccess() {
.map(o -> DeletedObject.builder()
.key(o.key())
.build())
.toList())
.collect(Collectors.toList()))
.build();
return CompletableFuture.completedFuture(response);
});
Expand Down

0 comments on commit 004b110

Please sign in to comment.