From 45b1a6557e4bc532f6f320a91184d4eb180cafdb Mon Sep 17 00:00:00 2001 From: OneSizeFitQuorum Date: Wed, 23 Aug 2023 16:47:48 +0800 Subject: [PATCH 1/3] finish Signed-off-by: OneSizeFitQuorum --- .../java/org/apache/ratis/statemachine/StateMachine.java | 7 +++++++ .../java/org/apache/ratis/server/impl/RaftServerImpl.java | 8 ++++++++ 2 files changed, 15 insertions(+) diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java index e21411ec79..07dc07406f 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -150,6 +150,13 @@ interface EventApi { */ default void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {} + /** + * Notify the {@link StateMachine} that current node has become the leader and recovered successfully. + * Note that only leader will call this function. + * + */ + default void notifyLeaderReady() {} + /** * Notify the {@link StateMachine} a term-index update event. * This method will be invoked when a {@link MetadataProto} diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index add6b041da..7b3f50c0aa 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1802,6 +1802,9 @@ CompletableFuture applyLogToStateMachine(LogEntryProto next) throws Raf // the new conf in the metadata file and notify the StateMachine. state.writeRaftConfiguration(next); stateMachine.event().notifyConfigurationChanged(next.getTerm(), next.getIndex(), next.getConfigurationEntry()); + if (isFirstLogAppliedByCurrentLeaderInCurrentTerm(next.getTerm())) { + stateMachine.event().notifyLeaderReady(); + } } else if (next.hasStateMachineLogEntry()) { // check whether there is a TransactionContext because we are the leader. TransactionContext trx = role.getLeaderState() @@ -1825,6 +1828,11 @@ CompletableFuture applyLogToStateMachine(LogEntryProto next) throws Raf return null; } + boolean isFirstLogAppliedByCurrentLeaderInCurrentTerm(long term) { + return term != state.getLastEntry().getTerm() + && role.getLeaderState().map(leader -> leader.getCurrentTerm() == term).orElse(false); + } + /** * The given log entry is being truncated. * Fail the corresponding client request, if there is any. From e97260161586b986f0dd849d9bd6adafee635eac Mon Sep 17 00:00:00 2001 From: OneSizeFitQuorum Date: Wed, 23 Aug 2023 18:19:34 +0800 Subject: [PATCH 2/3] fix wrong index Signed-off-by: OneSizeFitQuorum --- .../org/apache/ratis/server/impl/LeaderStateImpl.java | 4 ++++ .../java/org/apache/ratis/server/impl/RaftServerImpl.java | 8 ++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 7ea4d738dc..0eaf5c4e7b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -427,6 +427,10 @@ TermIndex getLastEntry() { return server.getState().getLastEntry(); } + long getPlaceHolderIndex() { + return placeHolderIndex; + } + @Override public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) { if (isCaughtUp(follower) && followerTerm > getCurrentTerm()) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 7b3f50c0aa..828fa57b7e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1802,7 +1802,7 @@ CompletableFuture applyLogToStateMachine(LogEntryProto next) throws Raf // the new conf in the metadata file and notify the StateMachine. state.writeRaftConfiguration(next); stateMachine.event().notifyConfigurationChanged(next.getTerm(), next.getIndex(), next.getConfigurationEntry()); - if (isFirstLogAppliedByCurrentLeaderInCurrentTerm(next.getTerm())) { + if (isFirstLogAppliedByCurrentLeaderInCurrentTerm(next)) { stateMachine.event().notifyLeaderReady(); } } else if (next.hasStateMachineLogEntry()) { @@ -1828,9 +1828,9 @@ CompletableFuture applyLogToStateMachine(LogEntryProto next) throws Raf return null; } - boolean isFirstLogAppliedByCurrentLeaderInCurrentTerm(long term) { - return term != state.getLastEntry().getTerm() - && role.getLeaderState().map(leader -> leader.getCurrentTerm() == term).orElse(false); + boolean isFirstLogAppliedByCurrentLeaderInCurrentTerm(LogEntryProto entry) { + return role.getLeaderState().map(leader -> leader.getCurrentTerm() == entry.getTerm() && + leader.getPlaceHolderIndex() == entry.getIndex()).orElse(false); } /** From cc0a2c146735985a74103c221575b2e53953f6fc Mon Sep 17 00:00:00 2001 From: OneSizeFitQuorum Date: Fri, 25 Aug 2023 10:37:33 +0800 Subject: [PATCH 3/3] fix review Signed-off-by: OneSizeFitQuorum --- .../apache/ratis/statemachine/StateMachine.java | 12 +++++------- .../apache/ratis/server/impl/LeaderStateImpl.java | 14 +++++++++----- .../apache/ratis/server/impl/RaftServerImpl.java | 9 +-------- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 07dc07406f..90813f325b 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -150,13 +150,6 @@ interface EventApi { */ default void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {} - /** - * Notify the {@link StateMachine} that current node has become the leader and recovered successfully. - * Note that only leader will call this function. - * - */ - default void notifyLeaderReady() {} - /** * Notify the {@link StateMachine} a term-index update event. * This method will be invoked when a {@link MetadataProto} @@ -235,6 +228,11 @@ default void notifyFollowerSlowness(RoleInfoProto leaderInfo) {} * Notify {@link StateMachine} that this server is no longer the leader. */ default void notifyNotLeader(Collection pendingEntries) throws IOException {} + + /** + * Notify the {@link StateMachine} that this server becomes ready after changed to leader. + */ + default void notifyLeaderReady() {} } /** diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 0eaf5c4e7b..7eb2559bbb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -316,6 +316,7 @@ private List getFollowerInfos(PeerConfiguration peers) { private final int stagingCatchupGap; private final long placeHolderIndex; + private final AtomicBoolean isReady = new AtomicBoolean(); private final RaftServerMetricsImpl raftServerMetrics; private final LogAppenderMetrics logAppenderMetrics; private final long followerMaxGapThreshold; @@ -383,7 +384,14 @@ LogEntryProto start() { } boolean isReady() { - return server.getState().getLastAppliedIndex() >= placeHolderIndex; + return isReady.get(); + } + + void checkReady(LogEntryProto entry) { + if (entry.getTerm() == currentTerm && entry.getIndex() == placeHolderIndex) { + isReady.set(true); + server.getStateMachine().leaderEvent().notifyLeaderReady(); + } } void stop() { @@ -427,10 +435,6 @@ TermIndex getLastEntry() { return server.getState().getLastEntry(); } - long getPlaceHolderIndex() { - return placeHolderIndex; - } - @Override public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) { if (isCaughtUp(follower) && followerTerm > getCurrentTerm()) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 828fa57b7e..7065993315 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1802,9 +1802,7 @@ CompletableFuture applyLogToStateMachine(LogEntryProto next) throws Raf // the new conf in the metadata file and notify the StateMachine. state.writeRaftConfiguration(next); stateMachine.event().notifyConfigurationChanged(next.getTerm(), next.getIndex(), next.getConfigurationEntry()); - if (isFirstLogAppliedByCurrentLeaderInCurrentTerm(next)) { - stateMachine.event().notifyLeaderReady(); - } + role.getLeaderState().ifPresent(leader -> leader.checkReady(next)); } else if (next.hasStateMachineLogEntry()) { // check whether there is a TransactionContext because we are the leader. TransactionContext trx = role.getLeaderState() @@ -1828,11 +1826,6 @@ CompletableFuture applyLogToStateMachine(LogEntryProto next) throws Raf return null; } - boolean isFirstLogAppliedByCurrentLeaderInCurrentTerm(LogEntryProto entry) { - return role.getLeaderState().map(leader -> leader.getCurrentTerm() == entry.getTerm() && - leader.getPlaceHolderIndex() == entry.getIndex()).orElse(false); - } - /** * The given log entry is being truncated. * Fail the corresponding client request, if there is any.