From 687dcdfca9c00f8ac54807fb6ba7c774fd29cea5 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Wed, 30 Aug 2023 10:56:54 -0700 Subject: [PATCH] RATIS-1881. In LeaderStateImpl, set placeHolderIndex from the log. --- .../ratis/server/impl/LeaderStateImpl.java | 72 ++++++++++++------- .../ratis/server/impl/RaftServerImpl.java | 3 +- 2 files changed, 47 insertions(+), 28 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 6d57e68aa3..7f1d78f603 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -54,6 +54,7 @@ import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.Daemon; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.Timestamp; @@ -288,6 +289,29 @@ private List getFollowerInfos(PeerConfiguration peers) { } } + /** Leader startup state. */ + private class StartupState { + /** At startup, append the current configuration to the log. */ + private final long startIndex = appendConfiguration(server.getRaftConf()); + /** Leader becomes ready when the applied index >= start index. */ + private final AtomicBoolean isReady = new AtomicBoolean(); + + boolean checkReady(long appliedIndex) { + if (appliedIndex >= startIndex) { + isReady.set(true); + LOG.info("leader is ready since appliedIndex == {} >= startIndex == {}", + appliedIndex, startIndex); + return true; + } else { + return false; + } + } + + boolean isReady() { + return isReady.get() || checkReady(server.getState().getLastAppliedIndex()); + } + } + private final StateUpdateEvent updateCommitEvent = new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, this::updateCommit); private final StateUpdateEvent checkStagingEvent = @@ -311,11 +335,11 @@ private List getFollowerInfos(PeerConfiguration peers) { private final PendingRequests pendingRequests; private final WatchRequests watchRequests; private final MessageStreamRequests messageStreamRequests; + + private final MemoizedSupplier startupState = MemoizedSupplier.valueOf(StartupState::new); private final AtomicBoolean isStopped = new AtomicBoolean(); private final int stagingCatchupGap; - private final long placeHolderIndex; - private final AtomicBoolean isReady = new AtomicBoolean(); private final RaftServerMetricsImpl raftServerMetrics; private final LogAppenderMetrics logAppenderMetrics; private final long followerMaxGapThreshold; @@ -357,38 +381,37 @@ private List getFollowerInfos(PeerConfiguration peers) { final RaftConfigurationImpl conf = state.getRaftConf(); Collection others = conf.getOtherPeers(server.getId()); - placeHolderIndex = raftLog.getNextIndex(); + final long nextIndex = raftLog.getNextIndex(); senders = new SenderList(); - addSenders(others, placeHolderIndex, true); + addSenders(others, nextIndex, true); final Collection listeners = conf.getAllPeers(RaftPeerRole.LISTENER); if (!listeners.isEmpty()) { - addSenders(listeners, placeHolderIndex, true); + addSenders(listeners, nextIndex, true); } } - LogEntryProto start() { + void start() { // In the beginning of the new term, replicate a conf entry in order // to finally commit entries in the previous term. // Also this message can help identify the last committed index and the conf. - final LogEntryProto placeHolder = LogProtoUtils.toLogEntryProto( - server.getRaftConf(), server.getState().getCurrentTerm(), raftLog.getNextIndex()); CodeInjectionForTesting.execute(APPEND_PLACEHOLDER, server.getId().toString(), null); - raftLog.append(Collections.singletonList(placeHolder)); + // Initialize start state + startupState.get(); processor.start(); senders.forEach(LogAppender::start); - return placeHolder; } + boolean isReady() { - return isReady.get(); + return startupState.isInitialized() && startupState.get().isReady(); } void checkReady(LogEntryProto entry) { - if (entry.getTerm() == currentTerm && entry.getIndex() == placeHolderIndex) { - isReady.set(true); + Preconditions.assertTrue(startupState.isInitialized()); + if (entry.getTerm() == getCurrentTerm() && startupState.get().checkReady(entry.getIndex())) { server.getStateMachine().leaderEvent().notifyLeaderReady(); } } @@ -427,13 +450,10 @@ boolean inStagingState() { } long getCurrentTerm() { + Preconditions.assertSame(currentTerm, server.getState().getCurrentTerm(), "currentTerm"); return currentTerm; } - TermIndex getLastEntry() { - return server.getState().getLastEntry(); - } - @Override public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) { if (isCaughtUp(follower) && followerTerm > getCurrentTerm()) { @@ -546,16 +566,17 @@ private void applyOldNewConf() { final RaftConfigurationImpl current = state.getRaftConf(); final RaftConfigurationImpl oldNewConf= stagingState.generateOldNewConf(current, state.getLog().getNextIndex()); // apply the (old, new) configuration to log, and use it as the current conf - long index = state.getLog().append(state.getCurrentTerm(), oldNewConf); - updateConfiguration(index, oldNewConf); + appendConfiguration(oldNewConf); this.stagingState = null; notifySenders(); } - private void updateConfiguration(long logIndex, RaftConfigurationImpl newConf) { - Preconditions.assertTrue(logIndex == newConf.getLogEntryIndex()); - server.getState().setRaftConf(newConf); + private long appendConfiguration(RaftConfigurationImpl conf) { + final long logIndex = raftLog.append(getCurrentTerm(), conf); + Preconditions.assertSame(conf.getLogEntryIndex(), logIndex, "confLogIndex"); + server.getState().setRaftConf(conf); + return logIndex; } void updateFollowerCommitInfos(CommitInfoCache cache, List protos) { @@ -958,8 +979,7 @@ private void replicateNewConf() { .build(); // stop the LogAppender if the corresponding follower and listener is no longer in the conf updateSenders(newConf); - long index = raftLog.append(server.getState().getCurrentTerm(), newConf); - updateConfiguration(index, newConf); + appendConfiguration(conf); notifySenders(); } @@ -1009,7 +1029,7 @@ private void checkPeersForYieldingLeader() { highestPriorityInfos.add(logAppender); } } - final TermIndex leaderLastEntry = getLastEntry(); + final TermIndex leaderLastEntry = server.getState().getLastEntry(); final LogAppender appender = chooseUpToDateFollower(highestPriorityInfos, leaderLastEntry); if (appender != null) { server.getTransferLeadership().start(appender); @@ -1077,7 +1097,7 @@ CompletableFuture getReadIndex() { } // leader has not committed any entries in this term, reject - if (server.getRaftLog().getTermIndex(readIndex).getTerm() != server.getState().getCurrentTerm()) { + if (server.getRaftLog().getTermIndex(readIndex).getTerm() != getCurrentTerm()) { return JavaUtils.completeExceptionally(new ReadIndexException( "Failed to getReadIndex " + readIndex + " since the term is not yet committed.", new LeaderNotReadyException(server.getMemberId()))); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 173bd1c707..3b225dc3c8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -607,8 +607,7 @@ synchronized void changeToLeader() { state.becomeLeader(); // start sending AppendEntries RPC to followers - final LogEntryProto e = leader.start(); - getState().setRaftConf(e); + leader.start(); } @Override