Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3stream/wal): a tool used to simulate recovery #960

Merged
merged 4 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ public class BlockWALService implements WriteAheadLog {
private BlockWALService() {
}

/**
* A protected constructor for testing purpose.
*/
protected BlockWALService(BlockWALServiceBuilder builder) {
BlockWALService that = builder.build();
this.initialWindowSize = that.initialWindowSize;
this.walChannel = that.walChannel;
this.slidingWindowService = that.slidingWindowService;
this.walHeader = that.walHeader;
this.recoveryMode = that.recoveryMode;
this.nodeId = that.nodeId;
this.epoch = that.epoch;
}

public static BlockWALServiceBuilder builder(String path, long capacity) {
return new BlockWALServiceBuilder(path, capacity);
}
Expand Down Expand Up @@ -293,6 +307,13 @@ private long getCurrentStartOffset() {
}
}

/**
* Protected method for testing purpose.
*/
protected WALHeader tryReadWALHeader() {
return tryReadWALHeader(walChannel);
}

/**
* Try to read the header from WAL, return the latest one.
*/
Expand Down Expand Up @@ -792,7 +813,10 @@ public long getJumpNextRecoverOffset() {
}
}

class RecoverIterator implements Iterator<RecoverResult> {
/**
* Protected for testing purpose.
*/
protected class RecoverIterator implements Iterator<RecoverResult> {
private final long windowLength;
private final long skipRecordAtOffset;
private long nextRecoverOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
* 8 - [4B] {@link WALHeader#crc8} CRC of the rest of the WAL header, used to verify the correctness of the
* WAL header
*/
class WALHeader {
public class WALHeader {
public static final int WAL_HEADER_MAGIC_CODE = 0x12345678;
public static final int WAL_HEADER_SIZE = 4 // magic code
+ 8 // capacity
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.wal.benchmark;

import com.automq.stream.s3.StreamRecordBatchCodec;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.wal.BlockWALService;
import com.automq.stream.s3.wal.WALHeader;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.StreamSupport;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;

import static com.automq.stream.s3.wal.benchmark.BenchTool.parseArgs;

/**
* RecoverTool is a tool to recover records in a WAL manually.
* It extends {@link BlockWALService} to use tools provided by {@link BlockWALService}
*/
public class RecoverTool extends BlockWALService implements AutoCloseable {

public RecoverTool(Config config) throws IOException {
super(BlockWALService.recoveryBuilder(config.path));
super.start();
}

public static void main(String[] args) throws IOException {
Namespace ns = parseArgs(Config.parser(), args);
Config config = new Config(ns);

try (RecoverTool tool = new RecoverTool(config)) {
tool.run(config);
}
}

private void run(Config config) {
WALHeader header = super.tryReadWALHeader();
System.out.println(header);

Iterable<RecoverResult> recordsSupplier = () -> recover(header, config);
Function<ByteBuf, StreamRecordBatch> decoder = StreamRecordBatchCodec::decode;
Function<ByteBuf, String> stringer = decoder.andThen(StreamRecordBatch::toString);
StreamSupport.stream(recordsSupplier.spliterator(), false)
.map(it -> new RecoverResultWrapper(it, stringer))
.peek(System.out::println)
.forEach(RecoverResultWrapper::release);
}

private Iterator<RecoverResult> recover(WALHeader header, Config config) {
long recoverOffset = config.offset != null ? config.offset : header.getTrimOffset();
long windowLength = header.getSlidingWindowMaxLength();
long skipRecordAtOffset = config.skipTrimmed ? header.getTrimOffset() : -1;
return new RecoverIterator(recoverOffset, windowLength, skipRecordAtOffset);
}

@Override
public void close() {
super.shutdownGracefully();
}

/**
* A wrapper for {@link RecoverResult} to provide a function to convert {@link RecoverResult#record} to string
*/
public static class RecoverResultWrapper {
private final RecoverResult inner;
/**
* A function to convert {@link RecoverResult#record} to string
*/
private final Function<ByteBuf, String> stringer;

public RecoverResultWrapper(RecoverResult inner, Function<ByteBuf, String> stringer) {
this.inner = inner;
this.stringer = stringer;
}

public void release() {
inner.record().release();
}

@Override
public String toString() {
return String.format("%s{", inner.getClass().getSimpleName())
+ String.format("record=(%d)", inner.record().readableBytes()) + stringer.apply(inner.record())
+ ", offset=" + inner.recordOffset()
+ '}';
}
}

public static class Config {
final String path;
final Long offset;
final Boolean skipTrimmed;

Config(Namespace ns) {
this.path = ns.getString("path");
this.offset = ns.getLong("offset");
this.skipTrimmed = ns.getBoolean("skipTrimmed");
}

static ArgumentParser parser() {
ArgumentParser parser = ArgumentParsers
.newFor("RecoverTool")
.build()
.defaultHelp(true)
.description("Recover records in a WAL file");
parser.addArgument("-p", "--path")
.required(true)
.help("Path of the WAL file");
parser.addArgument("--offset")
.type(Long.class)
.help("Offset to start recovering, default to the trimmed offset in the WAL header");
parser.addArgument("--skip-trimmed")
.dest("skipTrimmed")
.type(Boolean.class)
.setDefault(true)
.help("Whether to skip the record at the trimmed offset");
return parser;
}
}
}
Loading