fix(s3stream): fix stream reader busy loop when metadata replay delay #1215

Merged: 2 commits, May 7, 2024
Changes from all commits:

@@ -12,6 +12,7 @@
 package kafka.log.stream.s3.objects;
 
 
+import com.automq.stream.s3.exceptions.AutoMQException;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
 import com.automq.stream.s3.objects.CommitStreamSetObjectRequest;
 import com.automq.stream.s3.objects.CommitStreamSetObjectResponse;
@@ -202,7 +203,8 @@ public Builder toRequestBuilder() {
     public CompletableFuture<List<S3ObjectMetadata>> getObjects(long streamId, long startOffset, long endOffset, int limit) {
         return this.metadataManager.fetch(streamId, startOffset, endOffset, limit).thenApply(inRangeObjects -> {
             if (inRangeObjects == null || inRangeObjects == InRangeObjects.INVALID) {
-                return Collections.emptyList();
+                LOGGER.error("Unexpect getObjects result={} from streamId={} [{}, {}) limit={}", inRangeObjects, streamId, startOffset, endOffset, limit);
+                throw new AutoMQException("Unexpect getObjects result");
             }
             return inRangeObjects.objects();
         });
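
This controller-side change turns an INVALID metadata fetch into a failed future instead of a silent empty list. With the old behavior, an empty result was indistinguishable from "no objects exist yet", so a reader waiting for metadata replay to catch up could re-poll immediately and spin. Below is a caller-side sketch of the pattern the new behavior enables; it is a minimal illustration with hypothetical names (fetch, the getObjects stand-in, SCHEDULER), not code from this PR:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical caller sketch: with a failed future the caller can tell
// "metadata not replayed yet" apart from "end of stream" and back off,
// instead of re-polling in a tight loop.
public class BackoffSketch {
    static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor();

    static void fetch(long streamId, long start, long end, int limit) {
        getObjects(streamId, start, end, limit).whenComplete((objects, ex) -> {
            if (ex != null) {
                // metadata replay lagging: retry later instead of spinning
                SCHEDULER.schedule(() -> fetch(streamId, start, end, limit), 100, TimeUnit.MILLISECONDS);
            } else {
                objects.forEach(System.out::println);
            }
        });
    }

    // stand-in for ObjectManager#getObjects; always succeeds here
    static CompletableFuture<List<String>> getObjects(long streamId, long start, long end, int limit) {
        return CompletableFuture.completedFuture(List.of("object-0"));
    }

    public static void main(String[] args) {
        fetch(0L, 0L, -1L, 4);
        SCHEDULER.shutdown();
    }
}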

@@ -19,6 +19,9 @@

 import com.automq.stream.s3.metadata.S3StreamConstant;
 import com.automq.stream.s3.metadata.StreamState;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import org.apache.kafka.common.metadata.RangeRecord;
 import org.apache.kafka.common.metadata.RemoveRangeRecord;
 import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
@@ -32,14 +35,11 @@
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.automq.AutoMQVersion;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Timeout(value = 40)
@@ -59,16 +59,16 @@ public void testRanges() {
         S3StreamMetadataDelta delta0 = new S3StreamMetadataDelta(image0);
         // 1. create stream0
         delta0Records.add(new ApiMessageAndVersion(new S3StreamRecord()
                 .setStreamId(STREAM0)
                 .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX)
                 .setStreamState(StreamState.OPENED.toByte())
                 .setStartOffset(S3StreamConstant.INIT_START_OFFSET)
                 .setEpoch(0L)
                 .setStartOffset(0L), (short) 0));
         RecordTestUtils.replayAll(delta0, delta0Records);
         // verify delta and check image's write
         S3StreamMetadataImage image1 = new S3StreamMetadataImage(
                 STREAM0, 0L, StreamState.OPENED, S3StreamConstant.INIT_START_OFFSET, Collections.emptyList(), Collections.emptyList());
         assertEquals(image1, delta0.apply());
         testToImageAndBack(image1);

@@ -77,110 +77,126 @@ public void testRanges() {
         List<ApiMessageAndVersion> delta1Records = new ArrayList<>();
         S3StreamMetadataDelta delta1 = new S3StreamMetadataDelta(image1);
         delta1Records.add(new ApiMessageAndVersion(new S3StreamRecord()
                 .setStreamId(STREAM0)
                 .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX)
                 .setStreamState(StreamState.OPENED.toByte())
                 .setStartOffset(S3StreamConstant.INIT_START_OFFSET)
                 .setEpoch(1L), (short) 0));
         delta1Records.add(new ApiMessageAndVersion(new RangeRecord()
                 .setStreamId(STREAM0)
                 .setRangeIndex(0)
                 .setEpoch(1L)
                 .setNodeId(BROKER0)
                 .setStartOffset(0L), (short) 0));
         RecordTestUtils.replayAll(delta1, delta1Records);
         // verify delta and check image's write
         S3StreamMetadataImage image2 = new S3StreamMetadataImage(
                 STREAM0, 1L, StreamState.OPENED, S3StreamConstant.INIT_START_OFFSET,
                 List.of(new RangeMetadata(STREAM0, 1L, 0, 0L, 0L, BROKER0)), Collections.emptyList());
         assertEquals(image2, delta1.apply());
         testToImageAndBack(image2);
 
         // 3. advance range 0_0, node1 is the new leader, and create range0_1
         List<ApiMessageAndVersion> delta2Records = new ArrayList<>();
         S3StreamMetadataDelta delta2 = new S3StreamMetadataDelta(image2);
         delta2Records.add(new ApiMessageAndVersion(new S3StreamRecord()
                 .setStreamId(STREAM0)
                 .setRangeIndex(0)
                 .setStreamState(StreamState.OPENED.toByte())
                 .setStartOffset(S3StreamConstant.INIT_START_OFFSET)
                 .setEpoch(2L), (short) 0));
         delta2Records.add(new ApiMessageAndVersion(new RangeRecord()
                 .setStreamId(STREAM0)
                 .setRangeIndex(1)
                 .setEpoch(2L)
                 .setNodeId(BROKER1)
                 .setStartOffset(100L)
                 .setEndOffset(100L), (short) 0));
         RecordTestUtils.replayAll(delta2, delta2Records);
         // verify delta and check image's write
         S3StreamMetadataImage image3 = new S3StreamMetadataImage(
                 STREAM0, 2L, StreamState.OPENED, 0L, List.of(
                 new RangeMetadata(STREAM0, 1L, 0, 0, 0, BROKER0),
                 new RangeMetadata(STREAM0, 2L, 1, 100, 100, BROKER1)), Collections.emptyList());
         assertEquals(image3, delta2.apply());
         testToImageAndBack(image3);
 
         // 4. trim stream to start in 100 and remove range 0_0
         List<ApiMessageAndVersion> delta3Records = new ArrayList<>();
         S3StreamMetadataDelta delta3 = new S3StreamMetadataDelta(image3);
         delta3Records.add(new ApiMessageAndVersion(new S3StreamRecord()
                 .setStreamId(STREAM0)
                 .setEpoch(2L)
                 .setRangeIndex(0)
                 .setStreamState(StreamState.OPENED.toByte())
                 .setStartOffset(100L), (short) 0));
         delta3Records.add(new ApiMessageAndVersion(new RemoveRangeRecord()
                 .setStreamId(STREAM0)
                 .setRangeIndex(0), (short) 0));
         RecordTestUtils.replayAll(delta3, delta3Records);
         // verify delta and check image's write
         S3StreamMetadataImage image4 = new S3StreamMetadataImage(
                 STREAM0, 2L, StreamState.OPENED, 100L, List.of(
                 new RangeMetadata(STREAM0, 2L, 1, 100L, 100L, BROKER1)), Collections.emptyList());
         assertEquals(image4, delta3.apply());
     }
 
     @Test
     public void testStreamObjects() {
         S3StreamMetadataImage image0 = new S3StreamMetadataImage(
                 STREAM0, 0L, StreamState.OPENED, 0L, Collections.emptyList(), Collections.emptyList());
         List<ApiMessageAndVersion> delta0Records = new ArrayList<>();
         S3StreamMetadataDelta delta0 = new S3StreamMetadataDelta(image0);
         // 1. create streamObject0 and streamObject1
         delta0Records.add(new ApiMessageAndVersion(new S3StreamObjectRecord()
                 .setObjectId(0L)
                 .setStreamId(STREAM0)
                 .setStartOffset(0L)
                 .setEndOffset(100L), (short) 0));
         delta0Records.add(new ApiMessageAndVersion(new S3StreamObjectRecord()
                 .setObjectId(1L)
                 .setStreamId(STREAM0)
                 .setStartOffset(100L)
                 .setEndOffset(200L), (short) 0));
         RecordTestUtils.replayAll(delta0, delta0Records);
         // verify delta and check image's write
         S3StreamMetadataImage image1 = new S3StreamMetadataImage(
                 STREAM0, 0L, StreamState.OPENED, 0L, Collections.emptyList(), List.of(
                 new S3StreamObject(0L, 999, STREAM0, 0L, 100L),
                 new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
         assertEquals(image1, delta0.apply());
         testToImageAndBack(image1);
 
         // 2. remove streamObject0
         List<ApiMessageAndVersion> delta1Records = new ArrayList<>();
         S3StreamMetadataDelta delta1 = new S3StreamMetadataDelta(image1);
         delta1Records.add(new ApiMessageAndVersion(new RemoveS3StreamObjectRecord()
                 .setObjectId(0L), (short) 0));
         RecordTestUtils.replayAll(delta1, delta1Records);
         // verify delta and check image's write
         S3StreamMetadataImage image2 = new S3StreamMetadataImage(
                 STREAM0, 0L, StreamState.OPENED, 0L, Collections.emptyList(), List.of(
                 new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
         assertEquals(image2, delta1.apply());
         testToImageAndBack(image2);
     }
 
+    @Test
+    public void testGetRangeContainsOffset() {
+        List<RangeMetadata> ranges = List.of(
+            new RangeMetadata(STREAM0, 0L, 1, 0, 10, 100),
+            new RangeMetadata(STREAM0, 1L, 2, 10, 10, 101),
+            new RangeMetadata(STREAM0, 2L, 3, 10, 20, 102),
+            new RangeMetadata(STREAM0, 3L, 4, 20, 0, 103)
+        );
+        S3StreamMetadataImage image = new S3StreamMetadataImage(
+            STREAM0, 3L, StreamState.OPENED, 0L, ranges, Collections.emptyList());
+        Assertions.assertEquals(0, image.getRangeContainsOffset(0));
+        Assertions.assertEquals(0, image.getRangeContainsOffset(5));
+        Assertions.assertEquals(2, image.getRangeContainsOffset(10));
+        Assertions.assertEquals(3, image.getRangeContainsOffset(100));
+    }
+
     private void testToImageAndBack(S3StreamMetadataImage image) {
         RecordListWriter writer = new RecordListWriter();
         MetadataVersion metadataVersion = MetadataVersion.LATEST_PRODUCTION;
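
The added testGetRangeContainsOffset pins down the lookup the stream reader relies on: the return value is the position of the matching range in the list, empty ranges such as [10, 10) never match, and the newest range is treated as open-ended. A reconstruction consistent with those four assertions looks like the sketch below; this is my own linear-scan illustration, not the project's S3StreamMetadataImage implementation:

import java.util.List;

// Reconstruction sketch of a lookup consistent with the assertions above.
public class RangeLookupSketch {
    record Range(long start, long end) {}

    static int getRangeContainsOffset(List<Range> ranges, long offset) {
        for (int i = 0; i < ranges.size(); i++) {
            Range r = ranges.get(i);
            boolean last = i == ranges.size() - 1;
            // an empty range [x, x) can never satisfy offset < end, so it is skipped;
            // the last range is open-ended, so only its start bound is checked
            if (r.start() <= offset && (last || offset < r.end())) {
                return i;
            }
        }
        return -1; // offset falls before every range
    }

    public static void main(String[] args) {
        List<Range> ranges = List.of(
            new Range(0, 10), new Range(10, 10), new Range(10, 20), new Range(20, 0));
        System.out.println(getRangeContainsOffset(ranges, 0));   // 0
        System.out.println(getRangeContainsOffset(ranges, 5));   // 0
        System.out.println(getRangeContainsOffset(ranges, 10));  // 2
        System.out.println(getRangeContainsOffset(ranges, 100)); // 3
    }
}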

@@ -459,8 +459,8 @@ private CompletableFuture<ReadDataBlock> read0(FetchContext context,
         if (!logCacheRecords.isEmpty()) {
             endOffset = logCacheRecords.get(0).getBaseOffset();
         }
-        Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.warn("read from block cache timeout, stream={}, {}, maxBytes: {}", streamId, startOffset, maxBytes), 1, TimeUnit.MINUTES);
         long finalEndOffset = endOffset;
+        Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.error("[POTENTIAL_BUG] read from block cache timeout, stream={}, [{},{}), maxBytes: {}", streamId, startOffset, finalEndOffset, maxBytes), 1, TimeUnit.MINUTES);
         return blockCache.read(context, streamId, startOffset, endOffset, maxBytes).thenApply(blockCacheRst -> {
             List<StreamRecordBatch> rst = new ArrayList<>(blockCacheRst.getRecords());
             int remainingBytesSize = maxBytes - rst.stream().mapToInt(StreamRecordBatch::size).sum();
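
One detail in this hunk: the timeout callback is a lambda, and Java lambdas may only capture local variables that are effectively final; endOffset is reassigned just above, which is why the lambda reads the finalEndOffset copy rather than endOffset itself. A standalone illustration of the rule, unrelated to the project's classes:

// Why the log lambda reads a copy: a lambda can only reference
// effectively final locals, and endOffset is reassigned.
public class CaptureSketch {
    public static void main(String[] args) {
        long endOffset = 10L;
        endOffset = 20L;                  // reassigned: no longer effectively final
        long finalEndOffset = endOffset;  // assigned once: effectively final
        Runnable log = () -> System.out.println("endOffset=" + finalEndOffset);
        // Runnable bad = () -> System.out.println(endOffset); // would not compile
        log.run();
    }
}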

@@ -15,6 +15,7 @@
 import com.automq.stream.s3.ObjectReader;
 import com.automq.stream.s3.cache.CacheAccessType;
 import com.automq.stream.s3.cache.ReadDataBlock;
+import com.automq.stream.s3.exceptions.AutoMQException;
 import com.automq.stream.s3.exceptions.BlockNotContinuousException;
 import com.automq.stream.s3.exceptions.ObjectNotExistException;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
@@ -27,6 +28,7 @@
 import com.automq.stream.utils.LogSuppressor;
 import com.automq.stream.utils.threads.EventLoop;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -231,7 +233,8 @@ void afterRead(ReadDataBlock readDataBlock, ReadContext ctx) {
         readahead.tryReadahead(readDataBlock.getCacheAccessType() == BLOCK_CACHE_MISS);
     }
 
-    private CompletableFuture<List<Block>> getBlocks(long startOffset, long endOffset, int maxBytes, boolean readahead) {
+    private CompletableFuture<List<Block>> getBlocks(long startOffset, long endOffset, int maxBytes,
+        boolean readahead) {
         GetBlocksContext context = new GetBlocksContext(readahead);
         try {
             getBlocks0(context, startOffset, endOffset, maxBytes);
@@ -250,7 +253,7 @@ private void getBlocks0(GetBlocksContext ctx, long startOffset, long endOffset,
         CompletableFuture<Map<Long, Block>> loadMoreBlocksCf;
         int remainingSize = maxBytes;
         if (floorKey == null || startOffset >= loadedBlockIndexEndOffset) {
-            loadMoreBlocksCf = loadMoreBlocksWithoutData();
+            loadMoreBlocksCf = loadMoreBlocksWithoutData(endOffset);
         } else {
             boolean firstBlock = true;
             boolean fulfill = false;
@@ -281,14 +284,20 @@ private void getBlocks0(GetBlocksContext ctx, long startOffset, long endOffset,
                 ctx.cf.complete(ctx.blocks);
                 return;
             } else {
-                loadMoreBlocksCf = loadMoreBlocksWithoutData();
+                loadMoreBlocksCf = loadMoreBlocksWithoutData(endOffset);
             }
         }
         int finalRemainingSize = remainingSize;
         loadMoreBlocksCf.thenAccept(rst -> {
             if (rst.isEmpty()) {
                 // it's already load to the end
-                ctx.cf.complete(ctx.blocks);
+                if (endOffset != -1L && endOffset > loadedBlockIndexEndOffset) {
+                    String errMsg = String.format("[BUG] streamId=%s expect load blocks to endOffset=%s, " +
+                        "current loadedBlockIndexEndOffset=%s", streamId, endOffset, loadedBlockIndexEndOffset);
+                    ctx.cf.completeExceptionally(new AutoMQException(errMsg));
+                } else {
+                    ctx.cf.complete(ctx.blocks);
+                }
             } else {
                 long nextStartOffset = ctx.blocks.isEmpty() ? startOffset : ctx.blocks.get(ctx.blocks.size() - 1).index.endOffset();
                 getBlocks0(ctx, nextStartOffset, endOffset, finalRemainingSize);
@@ -304,17 +313,21 @@ private void getBlocks0(GetBlocksContext ctx, long startOffset, long endOffset,
      *
      * @return new block indexes
      */
-    private CompletableFuture<Map<Long, Block>> loadMoreBlocksWithoutData() {
+    private CompletableFuture<Map<Long, Block>> loadMoreBlocksWithoutData(long endOffset) {
         if (inflightLoadIndexCf != null) {
-            return inflightLoadIndexCf;
+            return inflightLoadIndexCf.thenCompose(rst -> loadMoreBlocksWithoutData(endOffset));
         }
+        if (endOffset != -1L && endOffset <= loadedBlockIndexEndOffset) {
+            return CompletableFuture.completedFuture(Collections.emptyMap());
+        }
+
         inflightLoadIndexCf = new CompletableFuture<>();
         long nextLoadingOffset = calWindowBlocksEndOffset();
         AtomicLong nextFindStartOffset = new AtomicLong(nextLoadingOffset);
         Map<Long, Block> newDataBlockIndex = new HashMap<>();
         TimerUtil time = new TimerUtil();
         // 1. get objects
-        CompletableFuture<List<S3ObjectMetadata>> getObjectsCf = objectManager.getObjects(streamId, nextLoadingOffset, -1L, GET_OBJECT_STEP);
+        CompletableFuture<List<S3ObjectMetadata>> getObjectsCf = objectManager.getObjects(streamId, nextLoadingOffset, endOffset, GET_OBJECT_STEP);
         // 2. get block indexes from objects
         CompletableFuture<Void> findBlockIndexesCf = getObjectsCf.whenComplete((rst, ex) -> {
             StorageOperationStats.getInstance().getIndicesTimeGetObjectStats.record(time.elapsedAndResetAs(TimeUnit.NANOSECONDS));
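
Taken together, the StreamReader changes close the busy loop named in the PR title. Reading the diff, the loop could arise because loadMoreBlocksWithoutData() always requested the open-ended range [nextLoadingOffset, -1): while metadata replay lagged, getObjects returned nothing new, the empty result completed the read with whatever blocks had been gathered, and the caller re-entered immediately with the same arguments. Threading the caller's endOffset through lets the loader return at once when the loaded index already covers the requested range, and fail loudly when it cannot make progress. The sketch below compresses that control flow into a single-threaded toy; names are simplified and it is not the real StreamReader:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model of the fixed control flow: index loading is bounded by the
// caller's endOffset, so a loader that cannot make progress either reports
// "already covered" or fails, rather than letting the caller spin forever.
public class BoundedIndexLoaderSketch {
    long loadedEndOffset = 0L;                              // end of the loaded block index
    final Deque<long[]> fakeMetadata = new ArrayDeque<>();  // [start, end) pairs standing in for replayed metadata

    CompletableFuture<List<long[]>> getBlocks(long start, long end) {
        if (end != -1L && end <= loadedEndOffset) {
            // index already covers the requested range: nothing to load
            return CompletableFuture.completedFuture(List.of());
        }
        long[] next = fakeMetadata.poll();                  // stand-in for objectManager.getObjects
        if (next == null) {
            if (end != -1L && end > loadedEndOffset) {
                // metadata replay has not caught up: fail instead of spinning
                return CompletableFuture.failedFuture(new IllegalStateException(
                    "[BUG] expected blocks to " + end + ", loaded only to " + loadedEndOffset));
            }
            return CompletableFuture.completedFuture(List.of());
        }
        loadedEndOffset = next[1];
        return getBlocks(start, end).thenApply(rest -> {
            List<long[]> all = new ArrayList<>();
            all.add(next);
            all.addAll(rest);
            return all;
        });
    }

    public static void main(String[] args) {
        BoundedIndexLoaderSketch loader = new BoundedIndexLoaderSketch();
        loader.fakeMetadata.add(new long[] {0, 100});
        loader.getBlocks(0, 200).whenComplete((blocks, ex) ->
            System.out.println(ex != null ? ex.getMessage() : blocks));  // prints the [BUG] message
    }
}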