Skip to content

Commit

Permalink
RATIS-1220. FileStore stream to send small packets for MappedByteBuff…
Browse files Browse the repository at this point in the history
…er and NettyFileRegion. (apache#338)

* RATIS-1220. FileStore stream to send small packets for MappedByteBuffer and NettyFileRegion.

* Fix a bug.

* Fix checkstyle

* Use closeAsync() instead of try-with-resources.
  • Loading branch information
szetszwo authored and symious committed Feb 15, 2024
1 parent f83c66a commit 4b26f16
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
Expand Down Expand Up @@ -68,10 +67,6 @@ long getCommittedSize() {
"File " + getRelativePath() + " size is unknown.");
}

void flush() throws IOException {
// no-op
}

ByteString read(CheckedFunction<Path, Path, IOException> resolver, long offset, long length, boolean readCommitted)
throws IOException {
if (readCommitted && offset + length > getCommittedSize()) {
Expand Down Expand Up @@ -186,8 +181,7 @@ CompletableFuture<Integer> submitCreate(
+ close + ") @" + id + ":" + index;
final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
if (out == null) {
out = new FileStore.FileStoreDataChannel(new RandomAccessFile(resolver.apply(getRelativePath()).toFile(),
"rw"));
out = new FileStore.FileStoreDataChannel(resolver.apply(getRelativePath()));
}
return write(0L, data, close, sync);
}, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,22 +259,25 @@ public CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
return CompletableFuture.supplyAsync(() -> {
try {
final Path full = resolve(normalize(p));
return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
return new FileStoreDataChannel(full);
} catch (IOException e) {
throw new CompletionException("Failed to create " + p, e);
}
}, writer);
}

static class FileStoreDataChannel implements StateMachine.DataChannel {
private final Path path;
private final RandomAccessFile randomAccessFile;

FileStoreDataChannel(RandomAccessFile file) {
randomAccessFile = file;
FileStoreDataChannel(Path path) throws FileNotFoundException {
this.path = path;
this.randomAccessFile = new RandomAccessFile(path.toFile(), "rw");
}

@Override
public void force(boolean metadata) throws IOException {
LOG.debug("force({}) at {}", metadata, path);
randomAccessFile.getChannel().force(metadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public long write(String path, long offset, boolean close, ByteBuffer buffer, bo
return WriteReplyProto.parseFrom(reply).getLength();
}

public DataStreamOutput getStreamOutput(String path, int dataSize) {
public DataStreamOutput getStreamOutput(String path, long dataSize) {
final StreamWriteRequestProto header = StreamWriteRequestProto.newBuilder()
.setPath(ProtoUtils.toByteString(path))
.setLength(dataSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.ratis.examples.filestore.FileStoreClient;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;

import java.io.File;
Expand All @@ -34,35 +34,74 @@
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

/**
* Subcommand to generate load in filestore data stream state machine.
*/
@Parameters(commandDescription = "Load Generator for FileStore DataStream")
public class DataStream extends Client {
enum Type {
DirectByteBuffer(DirectByteBufferType::new),
MappedByteBuffer(MappedByteBufferType::new),
NettyFileRegion(NettyFileRegionType::new);

@Parameter(names = {"--type"}, description = "DirectByteBuffer, MappedByteBuffer, NettyFileRegion", required = true)
private String dataStreamType = "NettyFileRegion";
private final BiFunction<String, DataStream, TransferType> constructor;

Type(BiFunction<String, DataStream, TransferType> constructor) {
this.constructor = constructor;
}

BiFunction<String, DataStream, TransferType> getConstructor() {
return constructor;
}

/**
 * Looks up a {@link Type} by name, ignoring case.
 *
 * @param s the name to look up; may be null
 * @return the first constant whose name matches, or null when none matches
 */
static Type valueOfIgnoreCase(String s) {
  return Arrays.stream(values())
      .filter(candidate -> candidate.name().equalsIgnoreCase(s))
      .findFirst()
      .orElse(null);
}
}

// To be used as a Java annotation attribute value
private static final String DESCRIPTION = "[DirectByteBuffer, MappedByteBuffer, NettyFileRegion]";

{
// Assert if the description is correct.
final String expected = Arrays.asList(Type.values()).toString();
Preconditions.assertTrue(expected.equals(DESCRIPTION),
() -> "Unexpected description: " + DESCRIPTION + " does not equal to the expected string " + expected);
}

@Parameter(names = {"--type"}, description = DESCRIPTION, required = true)
private String dataStreamType = Type.NettyFileRegion.name();

// Number of bytes between sync requests. A value that is not positive disables syncing
// (isSync only returns true when syncSize > 0), hence "-1 means no sync" in the help text.
@Parameter(names = {"--syncSize"}, description = "Sync every syncSize, syncSize % bufferSize should be zero, " +
    "-1 means no sync", required = true)
private int syncSize = -1;

int getSyncSize() {
return syncSize;
}

private boolean checkParam() {
if (syncSize != -1 && syncSize % getBufferSizeInBytes() != 0) {
System.err.println("Error: syncSize % bufferSize should be zero");
return false;
}

if (!dataStreamType.equals("DirectByteBuffer") &&
!dataStreamType.equals("MappedByteBuffer") &&
!dataStreamType.equals("NettyFileRegion")) {
System.err.println("Error: dataStreamType should be one of DirectByteBuffer, MappedByteBuffer, transferTo");
if (Type.valueOfIgnoreCase(dataStreamType) == null) {
System.err.println("Error: dataStreamType should be one of " + DESCRIPTION);
return false;
}

Expand Down Expand Up @@ -101,18 +140,11 @@ private Map<String, List<CompletableFuture<DataStreamReply>>> streamWrite(
final long fileLength = file.length();
Preconditions.assertTrue(fileLength == getFileSizeInBytes(), "Unexpected file size: expected size is "
+ getFileSizeInBytes() + " but actual size is " + fileLength);
FileInputStream fis = new FileInputStream(file);
final DataStreamOutput dataStreamOutput = fileStoreClient.getStreamOutput(path, (int) file.length());

if (dataStreamType.equals("DirectByteBuffer")) {
fileMap.put(path, writeByDirectByteBuffer(dataStreamOutput, fis.getChannel()));
} else if (dataStreamType.equals("MappedByteBuffer")) {
fileMap.put(path, writeByMappedByteBuffer(dataStreamOutput, fis.getChannel()));
} else if (dataStreamType.equals("NettyFileRegion")) {
fileMap.put(path, writeByNettyFileRegion(dataStreamOutput, file));
}

dataStreamOutput.closeAsync();
final Type type = Optional.ofNullable(Type.valueOfIgnoreCase(dataStreamType))
.orElseThrow(IllegalStateException::new);
final TransferType writer = type.getConstructor().apply(path, this);
fileMap.put(path, writer.transfer(fileStoreClient));
}
return fileMap;
}
Expand All @@ -134,46 +166,126 @@ private long waitStreamFinish(Map<String, List<CompletableFuture<DataStreamReply
return totalBytes;
}

private List<CompletableFuture<DataStreamReply>> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput,
FileChannel fileChannel) throws IOException {
final int fileSize = getFileSizeInBytes();
final int bufferSize = getBufferSizeInBytes();
if (fileSize <= 0) {
return Collections.emptyList();
abstract static class TransferType {
private final String path;
private final File file;
private final long fileSize;
private final int bufferSize;
private final long syncSize;
private long syncPosition = 0;

/**
 * Captures the settings for transferring one file and checks the file's actual size.
 *
 * @param path path of the file to transfer; also used to open the local {@link File}
 * @param cli the command object that supplies the file size, buffer size and sync size
 */
TransferType(String path, DataStream cli) {
this.path = path;
this.file = new File(path);
this.fileSize = cli.getFileSizeInBytes();
this.bufferSize = cli.getBufferSizeInBytes();
this.syncSize = cli.getSyncSize();

// Assert that the length reported by the file system equals the configured file size.
final long actualSize = file.length();
Preconditions.assertTrue(actualSize == fileSize, () -> "Unexpected file size: expected size is "
+ fileSize + " but actual size is " + actualSize + ", path=" + path);
}

File getFile() {
return file;
}

int getBufferSize() {
return bufferSize;
}

/**
 * Computes the size of the packet that starts at the given offset.
 *
 * @param offset position in the file where the packet starts
 * @return the buffer size, or the number of bytes left in the file when that is smaller
 */
long getPacketSize(long offset) {
  final long bytesLeft = fileSize - offset;
  return bytesLeft < bufferSize ? bytesLeft : bufferSize;
}
List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
final ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;

for(long offset = 0L; offset < fileSize;) {
final ByteBuf buf = alloc.directBuffer(bufferSize);
final int bytesRead = buf.writeBytes(fileChannel, bufferSize);
/**
 * Decides whether the packet ending at the given position should be written with sync.
 * When it returns true, the position is recorded as the latest sync position.
 *
 * @param position file position reached once the current packet is written
 * @return true when syncing is enabled (syncSize > 0) and either the end of the file is
 *         reached or at least syncSize bytes have passed since the last sync position
 */
boolean isSync(long position) {
  if (syncSize <= 0) {
    return false;
  }
  final boolean reachedEnd = position >= fileSize;
  final boolean intervalElapsed = position - syncPosition >= syncSize;
  if (!reachedEnd && !intervalElapsed) {
    return false;
  }
  syncPosition = position;
  return true;
}

/**
 * Sends the file to the given client as a sequence of packets.
 *
 * A stream is opened for the path, {@link #write} is called repeatedly until the offset
 * reaches the file size, and the stream is closed asynchronously whether or not the
 * writes succeeded.
 *
 * @param client the client used to open the stream output
 * @return the futures added by each write call followed by the future of the close call;
 *         an empty list when the file size is not positive
 * @throws IOException if anything fails while reading or writing; the original failure
 *         is attached as the cause
 */
List<CompletableFuture<DataStreamReply>> transfer(FileStoreClient client) throws IOException {
  if (fileSize <= 0) {
    return Collections.emptyList();
  }

  final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
  final DataStreamOutput out = client.getStreamOutput(path, fileSize);
  try (FileInputStream fis = new FileInputStream(file)) {
    final FileChannel in = fis.getChannel();
    for (long offset = 0L; offset < fileSize; ) {
      // write(..) returns the number of bytes it consumed starting at offset.
      offset += write(in, out, offset, futures);
    }
  } catch (Throwable e) {
    // Chain the original failure so its type, message and stack trace are not lost.
    throw new IOException("Failed to transfer " + path, e);
  } finally {
    // Always request the close, even after a failure, and let the caller wait on it.
    futures.add(out.closeAsync());
  }
  return futures;
}

/**
 * Writes one packet starting at the given offset.
 *
 * @param in channel of the file being transferred
 * @param out stream output to write the packet to
 * @param offset file position where this packet starts
 * @param futures list that implementations add their write futures to
 * @return the number of bytes written; the caller in transfer(..) adds it to the offset
 * @throws IOException if reading the file or writing the packet fails
 */
abstract long write(FileChannel in, DataStreamOutput out, long offset,
List<CompletableFuture<DataStreamReply>> futures) throws IOException;

/** Returns the simple class name followed by the path and the file size. */
@Override
public String toString() {
  final StringBuilder text = new StringBuilder();
  text.append(JavaUtils.getClassSimpleName(getClass()));
  text.append("{").append(path);
  text.append(", size=").append(fileSize);
  text.append("}");
  return text.toString();
}
}

static class DirectByteBufferType extends TransferType {
DirectByteBufferType(String path, DataStream cli) {
super(path, cli);
}

/**
 * Reads up to one buffer of bytes from the file channel into a pooled direct buffer and
 * writes it to the stream output.
 *
 * NOTE(review): this hunk comes from a commit diff and shows both the removed and the added
 * variants of the throw statement and of the writeAsync call; only one of each belongs to
 * the final method.
 *
 * @return the number of bytes read from the channel
 */
@Override
long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures)
throws IOException {
final int bufferSize = getBufferSize();
final ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize);
final int bytesRead = buf.writeBytes(in, bufferSize);
if (bytesRead < 0) {
// NOTE(review): buf is not released on this path (nor when bytesRead == 0) — possible leak, confirm.
throw new IllegalStateException("Failed to read " + fileSize
+ " byte(s). The channel has reached end-of-stream at " + offset);
throw new IllegalStateException("Failed to read " + bufferSize + " byte(s) from " + this
+ ". The channel has reached end-of-stream at " + offset);
} else if (bytesRead > 0) {
offset += bytesRead;
final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(buf.nioBuffer(),
syncSize > 0 && (offset == fileSize || offset % syncSize == 0));
final CompletableFuture<DataStreamReply> f = out.writeAsync(buf.nioBuffer(), isSync(offset + bytesRead));
// NOTE(review): thenRun only runs after normal completion, so buf presumably stays
// unreleased when the write future fails — consider whenComplete; confirm.
f.thenRun(buf::release);
futures.add(f);
}
return bytesRead;
}

return futures;
}

private List<CompletableFuture<DataStreamReply>> writeByMappedByteBuffer(DataStreamOutput dataStreamOutput,
FileChannel fileChannel) throws IOException {
List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, getFileSizeInBytes());
futures.add(dataStreamOutput.writeAsync(mappedByteBuffer));
return futures;
/**
 * Transfer type that memory-maps one packet of the file at a time and writes the mapped
 * buffer to the stream output.
 */
static class MappedByteBufferType extends TransferType {
  MappedByteBufferType(String path, DataStream cli) {
    super(path, cli);
  }

  /**
   * Maps the packet starting at the given offset read-only and writes it.
   *
   * @return the number of bytes remaining in the mapped buffer, i.e. the packet length
   */
  @Override
  long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures)
      throws IOException {
    final MappedByteBuffer packet = in.map(FileChannel.MapMode.READ_ONLY, offset, getPacketSize(offset));
    final int length = packet.remaining();
    final boolean sync = isSync(offset + length);
    futures.add(out.writeAsync(packet, sync));
    return length;
  }
}

private List<CompletableFuture<DataStreamReply>> writeByNettyFileRegion(
DataStreamOutput dataStreamOutput, File file) {
List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
futures.add(dataStreamOutput.writeAsync(file));
return futures;
/**
 * Transfer type that hands the file itself, with an offset and a length, to the stream
 * output for each packet instead of reading the bytes here.
 */
static class NettyFileRegionType extends TransferType {
  NettyFileRegionType(String path, DataStream cli) {
    super(path, cli);
  }

  /**
   * Writes the packet starting at the given offset by file region.
   *
   * @return the packet length computed by getPacketSize(offset)
   */
  @Override
  long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures) {
    final long length = getPacketSize(offset);
    final File source = getFile();
    final boolean sync = isSync(offset + length);
    futures.add(out.writeAsync(source, offset, length, sync));
    return length;
  }
}
}

1 comment on commit 4b26f16

@symious
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Add TransferType to split the file into packets, and let subclasses implement the write function.

Please sign in to comment.