Skip to content

Commit

Permalink
feat: add an option to specify the start point of recovery
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Mar 12, 2024
1 parent 73f0411 commit e74266f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,10 @@ public long getJumpNextRecoverOffset() {
}
}

class RecoverIterator implements Iterator<RecoverResult> {
/**
* Protected for testing purpose.
*/
protected class RecoverIterator implements Iterator<RecoverResult> {
private final long windowLength;
private final long skipRecordAtOffset;
private long nextRecoverOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.automq.stream.s3.wal.WALHeader;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.StreamSupport;
import net.sourceforge.argparse4j.ArgumentParsers;
Expand All @@ -41,22 +42,30 @@ public static void main(String[] args) throws IOException {
Config config = new Config(ns);

try (RecoverTool tool = new RecoverTool(config)) {
tool.run();
tool.run(config);
}
}

private void run() {
private void run(Config config) {
WALHeader header = super.tryReadWALHeader();
System.out.println(header);

Iterable<RecoverResult> recordsSupplier = super::recover;
Iterable<RecoverResult> recordsSupplier = () -> recover(header, config);
Function<ByteBuf, StreamRecordBatch> decoder = StreamRecordBatchCodec::decode;
Function<ByteBuf, String> stringer = decoder.andThen(StreamRecordBatch::toString);
StreamSupport.stream(recordsSupplier.spliterator(), false)
.map(it -> new RecoverResultWrapper(it, decoder.andThen(StreamRecordBatch::toString)))
.map(it -> new RecoverResultWrapper(it, stringer))
.peek(System.out::println)
.forEach(RecoverResultWrapper::release);
}

private Iterator<RecoverResult> recover(WALHeader header, Config config) {
long recoverOffset = config.offset != null ? config.offset : header.getTrimOffset();
long windowLength = header.getSlidingWindowMaxLength();
long skipRecordAtOffset = config.skipTrimmed ? header.getTrimOffset() : -1;
return new RecoverIterator(recoverOffset, windowLength, skipRecordAtOffset);
}

@Override
public void close() {
super.shutdownGracefully();
Expand Down Expand Up @@ -92,9 +101,13 @@ public String toString() {

public static class Config {
final String path;
final Long offset;
final Boolean skipTrimmed;

Config(Namespace ns) {
this.path = ns.getString("path");
this.offset = ns.getLong("offset");
this.skipTrimmed = ns.getBoolean("skipTrimmed");
}

static ArgumentParser parser() {
Expand All @@ -106,6 +119,14 @@ static ArgumentParser parser() {
parser.addArgument("-p", "--path")
.required(true)
.help("Path of the WAL file");
parser.addArgument("--offset")
.type(Long.class)
.help("Offset to start recovering, default to the trimmed offset in the WAL header");
parser.addArgument("--skip-trimmed")
.dest("skipTrimmed")
.type(Boolean.class)
.setDefault(true)
.help("Whether to skip the record at the trimmed offset");
return parser;
}
}
Expand Down

0 comments on commit e74266f

Please sign in to comment.