diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index e100fcbe0f..ac8c3599ff 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -289,19 +289,22 @@ private List getFollowerInfos(PeerConfiguration peers) { } } - /** When Leader startup, append a log entry . */ private class StartupLogEntry { /** The log index at leader startup. */ private final long startIndex = appendConfiguration(RaftConfigurationImpl.newBuilder() .setConf(server.getRaftConf().getConf()) .setLogEntryIndex(raftLog.getNextIndex()) .build()); - /** Is the log entry applied? */ - private final AtomicBoolean applied = new AtomicBoolean(); + /** This future will be completed after the log entry is applied. */ + private final CompletableFuture appliedIndexFuture = new CompletableFuture<>(); - boolean checkStartIndex(long appliedIndex) { + boolean isApplied(LogEntryProto logEntry) { + if (appliedIndexFuture.isDone()) { + return true; + } + final long appliedIndex = logEntry != null? logEntry.getIndex(): server.getState().getLastAppliedIndex(); if (appliedIndex >= startIndex) { - applied.set(true); + appliedIndexFuture.complete(appliedIndex); LOG.info("leader is ready since appliedIndex == {} >= startIndex == {}", appliedIndex, startIndex); return true; @@ -309,10 +312,6 @@ boolean checkStartIndex(long appliedIndex) { return false; } } - - boolean isApplied() { - return applied.get() || checkStartIndex(server.getState().getLastAppliedIndex()); - } } private final StateUpdateEvent updateCommitEvent = @@ -401,19 +400,19 @@ void start() { // Also this message can help identify the last committed index and the conf. CodeInjectionForTesting.execute(APPEND_PLACEHOLDER, server.getId().toString(), null); - // append the startup log entry to RaftLog + // Initialize startup log entry and append it to the RaftLog startupLogEntry.get(); processor.start(); senders.forEach(LogAppender::start); } boolean isReady() { - return startupLogEntry.isInitialized() && startupLogEntry.get().isApplied(); + return startupLogEntry.isInitialized() && startupLogEntry.get().isApplied(null); } void checkReady(LogEntryProto entry) { Preconditions.assertTrue(startupLogEntry.isInitialized()); - if (entry.getTerm() == getCurrentTerm() && startupLogEntry.get().checkStartIndex(entry.getIndex())) { + if (entry.getTerm() == getCurrentTerm() && startupLogEntry.get().isApplied(entry)) { server.getStateMachine().leaderEvent().notifyLeaderReady(); } }