Skip to content

Commit

Permalink
feat(s3stream/wal): a tool used to simulate recovery (#960)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Mar 12, 2024
1 parent e6e803c commit ebf8257
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ public class BlockWALService implements WriteAheadLog {
private BlockWALService() {
}

/**
* A protected constructor for testing purpose.
*/
protected BlockWALService(BlockWALServiceBuilder builder) {
BlockWALService that = builder.build();
this.initialWindowSize = that.initialWindowSize;
this.walChannel = that.walChannel;
this.slidingWindowService = that.slidingWindowService;
this.walHeader = that.walHeader;
this.recoveryMode = that.recoveryMode;
this.nodeId = that.nodeId;
this.epoch = that.epoch;
}

public static BlockWALServiceBuilder builder(String path, long capacity) {
return new BlockWALServiceBuilder(path, capacity);
}
Expand Down Expand Up @@ -293,6 +307,13 @@ private long getCurrentStartOffset() {
}
}

/**
* Protected method for testing purpose.
*/
protected WALHeader tryReadWALHeader() {
return tryReadWALHeader(walChannel);
}

/**
* Try to read the header from WAL, return the latest one.
*/
Expand Down Expand Up @@ -792,7 +813,10 @@ public long getJumpNextRecoverOffset() {
}
}

class RecoverIterator implements Iterator<RecoverResult> {
/**
* Protected for testing purpose.
*/
protected class RecoverIterator implements Iterator<RecoverResult> {
private final long windowLength;
private final long skipRecordAtOffset;
private long nextRecoverOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
* 8 - [4B] {@link WALHeader#crc8} CRC of the rest of the WAL header, used to verify the correctness of the
* WAL header
*/
class WALHeader {
public class WALHeader {
public static final int WAL_HEADER_MAGIC_CODE = 0x12345678;
public static final int WAL_HEADER_SIZE = 4 // magic code
+ 8 // capacity
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.wal.benchmark;

import com.automq.stream.s3.StreamRecordBatchCodec;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.wal.BlockWALService;
import com.automq.stream.s3.wal.WALHeader;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.StreamSupport;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;

import static com.automq.stream.s3.wal.benchmark.BenchTool.parseArgs;

/**
* RecoverTool is a tool to recover records in a WAL manually.
* It extends {@link BlockWALService} to use tools provided by {@link BlockWALService}
*/
public class RecoverTool extends BlockWALService implements AutoCloseable {

public RecoverTool(Config config) throws IOException {
super(BlockWALService.recoveryBuilder(config.path));
super.start();
}

public static void main(String[] args) throws IOException {
Namespace ns = parseArgs(Config.parser(), args);
Config config = new Config(ns);

try (RecoverTool tool = new RecoverTool(config)) {
tool.run(config);
}
}

private void run(Config config) {
WALHeader header = super.tryReadWALHeader();
System.out.println(header);

Iterable<RecoverResult> recordsSupplier = () -> recover(header, config);
Function<ByteBuf, StreamRecordBatch> decoder = StreamRecordBatchCodec::decode;
Function<ByteBuf, String> stringer = decoder.andThen(StreamRecordBatch::toString);
StreamSupport.stream(recordsSupplier.spliterator(), false)
.map(it -> new RecoverResultWrapper(it, stringer))
.peek(System.out::println)
.forEach(RecoverResultWrapper::release);
}

private Iterator<RecoverResult> recover(WALHeader header, Config config) {
long recoverOffset = config.offset != null ? config.offset : header.getTrimOffset();
long windowLength = header.getSlidingWindowMaxLength();
long skipRecordAtOffset = config.skipTrimmed ? header.getTrimOffset() : -1;
return new RecoverIterator(recoverOffset, windowLength, skipRecordAtOffset);
}

@Override
public void close() {
super.shutdownGracefully();
}

/**
* A wrapper for {@link RecoverResult} to provide a function to convert {@link RecoverResult#record} to string
*/
public static class RecoverResultWrapper {
private final RecoverResult inner;
/**
* A function to convert {@link RecoverResult#record} to string
*/
private final Function<ByteBuf, String> stringer;

public RecoverResultWrapper(RecoverResult inner, Function<ByteBuf, String> stringer) {
this.inner = inner;
this.stringer = stringer;
}

public void release() {
inner.record().release();
}

@Override
public String toString() {
return String.format("%s{", inner.getClass().getSimpleName())
+ String.format("record=(%d)", inner.record().readableBytes()) + stringer.apply(inner.record())
+ ", offset=" + inner.recordOffset()
+ '}';
}
}

public static class Config {
final String path;
final Long offset;
final Boolean skipTrimmed;

Config(Namespace ns) {
this.path = ns.getString("path");
this.offset = ns.getLong("offset");
this.skipTrimmed = ns.getBoolean("skipTrimmed");
}

static ArgumentParser parser() {
ArgumentParser parser = ArgumentParsers
.newFor("RecoverTool")
.build()
.defaultHelp(true)
.description("Recover records in a WAL file");
parser.addArgument("-p", "--path")
.required(true)
.help("Path of the WAL file");
parser.addArgument("--offset")
.type(Long.class)
.help("Offset to start recovering, default to the trimmed offset in the WAL header");
parser.addArgument("--skip-trimmed")
.dest("skipTrimmed")
.type(Boolean.class)
.setDefault(true)
.help("Whether to skip the record at the trimmed offset");
return parser;
}
}
}

0 comments on commit ebf8257

Please sign in to comment.