From e50a0125b6acfc7c236d598a2d0631bdd96f23b6 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 18 Aug 2023 07:49:03 -0700 Subject: [PATCH] RATIS-1872. HeartbeatAck use in-correct callId as minCallId. (#905) --- .../org/apache/ratis/server/impl/LeaderStateImpl.java | 9 +-------- .../apache/ratis/server/impl/ReadIndexHeartbeats.java | 9 ++++++--- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 5dfbc009ba..adf035e4f8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -54,7 +54,6 @@ import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.Daemon; import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.Timestamp; @@ -1076,20 +1075,14 @@ CompletableFuture getReadIndex() { new LeaderNotReadyException(server.getMemberId()))); } - final MemoizedSupplier supplier = MemoizedSupplier.valueOf( - () -> new AppendEntriesListener(readIndex)); final AppendEntriesListener listener = readIndexHeartbeats.addAppendEntriesListener( - readIndex, key -> supplier.get()); + readIndex, i -> new AppendEntriesListener(i, senders)); // the readIndex is already acknowledged before if (listener == null) { return CompletableFuture.completedFuture(readIndex); } - if (supplier.isInitialized()) { - senders.forEach(LogAppender::triggerHeartbeat); - } - return listener.getFuture(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java index 92513086d7..7e252f7ad8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java @@ -85,8 +85,12 @@ static class AppendEntriesListener { private final CompletableFuture future = new CompletableFuture<>(); private final ConcurrentHashMap replies = new ConcurrentHashMap<>(); - AppendEntriesListener(long commitIndex) { + AppendEntriesListener(long commitIndex, Iterable logAppenders) { this.commitIndex = commitIndex; + for (LogAppender a : logAppenders) { + a.triggerHeartbeat(); + replies.put(a.getFollowerId(), new HeartbeatAck(a)); + } } CompletableFuture getFuture() { @@ -159,8 +163,7 @@ synchronized void failAll(Exception e) { private final AppendEntriesListeners appendEntriesListeners = new AppendEntriesListeners(); private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX); - AppendEntriesListener addAppendEntriesListener(long commitIndex, - Function constructor) { + AppendEntriesListener addAppendEntriesListener(long commitIndex, Function constructor) { if (commitIndex <= ackedCommitIndex.get()) { return null; }