Skip to content

Commit

Permalink
Fix some bugs.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Aug 30, 2023
1 parent 687dcdf commit e214215
Showing 1 changed file with 21 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,16 +289,19 @@ private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) {
}
}

/** Leader startup state. */
private class StartupState {
/** At startup, append the current configuration to the log. */
private final long startIndex = appendConfiguration(server.getRaftConf());
/** Leader becomes ready when the applied index >= start index. */
private final AtomicBoolean isReady = new AtomicBoolean();

boolean checkReady(long appliedIndex) {
/** When Leader startup, append a log entry . */
private class StartupLogEntry {
/** The log index at leader startup. */
private final long startIndex = appendConfiguration(RaftConfigurationImpl.newBuilder()
.setConf(server.getRaftConf().getConf())
.setLogEntryIndex(raftLog.getNextIndex())
.build());
/** Is the log entry applied? */
private final AtomicBoolean applied = new AtomicBoolean();

boolean checkStartIndex(long appliedIndex) {
if (appliedIndex >= startIndex) {
isReady.set(true);
applied.set(true);
LOG.info("leader is ready since appliedIndex == {} >= startIndex == {}",
appliedIndex, startIndex);
return true;
Expand All @@ -307,8 +310,8 @@ boolean checkReady(long appliedIndex) {
}
}

boolean isReady() {
return isReady.get() || checkReady(server.getState().getLastAppliedIndex());
boolean isApplied() {
return applied.get() || checkStartIndex(server.getState().getLastAppliedIndex());
}
}

Expand Down Expand Up @@ -336,7 +339,7 @@ boolean isReady() {
private final WatchRequests watchRequests;
private final MessageStreamRequests messageStreamRequests;

private final MemoizedSupplier<StartupState> startupState = MemoizedSupplier.valueOf(StartupState::new);
private final MemoizedSupplier<StartupLogEntry> startupLogEntry = MemoizedSupplier.valueOf(StartupLogEntry::new);
private final AtomicBoolean isStopped = new AtomicBoolean();

private final int stagingCatchupGap;
Expand Down Expand Up @@ -398,20 +401,19 @@ void start() {
// Also this message can help identify the last committed index and the conf.
CodeInjectionForTesting.execute(APPEND_PLACEHOLDER,
server.getId().toString(), null);
// Initialize start state
startupState.get();
// append the startup log entry to RaftLog
startupLogEntry.get();
processor.start();
senders.forEach(LogAppender::start);
}


boolean isReady() {
return startupState.isInitialized() && startupState.get().isReady();
return startupLogEntry.isInitialized() && startupLogEntry.get().isApplied();
}

void checkReady(LogEntryProto entry) {
Preconditions.assertTrue(startupState.isInitialized());
if (entry.getTerm() == getCurrentTerm() && startupState.get().checkReady(entry.getIndex())) {
Preconditions.assertTrue(startupLogEntry.isInitialized());
if (entry.getTerm() == getCurrentTerm() && startupLogEntry.get().checkStartIndex(entry.getIndex())) {
server.getStateMachine().leaderEvent().notifyLeaderReady();
}
}
Expand Down Expand Up @@ -979,7 +981,7 @@ private void replicateNewConf() {
.build();
// stop the LogAppender if the corresponding follower and listener is no longer in the conf
updateSenders(newConf);
appendConfiguration(conf);
appendConfiguration(newConf);
notifySenders();
}

Expand Down

0 comments on commit e214215

Please sign in to comment.