Skip to content

Commit

Permalink
Add MetadataGen.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Sep 27, 2023
1 parent d461a01 commit 5a64f73
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.MemoizedCheckedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TaskQueue;
import org.apache.ratis.util.function.CheckedFunction;
Expand Down Expand Up @@ -59,12 +60,17 @@ Path getRelativePath() {

long getWriteSize() {
throw new UnsupportedOperationException(
"File " + getRelativePath() + " size is unknown.");
"File " + getRelativePath() + " writeSize is unknown.");
}

long getCommittedSize() {
throw new UnsupportedOperationException(
"File " + getRelativePath() + " size is unknown.");
"File " + getRelativePath() + " committedSize is unknown.");
}

boolean hasFile() {
throw new UnsupportedOperationException(
"File " + getRelativePath() + " hasFile is unknown.");
}

ByteString read(CheckedFunction<Path, Path, IOException> resolver, long offset, long length, boolean readCommitted)
Expand All @@ -79,6 +85,10 @@ ByteString read(CheckedFunction<Path, Path, IOException> resolver, long offset,
+ ", path=" + getRelativePath());
}

if (length == 0) {
return ByteString.EMPTY;
}

try(SeekableByteChannel in = Files.newByteChannel(
resolver.apply(getRelativePath()), StandardOpenOption.READ)) {
final ByteBuffer buffer = ByteBuffer.allocateDirect(FileStoreCommon.getChunkSize(length));
Expand Down Expand Up @@ -114,11 +124,13 @@ CompletableFuture<UnderConstruction> complete(UnderConstruction uc) {
static class ReadOnly extends FileInfo {
private final long committedSize;
private final long writeSize;
private final boolean hasFile;

ReadOnly(UnderConstruction f) {
super(f.getRelativePath());
this.committedSize = f.getCommittedSize();
this.writeSize = f.getWriteSize();
this.hasFile = f.hasFile();
}

@Override
Expand All @@ -130,6 +142,11 @@ long getCommittedSize() {
long getWriteSize() {
return writeSize;
}

@Override
boolean hasFile() {
return hasFile;
}
}

static class WriteInfo {
Expand Down Expand Up @@ -160,21 +177,22 @@ long getPreviousIndex() {
}

static class UnderConstruction extends FileInfo {
private FileStore.FileStoreDataChannel out;
private final MemoizedCheckedSupplier<FileStore.FileStoreDataChannel, IOException> channel;

/** The size written to a local file. */
private volatile long writeSize;
/** The size committed to client. */
private volatile long committedSize;
private final AtomicLong committedSize = new AtomicLong();

/** A queue to make sure that the writes are in order. */
private final TaskQueue writeQueue = new TaskQueue("writeQueue");
private final Map<Long, WriteInfo> writeInfos = new ConcurrentHashMap<>();

private final AtomicLong lastWriteIndex = new AtomicLong(-1L);

UnderConstruction(Path relativePath) {
UnderConstruction(Path relativePath, CheckedSupplier<FileStore.FileStoreDataChannel, IOException> supplyChannel) {
super(relativePath);
this.channel = MemoizedCheckedSupplier.valueOf(supplyChannel);
}

@Override
Expand All @@ -184,23 +202,29 @@ UnderConstruction asUnderConstruction() {

@Override
long getCommittedSize() {
return committedSize;
return committedSize.get();
}

@Override
long getWriteSize() {
return writeSize;
}

@Override
boolean hasFile() {
return channel.isInitialized();
}

CompletableFuture<Integer> submitCreate(
CheckedFunction<Path, Path, IOException> resolver, ByteString data, boolean close, boolean sync,
ExecutorService executor, RaftPeerId id, long index) {
ByteString data, boolean close, boolean sync,
ExecutorService executor, RaftPeerId id, long index, boolean skipEmptyFile) {
final Supplier<String> name = () -> "create(" + getRelativePath() + ", "
+ close + ") @" + id + ":" + index;
final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
if (out == null) {
out = new FileStore.FileStoreDataChannel(resolver.apply(getRelativePath()));
if (data.isEmpty() && skipEmptyFile) {
return 0;
}
channel.get();
return write(0L, data, close, sync);
}, name);
return submitWrite(task, executor, id, index);
Expand Down Expand Up @@ -238,13 +262,14 @@ private int write(long offset, ByteString data, boolean close, boolean sync) thr
throw new IOException("Offset/size mismatched: offset = " + offset
+ " != writeSize = " + writeSize + ", path=" + getRelativePath());
}
if (out == null) {
if (!channel.isInitialized()) {
throw new IOException("File output is not initialized, path=" + getRelativePath());
}

final FileStore.FileStoreDataChannel out = channel.get();
synchronized (out) {
int n = 0;
if (data != null) {
if (data != null && !data.isEmpty()) {
final ByteBuffer buffer = data.asReadOnlyByteBuffer();
try {
for (; buffer.remaining() > 0; ) {
Expand Down Expand Up @@ -280,15 +305,18 @@ CompletableFuture<Integer> submitCommit(
}

final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
if (offset != committedSize) {
throw new IOException("Offset/size mismatched: offset = "
+ offset + " != committedSize = " + committedSize
final long oldCommittedSize = getCommittedSize();
if (offset != oldCommittedSize) {
throw new IOException("Offset/size mismatched: offset = " + offset
+ " != committedSize = " + oldCommittedSize
+ ", path=" + getRelativePath());
} else if (committedSize + size > writeSize) {
throw new IOException("Offset/size mismatched: committed (=" + committedSize
}
final long newCommittedSize = oldCommittedSize + size;
if (newCommittedSize > writeSize) {
throw new IOException("Offset/size mismatched: committed (=" + oldCommittedSize
+ ") + size (=" + size + ") > writeSize = " + writeSize);
}
committedSize += size;
committedSize.compareAndSet(oldCommittedSize, newCommittedSize);

if (close) {
ReadOnly ignored = closeFunction.apply(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ CompletableFuture<Path> delete(long index, String relative) {
final Supplier<String> name = () -> "delete(" + relative + ") @" + getId() + ":" + index;
final CheckedSupplier<Path, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
final FileInfo info = files.remove(relative);
FileUtils.delete(resolve(info.getRelativePath()));
if (info.hasFile()) {
FileUtils.delete(resolve(info.getRelativePath()));
}
return info.getRelativePath();
}, name);
return submit(task, deleter);
Expand Down Expand Up @@ -251,14 +253,15 @@ CompletableFuture<WriteReplyProto> submitCommit(
}

CompletableFuture<Integer> write(
long index, String relative, boolean close, boolean sync, long offset, ByteString data) {
long index, String relative, boolean close, boolean sync, long offset, ByteString data, boolean skipEmptyFile) {
final int size = data != null? data.size(): 0;
LOG.trace("write {}, offset={}, size={}, close? {} @{}:{}",
relative, offset, size, close, getId(), index);
final boolean createNew = offset == 0L;
final UnderConstruction uc;
if (createNew) {
uc = new UnderConstruction(normalize(relative));
final Path normalized = normalize(relative);
uc = new UnderConstruction(normalized, () -> new FileStore.FileStoreDataChannel(resolve(normalized)));
files.putNew(uc);
} else {
try {
Expand All @@ -270,7 +273,7 @@ CompletableFuture<Integer> write(
}

return size == 0 && !close? CompletableFuture.completedFuture(0)
: createNew? uc.submitCreate(this::resolve, data, close, sync, writer, getId(), index)
: createNew? uc.submitCreate(data, close, sync, writer, getId(), index, skipEmptyFile)
: uc.submitWrite(offset, data, close, sync, writer, getId(), index);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.Empty;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
Expand Down Expand Up @@ -186,6 +187,10 @@ public DataStreamOutput getStreamOutput(String path, long dataSize, RoutingTable
return client.getDataStreamApi().stream(request.toByteString().asReadOnlyByteBuffer(), routingTable);
}

public CompletableFuture<Long> writeAsync(String path) {
return writeAsync(path, 0L, true, ByteString.EMPTY.asReadOnlyByteBuffer(), false);
}

public CompletableFuture<Long> writeAsync(String path, long offset, boolean close, ByteBuffer buffer, boolean sync) {
return writeImpl(this::sendAsync, path, offset, close, buffer, sync
).thenApply(reply -> JavaUtils.supplyAndWrapAsCompletionException(
Expand All @@ -196,16 +201,17 @@ private static <OUTPUT, THROWABLE extends Throwable> OUTPUT writeImpl(
CheckedFunction<Message, OUTPUT, THROWABLE> sendFunction,
String path, long offset, boolean close, ByteBuffer data, boolean sync)
throws THROWABLE {
final int length = data.remaining();
final WriteRequestHeaderProto.Builder header = WriteRequestHeaderProto.newBuilder()
.setPath(ProtoUtils.toByteString(path))
.setOffset(offset)
.setLength(data.remaining())
.setLength(length)
.setClose(close)
.setSync(sync);

final WriteRequestProto.Builder write = WriteRequestProto.newBuilder()
.setHeader(header)
.setData(ByteString.copyFrom(data));
.setData(length == 0? ByteString.EMPTY: ByteString.copyFrom(data));

final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setWrite(write).build();
return sendFunction.apply(Message.valueOf(request));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,45 +109,40 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
return b.build();
}

@Override
public CompletableFuture<Integer> write(LogEntryProto entry) {
final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry();
final ByteString data = smLog.getLogData();
static WriteRequestHeaderProto getWriteRequestHeaderProto(LogEntryProto entry) {
final ByteString data = entry.getStateMachineLogEntry().getLogData();
final FileStoreRequestProto proto;
try {
proto = FileStoreRequestProto.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
return FileStoreCommon.completeExceptionally(
entry.getIndex(), "Failed to parse data, entry=" + entry, e);
throw new IllegalArgumentException("Failed to parse data in entry " + entry, e);
}
if (proto.getRequestCase() != FileStoreRequestProto.RequestCase.WRITEHEADER) {
return proto.getRequestCase() == FileStoreRequestProto.RequestCase.WRITEHEADER ?
proto.getWriteHeader() : null;
}

@Override
public CompletableFuture<Integer> write(LogEntryProto entry) {
final WriteRequestHeaderProto h = getWriteRequestHeaderProto(entry);
if (h == null) {
return null;
}

final WriteRequestHeaderProto h = proto.getWriteHeader();
final CompletableFuture<Integer> f = files.write(entry.getIndex(),
h.getPath().toStringUtf8(), h.getClose(), h.getSync(), h.getOffset(),
smLog.getStateMachineEntry().getStateMachineData());
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData(),
h.getSkipEmptyFile());
// sync only if closing the file
return h.getClose()? f: null;
}

@Override
public CompletableFuture<ByteString> read(LogEntryProto entry) {
final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry();
final ByteString data = smLog.getLogData();
final FileStoreRequestProto proto;
try {
proto = FileStoreRequestProto.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
return FileStoreCommon.completeExceptionally(
entry.getIndex(), "Failed to parse data, entry=" + entry, e);
}
if (proto.getRequestCase() != FileStoreRequestProto.RequestCase.WRITEHEADER) {
final WriteRequestHeaderProto h = getWriteRequestHeaderProto(entry);
if (h == null) {
return null;
}

final WriteRequestHeaderProto h = proto.getWriteHeader();
CompletableFuture<ExamplesProtos.ReadReplyProto> reply =
files.read(h.getPath().toStringUtf8(), h.getOffset(), h.getLength(), false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Client to connect filestore example cluster.
Expand Down Expand Up @@ -179,15 +180,21 @@ private CompletableFuture<Long> writeFileAsync(String path, ExecutorService exec
return future;
}

protected List<String> generateFiles(ExecutorService executor) {
UUID uuid = UUID.randomUUID();
protected List<String> generatePaths() {
final UUID uuid = UUID.randomUUID();
List<String> paths = new ArrayList<>();
List<CompletableFuture<Long>> futures = new ArrayList<>();
for (int i = 0; i < numFiles; i ++) {
String path = getPath("file-" + uuid + "-" + i);
paths.add(path);
futures.add(writeFileAsync(path, executor));
}
return paths;
}

protected List<String> generateFiles(ExecutorService executor) {
final List<String> paths = generatePaths();
final List<CompletableFuture<Long>> futures = paths.stream()
.map(p -> writeFileAsync(p, executor))
.collect(Collectors.toList());

for (int i = 0; i < futures.size(); i ++) {
long size = futures.get(i).join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.ratis.examples.common.SubCommandBase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
Expand All @@ -31,10 +31,11 @@ private FileStore() {
}

public static List<SubCommandBase> getSubCommands() {
List<SubCommandBase> commands = new ArrayList<>();
commands.add(new Server());
commands.add(new LoadGen());
commands.add(new DataStream());
return commands;
return Arrays.asList(
new Server(),
new LoadGen(),
new MetadataGen(),
new DataStream()
);
}
}
Loading

0 comments on commit 5a64f73

Please sign in to comment.