* This stream also allows us to set a limit on how many bytes we can read
* without getting an exception.
*/
@@ -141,11 +149,9 @@ public long skip(long amt) throws IOException {
private final SegmentedRaftLogMetrics raftLogMetrics;
private final SizeInBytes maxOpSize;
- SegmentedRaftLogReader(File file, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics)
- throws FileNotFoundException {
+ SegmentedRaftLogReader(File file, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) throws IOException {
this.file = file;
- this.limiter = new LimitedInputStream(
- new BufferedInputStream(new FileInputStream(file)));
+ this.limiter = new LimitedInputStream(new BufferedInputStream(FileUtils.newInputStream(file)));
in = new DataInputStream(limiter);
checksum = new PureJavaCrc32C();
this.maxOpSize = maxOpSize;
@@ -154,13 +160,10 @@ public long skip(long amt) throws IOException {
/**
* Read header from the log file:
- *
* (1) The header in file is verified successfully.
* Then, return true.
- *
* (2) The header in file is partially written.
* Then, return false.
- *
* (3) The header in file is corrupted or there is some other {@link IOException}.
* Then, throw an exception.
*/
@@ -197,7 +200,7 @@ LogEntryProto readEntry() throws IOException {
final Timekeeper timekeeper = Optional.ofNullable(raftLogMetrics)
.map(SegmentedRaftLogMetrics::getReadEntryTimer)
.orElse(null);
- try(AutoCloseable readEntryContext = Timekeeper.start(timekeeper)) {
+ try(AutoCloseable ignored = Timekeeper.start(timekeeper)) {
return decodeEntry();
} catch (EOFException eof) {
in.reset();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
index dfed2790f7..65bfc8b809 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
@@ -21,12 +21,12 @@
import org.apache.ratis.proto.RaftProtos.FileChunkProto;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
+import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import java.io.Closeable;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
@@ -57,13 +57,19 @@ public FileChunkReader(FileInfo info, Path relativePath) throws IOException {
final File f = info.getPath().toFile();
if (info.getFileDigest() == null) {
digester = MD5Hash.getDigester();
- this.in = new DigestInputStream(new FileInputStream(f), digester);
+ this.in = new DigestInputStream(FileUtils.newInputStream(f), digester);
} else {
digester = null;
- this.in = new FileInputStream(f);
+ this.in = FileUtils.newInputStream(f);
}
}
+ static ByteString readFileChunk(int chunkLength, InputStream in) throws IOException {
+ final byte[] chunkBuffer = new byte[chunkLength];
+ IOUtils.readFully(in, chunkBuffer, 0, chunkBuffer.length);
+ return UnsafeByteOperations.unsafeWrap(chunkBuffer);
+ }
+
/**
* Read the next chunk.
*
@@ -74,9 +80,7 @@ public FileChunkReader(FileInfo info, Path relativePath) throws IOException {
public FileChunkProto readFileChunk(int chunkMaxSize) throws IOException {
final long remaining = info.getFileSize() - offset;
final int chunkLength = remaining < chunkMaxSize ? (int) remaining : chunkMaxSize;
- final byte[] chunkBuffer = new byte[chunkLength];
- IOUtils.readFully(in, chunkBuffer, 0, chunkBuffer.length);
- final ByteString data = UnsafeByteOperations.unsafeWrap(chunkBuffer);
+ final ByteString data = readFileChunk(chunkLength, in);
// whether this chunk is the last chunk of current file
final boolean isDone = offset + chunkLength == info.getFileSize();
final ByteString fileDigest;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
index 4fcf46389c..fbb7bf7d46 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
@@ -17,21 +17,23 @@
*/
package org.apache.ratis.server.storage;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.NoSuchFileException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState;
+import org.apache.ratis.util.AtomicFileOutputStream;
+import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SizeInBytes;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.nio.file.Files;
import java.util.Optional;
/** The storage of a {@link org.apache.ratis.server.RaftServer}. */
@@ -98,7 +100,7 @@ private void format() throws IOException {
}
private void cleanMetaTmpFile() throws IOException {
- Files.deleteIfExists(storageDir.getMetaTmpFile().toPath());
+ FileUtils.deleteIfExists(storageDir.getMetaTmpFile());
}
private StorageState analyzeAndRecoverStorage(boolean toLock) throws IOException {
@@ -142,7 +144,7 @@ public RaftStorageMetadataFile getMetadataFile() {
public void writeRaftConfiguration(LogEntryProto conf) {
File confFile = storageDir.getMetaConfFile();
- try (FileOutputStream fio = new FileOutputStream(confFile)) {
+ try (OutputStream fio = new AtomicFileOutputStream(confFile)) {
conf.writeTo(fio);
} catch (Exception e) {
LOG.error("Failed writing configuration to file:" + confFile, e);
@@ -151,10 +153,10 @@ public void writeRaftConfiguration(LogEntryProto conf) {
public RaftConfiguration readRaftConfiguration() {
File confFile = storageDir.getMetaConfFile();
- try (FileInputStream fio = new FileInputStream(confFile)) {
+ try (InputStream fio = FileUtils.newInputStream(confFile)) {
LogEntryProto confProto = LogEntryProto.newBuilder().mergeFrom(fio).build();
return LogProtoUtils.toRaftConfiguration(confProto);
- } catch (FileNotFoundException e) {
+ } catch (FileNotFoundException | NoSuchFileException e) {
return null;
} catch (Exception e) {
LOG.error("Failed reading configuration from file:" + confFile, e);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
index 1fb723f811..896a5f21e6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
@@ -20,12 +20,12 @@
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.AtomicFileOutputStream;
import org.apache.ratis.util.ConcurrentUtils;
+import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
@@ -110,7 +110,7 @@ static RaftStorageMetadata load(File file) throws IOException {
return RaftStorageMetadata.getDefault();
}
try(BufferedReader br = new BufferedReader(new InputStreamReader(
- new FileInputStream(file), StandardCharsets.UTF_8))) {
+ FileUtils.newInputStream(file), StandardCharsets.UTF_8))) {
Properties properties = new Properties();
properties.load(br);
return RaftStorageMetadata.valueOf(getTerm(properties), getVotedFor(properties));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index 8d291acb28..c49a86ec59 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -27,7 +27,6 @@
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MD5FileUtil;
import org.apache.ratis.util.MemoizedSupplier;
@@ -37,11 +36,12 @@
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileOutputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
-import java.security.DigestOutputStream;
+import java.nio.file.StandardOpenOption;
import java.security.MessageDigest;
import java.util.Optional;
import java.util.function.Function;
@@ -77,6 +77,28 @@ public class SnapshotManager {
new File(dir.get().getRoot(), c.getFilename()).toPath()).toString();
}
+ private FileChannel open(FileChunkProto chunk, File tmpSnapshotFile) throws IOException {
+ final FileChannel out;
+ final boolean exists = tmpSnapshotFile.exists();
+ if (chunk.getOffset() == 0) {
+ // if offset is 0, delete any existing temp snapshot file if it has the same last index.
+ if (exists) {
+ FileUtils.deleteFully(tmpSnapshotFile);
+ }
+ // create the temp snapshot file and put padding inside
+ out = FileUtils.newFileChannel(tmpSnapshotFile, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
+ digester.get().reset();
+ } else {
+ if (!exists) {
+ throw new FileNotFoundException("Chunk offset is non-zero but file is not found: " + tmpSnapshotFile
+ + ", chunk=" + chunk);
+ }
+ out = FileUtils.newFileChannel(tmpSnapshotFile, StandardOpenOption.WRITE)
+ .position(chunk.getOffset());
+ }
+ return out;
+ }
+
public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine) throws IOException {
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();
@@ -103,30 +125,15 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
final File tmpSnapshotFile = new File(tmpDir, getRelativePath.apply(chunk));
FileUtils.createDirectoriesDeleteExistingNonDirectory(tmpSnapshotFile.getParentFile());
- FileOutputStream out = null;
- try {
- // if offset is 0, delete any existing temp snapshot file if it has the
- // same last index.
- if (chunk.getOffset() == 0) {
- if (tmpSnapshotFile.exists()) {
- FileUtils.deleteFully(tmpSnapshotFile);
- }
- // create the temp snapshot file and put padding inside
- out = new FileOutputStream(tmpSnapshotFile);
- digester.get().reset();
- } else {
- Preconditions.assertTrue(tmpSnapshotFile.exists());
- out = new FileOutputStream(tmpSnapshotFile, true);
- FileChannel fc = out.getChannel();
- fc.position(chunk.getOffset());
- }
+ try (FileChannel out = open(chunk, tmpSnapshotFile)) {
+ final ByteBuffer data = chunk.getData().asReadOnlyByteBuffer();
+ digester.get().update(data.duplicate());
- // write data to the file
- try (DigestOutputStream digestOut = new DigestOutputStream(out, digester.get())) {
- digestOut.write(chunk.getData().toByteArray());
+ int written = 0;
+ for(; data.remaining() > 0; ) {
+ written += out.write(data);
}
- } finally {
- IOUtils.cleanup(null, out);
+ Preconditions.assertSame(chunk.getData().size(), written, "written");
}
// rename the temp snapshot file if this is the last chunk. also verify
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index dcb54be3d8..959cb3fc04 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -32,13 +32,15 @@
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.Timestamp;
import org.apache.ratis.util.function.CheckedConsumer;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
-import java.io.FileInputStream;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
@@ -151,8 +153,8 @@ static CheckedConsumer