diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 7f1d78f603..e100fcbe0f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -289,16 +289,19 @@ private List getFollowerInfos(PeerConfiguration peers) { } } - /** Leader startup state. */ - private class StartupState { - /** At startup, append the current configuration to the log. */ - private final long startIndex = appendConfiguration(server.getRaftConf()); - /** Leader becomes ready when the applied index >= start index. */ - private final AtomicBoolean isReady = new AtomicBoolean(); - - boolean checkReady(long appliedIndex) { + /** When Leader startup, append a log entry . */ + private class StartupLogEntry { + /** The log index at leader startup. */ + private final long startIndex = appendConfiguration(RaftConfigurationImpl.newBuilder() + .setConf(server.getRaftConf().getConf()) + .setLogEntryIndex(raftLog.getNextIndex()) + .build()); + /** Is the log entry applied? */ + private final AtomicBoolean applied = new AtomicBoolean(); + + boolean checkStartIndex(long appliedIndex) { if (appliedIndex >= startIndex) { - isReady.set(true); + applied.set(true); LOG.info("leader is ready since appliedIndex == {} >= startIndex == {}", appliedIndex, startIndex); return true; @@ -307,8 +310,8 @@ boolean checkReady(long appliedIndex) { } } - boolean isReady() { - return isReady.get() || checkReady(server.getState().getLastAppliedIndex()); + boolean isApplied() { + return applied.get() || checkStartIndex(server.getState().getLastAppliedIndex()); } } @@ -336,7 +339,7 @@ boolean isReady() { private final WatchRequests watchRequests; private final MessageStreamRequests messageStreamRequests; - private final MemoizedSupplier startupState = MemoizedSupplier.valueOf(StartupState::new); + private final MemoizedSupplier startupLogEntry = MemoizedSupplier.valueOf(StartupLogEntry::new); private final AtomicBoolean isStopped = new AtomicBoolean(); private final int stagingCatchupGap; @@ -398,20 +401,19 @@ void start() { // Also this message can help identify the last committed index and the conf. CodeInjectionForTesting.execute(APPEND_PLACEHOLDER, server.getId().toString(), null); - // Initialize start state - startupState.get(); + // append the startup log entry to RaftLog + startupLogEntry.get(); processor.start(); senders.forEach(LogAppender::start); } - boolean isReady() { - return startupState.isInitialized() && startupState.get().isReady(); + return startupLogEntry.isInitialized() && startupLogEntry.get().isApplied(); } void checkReady(LogEntryProto entry) { - Preconditions.assertTrue(startupState.isInitialized()); - if (entry.getTerm() == getCurrentTerm() && startupState.get().checkReady(entry.getIndex())) { + Preconditions.assertTrue(startupLogEntry.isInitialized()); + if (entry.getTerm() == getCurrentTerm() && startupLogEntry.get().checkStartIndex(entry.getIndex())) { server.getStateMachine().leaderEvent().notifyLeaderReady(); } } @@ -979,7 +981,7 @@ private void replicateNewConf() { .build(); // stop the LogAppender if the corresponding follower and listener is no longer in the conf updateSenders(newConf); - appendConfiguration(conf); + appendConfiguration(newConf); notifySenders(); }