feat(s3stream): handle out of order records during recovery (#932)
Signed-off-by: Ning Yu <[email protected]>
Chillax-0v0 authored Feb 19, 2024
1 parent e46b193 commit 498690b
Showing 2 changed files with 93 additions and 6 deletions.
68 changes: 65 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -41,13 +41,15 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -136,12 +138,39 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator<WriteAheadLog.Re
return recoverContinuousRecords(it, openingStreams, LOGGER);
}

/**
* Recover continuous records in each stream from the WAL, and put them into the returned {@link LogCache.LogCacheBlock}.
* It will filter out
* <ul>
* <li>the records that are not in the opening streams</li>
* <li>the records that have been committed</li>
* <li>the records that are not continuous, i.e., all records after the first discontinuous record in each stream</li>
* </ul>
*
* It throws an {@link IllegalStateException} if the start offset of the first recovered record does not match
* the end offset of the corresponding opening stream, which indicates data loss.
* <p>
* If there are out-of-order records (which should never happen unless there is a bug), it will try to re-order them.
* <p>
* For example, suppose we recover the following records from the WAL for a stream:
* <pre> 1, 2, 3, 5, 4, 6, 10, 11</pre>
* and the {@link StreamMetadata#endOffset()} of this stream is 3. Then the returned {@link LogCache.LogCacheBlock}
* will contain records
* <pre> 3, 4, 5, 6</pre>
* Here,
* <ul>
* <li>Records 1 and 2 are discarded because they have already been committed (their offsets are less than 3, the end offset of the stream)</li>
* <li>Records 10 and 11 are discarded because they are not continuous (10 is not 7, the expected next offset after 6)</li>
* <li>Records 5 and 4 are re-ordered because they arrived out of order; this method works around that bug (see the standalone sketch after this file's diff)</li>
* </ul>
*/
static LogCache.LogCacheBlock recoverContinuousRecords(Iterator<WriteAheadLog.RecoverResult> it,
List<StreamMetadata> openingStreams, Logger logger) {
Map<Long, Long> openingStreamEndOffsets = openingStreams.stream().collect(Collectors.toMap(StreamMetadata::streamId, StreamMetadata::endOffset));
LogCache.LogCacheBlock cacheBlock = new LogCache.LogCacheBlock(1024L * 1024 * 1024);
long logEndOffset = -1L;
Map<Long, Long> streamNextOffsets = new HashMap<>();
Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords = new HashMap<>();
while (it.hasNext()) {
WriteAheadLog.RecoverResult recoverResult = it.next();
logEndOffset = recoverResult.recordOffset();
@@ -159,15 +188,48 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator<WriteAheadLog.Re
recordBuf.release();
continue;
}

Long expectNextOffset = streamNextOffsets.get(streamId);
Queue<StreamRecordBatch> discontinuousRecords = streamDiscontinuousRecords.get(streamId);
if (expectNextOffset == null || expectNextOffset == streamRecordBatch.getBaseOffset()) {
// continuous record, put it into cache.
cacheBlock.put(streamRecordBatch);
streamNextOffsets.put(streamRecordBatch.getStreamId(), streamRecordBatch.getLastOffset());
expectNextOffset = streamRecordBatch.getLastOffset();
// check if there are some out of order records in the queue.
if (discontinuousRecords != null) {
while (!discontinuousRecords.isEmpty()) {
StreamRecordBatch peek = discontinuousRecords.peek();
if (peek.getBaseOffset() == expectNextOffset) {
// should never happen, log it.
logger.error("[BUG] recover an out of order record, streamId={}, expectNextOffset={}, record={}", streamId, expectNextOffset, peek);
cacheBlock.put(peek);
discontinuousRecords.poll();
expectNextOffset = peek.getLastOffset();
} else {
break;
}
}
}
// update next offset.
streamNextOffsets.put(streamRecordBatch.getStreamId(), expectNextOffset);
} else {
logger.error("unexpected WAL record, streamId={}, expectNextOffset={}, record={}", streamId, expectNextOffset, streamRecordBatch);
streamRecordBatch.release();
// unexpected record, put it into discontinuous records queue.
if (discontinuousRecords == null) {
discontinuousRecords = new PriorityQueue<>(Comparator.comparingLong(StreamRecordBatch::getBaseOffset));
streamDiscontinuousRecords.put(streamId, discontinuousRecords);
}
discontinuousRecords.add(streamRecordBatch);
}
}
// release all discontinuous records.
streamDiscontinuousRecords.values().forEach(queue -> {
if (queue.isEmpty()) {
return;
}
logger.info("drop discontinuous records, records={}", queue);
queue.forEach(StreamRecordBatch::release);
});

if (logEndOffset >= 0L) {
cacheBlock.confirmOffset(logEndOffset);
}
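Aside: a minimal standalone sketch of the re-ordering idea used in recoverContinuousRecords above. Out-of-order records are buffered in a PriorityQueue ordered by base offset, and the queue is drained whenever its head matches the expected next offset. The Rec type and recover method below are hypothetical stand-ins (not the actual AutoMQ code); the sketch skips the opening-stream filtering that the real method performs and assumes each record covers the offset range [baseOffset, lastOffset).

// Hypothetical, simplified sketch of the buffering/draining logic above; not AutoMQ code.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class ReorderSketch {
    // Stand-in for StreamRecordBatch: the batch covers offsets [baseOffset, lastOffset).
    record Rec(long baseOffset, long lastOffset) {
    }

    static List<Rec> recover(List<Rec> input, long streamEndOffset) {
        List<Rec> recovered = new ArrayList<>();
        PriorityQueue<Rec> pending = new PriorityQueue<>(Comparator.comparingLong(Rec::baseOffset));
        long expect = streamEndOffset;
        for (Rec r : input) {
            if (r.lastOffset() <= expect) {
                continue; // already committed, drop it
            }
            if (r.baseOffset() == expect) {
                // continuous record, keep it and advance the expected offset
                recovered.add(r);
                expect = r.lastOffset();
                // drain buffered records that have become continuous again
                while (!pending.isEmpty() && pending.peek().baseOffset() == expect) {
                    Rec p = pending.poll();
                    recovered.add(p);
                    expect = p.lastOffset();
                }
            } else {
                // out of order: buffer it and retry once the gap is filled
                pending.add(r);
            }
        }
        // anything left in "pending" is discontinuous and would be dropped (and released)
        return recovered;
    }

    public static void main(String[] args) {
        // The javadoc example: offsets 1, 2, 3, 5, 4, 6, 10, 11 with stream end offset 3.
        List<Rec> wal = List.of(new Rec(1, 2), new Rec(2, 3), new Rec(3, 4), new Rec(5, 6),
            new Rec(4, 5), new Rec(6, 7), new Rec(10, 11), new Rec(11, 12));
        // Prints the four recovered records with base offsets 3, 4, 5 and 6.
        System.out.println(recover(wal, 3L));
    }
}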
31 changes: 28 additions & 3 deletions s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
@@ -223,26 +223,51 @@ public void testRecoverContinuousRecords() {
);

List<StreamMetadata> openingStreams = List.of(new StreamMetadata(233L, 0L, 0L, 11L, StreamState.OPENED));
LogCache.LogCacheBlock cacheBlock = storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
LogCache.LogCacheBlock cacheBlock = S3Storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
// ignore closed stream and noncontinuous records.
assertEquals(1, cacheBlock.records().size());
List<StreamRecordBatch> streamRecords = cacheBlock.records().get(233L);
assertEquals(2, streamRecords.size());
assertEquals(11L, streamRecords.get(0).getBaseOffset());
assertEquals(12L, streamRecords.get(1).getBaseOffset());

//
// simulate data loss
openingStreams = List.of(
new StreamMetadata(233L, 0L, 0L, 5L, StreamState.OPENED));
boolean exception = false;
try {
storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
S3Storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
} catch (IllegalStateException e) {
exception = true;
}
Assertions.assertTrue(exception);
}

@Test
public void testRecoverOutOfOrderRecords() {
List<WriteAheadLog.RecoverResult> recoverResults = List.of(
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 9L))),
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 10L))),
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 13L))),
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 11L))),
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 12L))),
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 14L))),
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 20L)))
);

List<StreamMetadata> openingStreams = List.of(new StreamMetadata(42L, 0L, 0L, 10L, StreamState.OPENED));
LogCache.LogCacheBlock cacheBlock = S3Storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
// out-of-order records are re-ordered; the already-committed record (9) and the non-continuous record (20) are dropped.
assertEquals(1, cacheBlock.records().size());
List<StreamRecordBatch> streamRecords = cacheBlock.records().get(42L);
assertEquals(5, streamRecords.size());
assertEquals(10L, streamRecords.get(0).getBaseOffset());
assertEquals(11L, streamRecords.get(1).getBaseOffset());
assertEquals(12L, streamRecords.get(2).getBaseOffset());
assertEquals(13L, streamRecords.get(3).getBaseOffset());
assertEquals(14L, streamRecords.get(4).getBaseOffset());
}

@Test
public void testWALOverCapacity() throws WriteAheadLog.OverCapacityException {
storage.append(newRecord(233L, 10L));
