Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
leixm committed Jun 12, 2024
1 parent a1567ea commit 8688a20
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ public class CelebornStateMachineStorage implements StateMachineStorage {
.isPresent();

private volatile File stateMachineDir = null;
private final AtomicReference<SingleFileSnapshotInfo> latestSnapshot = new AtomicReference<>();
private volatile SingleFileSnapshotInfo latestSnapshot = null;

File tmpDir = null;

@Override
public void init(RaftStorage storage) throws IOException {
this.stateMachineDir = storage.getStorageDir().getStateMachineDir();
getLatestSnapshot();
loadLatestSnapshot();
tmpDir = storage.getStorageDir().getTmpDir();
}

Expand Down Expand Up @@ -175,44 +175,27 @@ static SingleFileSnapshotInfo findLatestSnapshot(Path dir) throws IOException {
return new SingleFileSnapshotInfo(info, latest.getTerm(), latest.getIndex());
}

public SingleFileSnapshotInfo updateLatestSnapshot(SingleFileSnapshotInfo info) {
return latestSnapshot.updateAndGet(
previous -> previous == null || info.getIndex() > previous.getIndex() ? info : previous);
}

public static String getSnapshotFileName(long term, long endIndex) {
return SNAPSHOT_FILE_PREFIX + "." + term + "_" + endIndex;
}

@Override
public SingleFileSnapshotInfo getLatestSnapshot() {
final SingleFileSnapshotInfo s = latestSnapshot.get();
if (s != null) {
return s;
}
final File dir = stateMachineDir;
if (dir == null) {
return null;
}
try {
return updateLatestSnapshot(findLatestSnapshot(dir.toPath()));
} catch (IOException ignored) {
return null;
}
return latestSnapshot;
}

@Override
public File getTmpDir() {
return tmpDir;
}

public void refreshLatestSnapshot() {
public void loadLatestSnapshot() {
final File dir = stateMachineDir;
if (dir == null) {
return;
}
try {
updateLatestSnapshot(findLatestSnapshot(dir.toPath()));
latestSnapshot = findLatestSnapshot(dir.toPath());
} catch (IOException ignored) {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void initialize(RaftServer server, RaftGroupId id, RaftStorage raftStorag
public void reinitialize() throws IOException {
LOG.info("Reinitializing state machine.");
getLifeCycle().compareAndTransition(PAUSED, STARTING);
storage.refreshLatestSnapshot();
storage.loadLatestSnapshot();
loadSnapshot(storage.getLatestSnapshot());
getLifeCycle().compareAndTransition(STARTING, RUNNING);
}
Expand Down Expand Up @@ -266,10 +266,7 @@ public long takeSnapshot() {
LOG.warn("Failed to rename snapshot from {} to {}.", tempFile, snapshotFile);
return RaftLog.INVALID_LOG_INDEX;
}

// update storage
final FileInfo info = new FileInfo(snapshotFile.toPath(), digest);
storage.updateLatestSnapshot(new SingleFileSnapshotInfo(info, lastTermIndex));
storage.loadLatestSnapshot();
} catch (Exception e) {
tempFile.delete();
LOG.warn("Failed to complete snapshot: {}.", snapshotFile, e);
Expand Down

0 comments on commit 8688a20

Please sign in to comment.