diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java index 4f5328302d8..298843cce8c 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java @@ -65,14 +65,14 @@ public class CelebornStateMachineStorage implements StateMachineStorage { .isPresent(); private volatile File stateMachineDir = null; - private final AtomicReference latestSnapshot = new AtomicReference<>(); + private volatile SingleFileSnapshotInfo latestSnapshot = null; File tmpDir = null; @Override public void init(RaftStorage storage) throws IOException { this.stateMachineDir = storage.getStorageDir().getStateMachineDir(); - getLatestSnapshot(); + loadLatestSnapshot(); tmpDir = storage.getStorageDir().getTmpDir(); } @@ -175,30 +175,13 @@ static SingleFileSnapshotInfo findLatestSnapshot(Path dir) throws IOException { return new SingleFileSnapshotInfo(info, latest.getTerm(), latest.getIndex()); } - public SingleFileSnapshotInfo updateLatestSnapshot(SingleFileSnapshotInfo info) { - return latestSnapshot.updateAndGet( - previous -> previous == null || info.getIndex() > previous.getIndex() ? info : previous); - } - public static String getSnapshotFileName(long term, long endIndex) { return SNAPSHOT_FILE_PREFIX + "." + term + "_" + endIndex; } @Override public SingleFileSnapshotInfo getLatestSnapshot() { - final SingleFileSnapshotInfo s = latestSnapshot.get(); - if (s != null) { - return s; - } - final File dir = stateMachineDir; - if (dir == null) { - return null; - } - try { - return updateLatestSnapshot(findLatestSnapshot(dir.toPath())); - } catch (IOException ignored) { - return null; - } + return latestSnapshot; } @Override @@ -206,13 +189,13 @@ public File getTmpDir() { return tmpDir; } - public void refreshLatestSnapshot() { + public void loadLatestSnapshot() { final File dir = stateMachineDir; if (dir == null) { return; } try { - updateLatestSnapshot(findLatestSnapshot(dir.toPath())); + latestSnapshot = findLatestSnapshot(dir.toPath()); } catch (IOException ignored) { } } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java index cacfa2eef18..c77dcd2de63 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java @@ -98,7 +98,7 @@ public void initialize(RaftServer server, RaftGroupId id, RaftStorage raftStorag public void reinitialize() throws IOException { LOG.info("Reinitializing state machine."); getLifeCycle().compareAndTransition(PAUSED, STARTING); - storage.refreshLatestSnapshot(); + storage.loadLatestSnapshot(); loadSnapshot(storage.getLatestSnapshot()); getLifeCycle().compareAndTransition(STARTING, RUNNING); } @@ -266,10 +266,7 @@ public long takeSnapshot() { LOG.warn("Failed to rename snapshot from {} to {}.", tempFile, snapshotFile); return RaftLog.INVALID_LOG_INDEX; } - - // update storage - final FileInfo info = new FileInfo(snapshotFile.toPath(), digest); - storage.updateLatestSnapshot(new SingleFileSnapshotInfo(info, lastTermIndex)); + storage.loadLatestSnapshot(); } catch (Exception e) { tempFile.delete(); LOG.warn("Failed to complete snapshot: {}.", snapshotFile, e);