diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java index e21411ec79..07dc07406f 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -150,6 +150,13 @@ interface EventApi { */ default void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {} + /** + * Notify the {@link StateMachine} that current node has become the leader and recovered successfully. + * Note that only leader will call this function. + * + */ + default void notifyLeaderReady() {} + /** * Notify the {@link StateMachine} a term-index update event. * This method will be invoked when a {@link MetadataProto} diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index add6b041da..b6374a4ee1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1801,7 +1801,11 @@ CompletableFuture applyLogToStateMachine(LogEntryProto next) throws Raf // the reply should have already been set. only need to record // the new conf in the metadata file and notify the StateMachine. state.writeRaftConfiguration(next); - stateMachine.event().notifyConfigurationChanged(next.getTerm(), next.getIndex(), next.getConfigurationEntry()); + stateMachine.event().notifyConfigurationChanged(next.getTerm(), next.getIndex(), + next.getConfigurationEntry()); + if (isFirstLogAppliedByCurrentLeaderInCurrentTerm(next.getTerm())) { + stateMachine.event().notifyLeaderReady(); + } } else if (next.hasStateMachineLogEntry()) { // check whether there is a TransactionContext because we are the leader. TransactionContext trx = role.getLeaderState() @@ -1825,6 +1829,11 @@ CompletableFuture applyLogToStateMachine(LogEntryProto next) throws Raf return null; } + boolean isFirstLogAppliedByCurrentLeaderInCurrentTerm(long term) { + return term != state.getLastEntry().getTerm() + && role.getLeaderState().map(leader -> leader.getCurrentTerm() == term).orElse(false); + } + /** * The given log entry is being truncated. * Fail the corresponding client request, if there is any.