Skip to content

Discussion: Improve DirectIO Directory for Java 24/25 #14928

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.foreign.Arena;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
Expand Down Expand Up @@ -117,7 +118,8 @@ public class DirectIODirectory extends FilterDirectory {
* Create a new DirectIODirectory for the named location.
*
* @param delegate Directory for non-merges, also used as reference to file system path.
* @param mergeBufferSize Size of buffer to use for merging.
* @param mergeBufferSize Size of buffer to use for merging. The buffer size will be rounded up to
* next multiple of filesystem's block size
* @param minBytesDirect Merges, or files to be opened for reading, smaller than this will not use
* direct IO. See {@link #DEFAULT_MIN_BYTES_DIRECT} and {@link #useDirectIO}.
* @throws IOException If there is a low-level I/O error
Expand All @@ -126,7 +128,9 @@ public DirectIODirectory(FSDirectory delegate, int mergeBufferSize, long minByte
throws IOException {
super(delegate);
this.blockSize = Math.toIntExact(Files.getFileStore(delegate.getDirectory()).getBlockSize());
this.mergeBufferSize = mergeBufferSize;
// ensure that the size of mergeBuffer is a multiple of the blockSize:
this.mergeBufferSize =
((mergeBufferSize + this.blockSize - 1) / this.blockSize) * this.blockSize;
this.minBytesDirect = minBytesDirect;
}

Expand Down Expand Up @@ -209,6 +213,7 @@ private static OpenOption getDirectOpenOption() {
}

private static final class DirectIOIndexOutput extends IndexOutput {
private final Arena arena;
private final ByteBuffer buffer;
private final FileChannel channel;
private final Checksum digest;
Expand All @@ -231,7 +236,8 @@ public DirectIOIndexOutput(Path path, String name, int blockSize, int bufferSize
channel =
FileChannel.open(
path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, getDirectOpenOption());
buffer = ByteBuffer.allocateDirect(bufferSize + blockSize - 1).alignedSlice(blockSize);
arena = Arena.ofConfined();
buffer = arena.allocate(bufferSize, blockSize).asByteBuffer();
digest = new BufferedChecksum(new CRC32());

isOpen = true;
Expand Down Expand Up @@ -294,7 +300,8 @@ public void close() throws IOException {
try {
dump();
} finally {
try (FileChannel ch = channel) {
try (FileChannel ch = channel;
Arena _ = arena) {
ch.truncate(getFilePointer());
}
}
Expand All @@ -303,6 +310,7 @@ public void close() throws IOException {
}

private static final class DirectIOIndexInput extends IndexInput {
private final Arena arena;
private final ByteBuffer buffer;
private final FileChannel channel;
private final int blockSize;
Expand All @@ -324,7 +332,8 @@ public DirectIOIndexInput(Path path, int blockSize, int bufferSize) throws IOExc
super("DirectIOIndexInput(path=\"" + path + "\")");
this.channel = FileChannel.open(path, StandardOpenOption.READ, getDirectOpenOption());
this.blockSize = blockSize;
this.buffer = allocateBuffer(bufferSize, blockSize);
this.arena = Arena.ofAuto(); // fix this to be confined
this.buffer = allocateBuffer(arena, bufferSize, blockSize);
this.isOpen = true;
this.isClosable = true;
this.length = channel.size();
Expand All @@ -339,7 +348,8 @@ private DirectIOIndexInput(
super(description);
Objects.checkFromIndexSize(offset, length, other.channel.size());
final int bufferSize = other.buffer.capacity();
this.buffer = allocateBuffer(bufferSize, other.blockSize);
this.arena = Arena.ofAuto(); // fix this to be confined
this.buffer = allocateBuffer(arena, bufferSize, other.blockSize);
this.blockSize = other.blockSize;
this.channel = other.channel;
this.isOpen = true;
Expand All @@ -350,16 +360,14 @@ private DirectIOIndexInput(
buffer.limit(0);
}

private static ByteBuffer allocateBuffer(int bufferSize, int blockSize) {
return ByteBuffer.allocateDirect(bufferSize + blockSize - 1)
.alignedSlice(blockSize)
.order(LITTLE_ENDIAN);
private static ByteBuffer allocateBuffer(Arena arena, int bufferSize, int blockSize) {
return arena.allocate(bufferSize, blockSize).asByteBuffer().order(LITTLE_ENDIAN);
}

@Override
public void close() throws IOException {
if (isOpen && isClosable) {
channel.close();
try (FileChannel _ = channel; /* NOT YET: Arena _ = arena */ ) {}
isOpen = false;
}
}
Expand Down
Loading