Skip to content

Commit

Permalink
WIP reuse opened file in worker
Browse files Browse the repository at this point in the history
  • Loading branch information
elega committed Oct 17, 2023
1 parent 3e0369a commit 4271e79
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,30 @@ public DataFileChannel getDataFileChannel(
if (pageOffset + bytesToRead > fileLength) {
bytesToRead = (int) (fileLength - (long) pageOffset);
}

DataFileChannel dataFileChannel = new DataFileChannel(pageFile, pageOffset, bytesToRead);
DataFileChannel lastDataFileChannel = mLastDataFileChannel;
final RandomAccessFile raf;
if (lastDataFileChannel != null && lastDataFileChannel.mPageIndex == pageId.getPageIndex()) {
raf = lastDataFileChannel.mFile;
} else {
try {
raf = new RandomAccessFile(pageFile, "r") {
@Override
public void close() throws IOException {
super.close();
}
};
} catch (Exception e) {
throw new RuntimeException(e);
}
}
DataFileChannel dataFileChannel = new DataFileChannel(
pageId.getPageIndex(), raf, pageOffset, bytesToRead);
mLastDataFileChannel = dataFileChannel;
return dataFileChannel;
}

volatile DataFileChannel mLastDataFileChannel;

@Override
public void close() {
// no-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/
public final class CompositeDataBuffer implements DataBuffer {

private final List<DataBuffer> mDataBufferList;
public final List<DataBuffer> mDataBufferList;

/**
* CompositeDataBuffer wraps multiple {@link DataBuffer}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,40 @@
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
* A DataBuffer with the underlying data being a {@link FileChannel}.
*/
public final class DataFileChannel implements DataBuffer {
private final File mFile;
public final RandomAccessFile mFile;
private final long mOffset;
private final long mLength;
public final long mPageIndex;

/**
*
* @param file The file
* @param offset The offset into the FileChannel
* @param length The length of the data to read
*/
public DataFileChannel(File file, long offset, long length) {
public DataFileChannel(long pageIndex, RandomAccessFile file, long offset, long length) {
mFile = Preconditions.checkNotNull(file, "file");
try {
mFile.seek(offset);
} catch (Exception e) {
throw new RuntimeException(e);
}
mOffset = offset;
mLength = length;
mPageIndex = pageIndex;
}

@Override
public Object getNettyOutput() {
return new DefaultFileRegion(mFile, mOffset, mLength);
return new DefaultFileRegion(mFile.getChannel(), mOffset, mLength);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ protected void runPlainPath(AlluxioURI path, CommandLine cl)
if (status.isFolder()) {
throw new FileDoesNotExistException(ExceptionMessage.PATH_MUST_BE_FILE.getMessage(path));
}
byte[] buf = new byte[Constants.MB];
byte[] buf = new byte[Constants.MB * 16];
try (FileInStream is = mFileSystem.openFile(path)) {
int read = is.read(buf);
while (read != -1) {
System.out.write(buf, 0, read);
// System.out.write(buf, 0, read);
read = is.read(buf);
}
}
Expand Down

0 comments on commit 4271e79

Please sign in to comment.