Skip to content

Commit

Permalink
finish
Browse files Browse the repository at this point in the history
Signed-off-by: OneSizeFitQuorum <[email protected]>
  • Loading branch information
OneSizeFitsQuorum committed Jul 5, 2023
1 parent 85e5e20 commit 1f2dc7b
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ static void shutdownManagedChannel(ManagedChannel managedChannel) {
if (!managedChannel.awaitTermination(2, TimeUnit.SECONDS)) {
LOG.warn("Timed out forcefully shutting down connection: {}. ", managedChannel);
}
}catch (InterruptedException e) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
LOG.error("Unexpected exception while waiting for channel termination", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.impl.ServerImplUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.com.google.protobuf.AbstractMessage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
Expand Down Expand Up @@ -52,8 +53,9 @@ public static String toLogEntryString(LogEntryProto entry, Function<StateMachine
s = "(c:" + metadata.getCommitIndex() + ")";
} else if (entry.hasConfigurationEntry()) {
final RaftConfigurationProto config = entry.getConfigurationEntry();
s = "(current:" + config.getPeersList().stream().map(p -> p.toString()).collect(Collectors.joining(",")) +
", old:" + config.getOldPeersList().stream().map(p -> p.toString()).collect(Collectors.joining(",")) + ")";
s = "(current:" + config.getPeersList().stream().map(AbstractMessage::toString).collect(Collectors.joining(",")) +
", old:" + config.getOldPeersList().stream().map(AbstractMessage::toString).collect(Collectors.joining(","))
+ ")";
} else {
s = "";
}
Expand All @@ -71,7 +73,7 @@ public static String toLogEntriesString(List<LogEntryProto> entries) {

public static String toLogEntriesShortString(List<LogEntryProto> entries) {
return entries == null ? null
: entries.size() == 0 ? "<empty>"
: entries.isEmpty()? "<empty>"
: "size=" + entries.size() + ", first=" + LogProtoUtils.toLogEntryString(entries.get(0));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.raftlog;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.RaftGroupMemberId;
Expand Down Expand Up @@ -78,7 +79,7 @@ public abstract class RaftLogBase implements RaftLog {
private final TimeDuration stateMachineDataReadTimeout;
private final long purgePreservation;

private volatile LogEntryProto lastMetadataEntry = null;
private final AtomicReference<LogEntryProto> lastMetadataEntry = new AtomicReference<>();

protected RaftLogBase(RaftGroupMemberId memberId,
LongSupplier getSnapshotIndexFromStateMachine,
Expand Down Expand Up @@ -207,15 +208,15 @@ private long appendMetadataImpl(long term, long newCommitIndex) {
entry = LogProtoUtils.toLogEntryProto(newCommitIndex, term, nextIndex);
appendEntry(entry);
}
lastMetadataEntry = entry;
lastMetadataEntry.set(entry);
return nextIndex;
}

private boolean shouldAppendMetadata(long newCommitIndex) {
if (newCommitIndex <= 0) {
// do not log the first conf entry
return false;
} else if (Optional.ofNullable(lastMetadataEntry)
} else if (Optional.ofNullable(lastMetadataEntry.get())
.filter(e -> e.getIndex() == newCommitIndex || e.getMetadataEntry().getCommitIndex() >= newCommitIndex)
.isPresent()) {
//log neither lastMetadataEntry, nor entries with a smaller commit index.
Expand Down Expand Up @@ -250,12 +251,12 @@ private long appendImpl(long term, RaftConfiguration newConf) {
public final void open(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException {
openImpl(lastIndexInSnapshot, e -> {
if (e.hasMetadataEntry()) {
lastMetadataEntry = e;
lastMetadataEntry.set(e);
} else if (consumer != null) {
consumer.accept(e);
}
});
Optional.ofNullable(lastMetadataEntry).ifPresent(
Optional.ofNullable(lastMetadataEntry.get()).ifPresent(
e -> commitIndex.updateToMax(e.getMetadataEntry().getCommitIndex(), infoIndexChange));
state.open();

Expand Down Expand Up @@ -413,6 +414,9 @@ public LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, T
}
throw t;
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
final String err = getName() + ": Failed readStateMachineData for " + toLogEntryString(logEntry);
LOG.error(err, e);
throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
* This class is NOT threadsafe.
*/
class BufferedWriteChannel implements Closeable {

@SuppressWarnings("java:S2095")
static BufferedWriteChannel open(File file, boolean append, ByteBuffer buffer) throws IOException {
final RandomAccessFile raf = new RandomAccessFile(file, "rw");
final FileChannel fc = raf.getChannel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ private Task addIOTask(Task task) {
if (e instanceof InterruptedException && !running) {
LOG.info("Got InterruptedException when adding task " + task
+ ". The SegmentedRaftLogWorker already stopped.");
Thread.currentThread().interrupt();
} else {
LOG.error("Failed to add IO task {}", task, e);
Optional.ofNullable(server).ifPresent(RaftServer.Division::close);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void clearDirectory() throws IOException {

private static void clearDirectory(File dir) throws IOException {
if (dir.exists()) {
LOG.info(dir + " already exists. Deleting it ...");
LOG.info("{} already exists. Deleting it ...", dir);
FileUtils.deleteFully(dir);
}
FileUtils.createDirectories(dir);
Expand Down Expand Up @@ -135,16 +135,16 @@ StorageState analyzeStorage(boolean toLock) throws IOException {
String rootPath = root.getCanonicalPath();
try { // check that storage exists
if (!root.exists()) {
LOG.info("The storage directory " + rootPath + " does not exist. Creating ...");
LOG.info("The storage directory {} does not exist. Creating ...", rootPath);
FileUtils.createDirectories(root);
}
// or is inaccessible
if (!root.isDirectory()) {
LOG.warn(rootPath + " is not a directory");
LOG.warn("{} is not a directory", rootPath);
return StorageState.NON_EXISTENT;
}
if (!Files.isWritable(root.toPath())) {
LOG.warn("The storage directory " + rootPath + " is not writable.");
LOG.warn("The storage directory {} is not writable.", rootPath);
return StorageState.NON_EXISTENT;
}
} catch(SecurityException ex) {
Expand All @@ -158,9 +158,9 @@ StorageState analyzeStorage(boolean toLock) throws IOException {

// check enough space
if (!hasEnoughSpace()) {
LOG.warn("There are not enough space left for directory " + rootPath
+ " free space min required: " + freeSpaceMin
+ " free space actual: " + root.getFreeSpace());
LOG.warn("There are not enough space left for directory {}"
+ " free space min required: {} free space actual: {}",
rootPath, freeSpaceMin, root.getFreeSpace());
return StorageState.NO_SPACE;
}

Expand Down Expand Up @@ -225,11 +225,11 @@ private FileLock tryLock(File lockF) throws IOException {
try {
res = file.getChannel().tryLock();
if (null == res) {
LOG.error("Unable to acquire file lock on path " + lockF.toString());
LOG.error("Unable to acquire file lock on path {}", lockF);
throw new OverlappingFileLockException();
}
file.write(JVM_NAME.getBytes(StandardCharsets.UTF_8));
LOG.info("Lock on " + lockF + " acquired by nodename " + JVM_NAME);
LOG.info("Lock on {} acquired by nodename {}", lockF, JVM_NAME);
} catch (OverlappingFileLockException oe) {
// Cannot read from the locked file on Windows.
LOG.error("It appears that another process "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.storage;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
Expand All @@ -39,7 +40,7 @@ public class RaftStorageImpl implements RaftStorage {
private final StartupOption startupOption;
private final CorruptionPolicy logCorruptionPolicy;
private volatile StorageState state = StorageState.UNINITIALIZED;
private volatile RaftStorageMetadataFileImpl metaFile;
private final AtomicReference<RaftStorageMetadataFileImpl> metaFile = new AtomicReference<>();

RaftStorageImpl(File dir, SizeInBytes freeSpaceMin, StartupOption option, CorruptionPolicy logCorruptionPolicy) {
LOG.debug("newRaftStorage: {}, freeSpaceMin={}, option={}, logCorruptionPolicy={}",
Expand All @@ -62,7 +63,7 @@ public void initialize() throws IOException {
} else {
state = analyzeAndRecoverStorage(true); // metaFile is initialized here
}
} catch (Throwable t) {
} catch (Exception t) {
unlockOnFailure(storageDir);
throw t;
}
Expand All @@ -76,7 +77,7 @@ public void initialize() throws IOException {
static void unlockOnFailure(RaftStorageDirectoryImpl dir) {
try {
dir.unlock();
} catch (Throwable t) {
} catch (Exception t) {
LOG.warn("Failed to unlock " + dir, t);
}
}
Expand All @@ -91,9 +92,9 @@ public CorruptionPolicy getLogCorruptionPolicy() {

private void format() throws IOException {
storageDir.clearDirectory();
metaFile = new RaftStorageMetadataFileImpl(storageDir.getMetaFile());
metaFile.persist(RaftStorageMetadata.getDefault());
LOG.info("Storage directory " + storageDir.getRoot() + " has been successfully formatted.");
metaFile.set(new RaftStorageMetadataFileImpl(storageDir.getMetaFile()));
metaFile.get().persist(RaftStorageMetadata.getDefault());
LOG.info("Storage directory {} has been successfully formatted.", storageDir.getRoot());
}

private void cleanMetaTmpFile() throws IOException {
Expand All @@ -112,8 +113,8 @@ private StorageState analyzeAndRecoverStorage(boolean toLock) throws IOException
if (!f.exists()) {
throw new FileNotFoundException("Metadata file " + f + " does not exists.");
}
metaFile = new RaftStorageMetadataFileImpl(f);
final RaftStorageMetadata metadata = metaFile.getMetadata();
metaFile.set(new RaftStorageMetadataFileImpl(f));
final RaftStorageMetadata metadata = metaFile.get().getMetadata();
LOG.info("Read {} from {}", metadata, f);
return StorageState.NORMAL;
} else if (storageState == StorageState.NOT_FORMATTED &&
Expand All @@ -137,7 +138,7 @@ public void close() throws IOException {

@Override
public RaftStorageMetadataFile getMetadataFile() {
return metaFile;
return metaFile.get();
}

public void writeRaftConfiguration(LogEntryProto conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ public class SnapshotManager {
private final RaftPeerId selfId;

private final Supplier<File> snapshotDir;
private final Supplier<File> tmp;
private final Supplier<File> snapshotTmpDir;
private final Function<FileChunkProto, String> getRelativePath;
private final Supplier<MessageDigest> digester = JavaUtils.memoize(MD5Hash::getDigester);

SnapshotManager(RaftPeerId selfId, Supplier<RaftStorageDirectory> dir, StateMachineStorage smStorage) {
this.selfId = selfId;
this.snapshotDir = MemoizedSupplier.valueOf(
() -> Optional.ofNullable(smStorage.getSnapshotDir()).orElseGet(() -> dir.get().getStateMachineDir()));
this.tmp = MemoizedSupplier.valueOf(
this.snapshotTmpDir = MemoizedSupplier.valueOf(
() -> Optional.ofNullable(smStorage.getTmpDir()).orElseGet(() -> dir.get().getTmpDir()));

final Supplier<Path> smDir = MemoizedSupplier.valueOf(() -> dir.get().getStateMachineDir().toPath());
Expand All @@ -82,7 +82,7 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();

// create a unique temporary directory
final File tmpDir = new File(tmp.get(), "snapshot-" + snapshotChunkRequest.getRequestId());
final File tmpDir = new File(this.snapshotTmpDir.get(), "snapshot-" + snapshotChunkRequest.getRequestId());
FileUtils.createDirectories(tmpDir);
tmpDir.deleteOnExit();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public static SnapshotManager newSnapshotManager(RaftPeerId id,
}

/** Create a {@link RaftStorageImpl}. */
@SuppressWarnings("java:S2095")
public static RaftStorageImpl newRaftStorage(File dir, SizeInBytes freeSpaceMin,
RaftStorage.StartupOption option, Log.CorruptionPolicy logCorruptionPolicy) {
return new RaftStorageImpl(dir, freeSpaceMin, option, logCorruptionPolicy);
Expand Down Expand Up @@ -136,14 +137,14 @@ private RaftStorageImpl format() throws IOException {
+ " for " + storageDirName);
}

for (; !dirsPerVol.isEmpty(); ) {
while (!dirsPerVol.isEmpty()) {
final File vol = chooseMin(dirsPerVol);
final File dir = new File(vol, storageDirName);
try {
final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.FORMAT, logCorruptionPolicy);
storage.initialize();
return storage;
} catch (Throwable e) {
} catch (Exception e) {
LOG.warn("Failed to initialize a new directory " + dir.getAbsolutePath(), e);
dirsPerVol.remove(vol);
}
Expand All @@ -166,10 +167,9 @@ private RaftStorageImpl recover() throws IOException {
final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.RECOVER, logCorruptionPolicy);
storage.initialize();
return storage;
} catch (Throwable e) {
if (e instanceof IOException) {
throw e;
}
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException("Failed to initialize the existing directory " + dir.getAbsolutePath(), e);
}
}
Expand Down

0 comments on commit 1f2dc7b

Please sign in to comment.