Skip to content

Commit

Permalink
RATIS-1872. HeartbeatAck use in-correct callId as minCallId. (#905)
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo authored and SzyWilliam committed Aug 28, 2023
1 parent 209bbe2 commit 4b7305a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
Expand Down Expand Up @@ -1076,20 +1075,14 @@ CompletableFuture<Long> getReadIndex() {
new LeaderNotReadyException(server.getMemberId())));
}

final MemoizedSupplier<AppendEntriesListener> supplier = MemoizedSupplier.valueOf(
() -> new AppendEntriesListener(readIndex));
final AppendEntriesListener listener = readIndexHeartbeats.addAppendEntriesListener(
readIndex, key -> supplier.get());
readIndex, i -> new AppendEntriesListener(i, senders));

// the readIndex is already acknowledged before
if (listener == null) {
return CompletableFuture.completedFuture(readIndex);
}

if (supplier.isInitialized()) {
senders.forEach(LogAppender::triggerHeartbeat);
}

return listener.getFuture();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,12 @@ static class AppendEntriesListener {
private final CompletableFuture<Long> future = new CompletableFuture<>();
private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();

AppendEntriesListener(long commitIndex) {
AppendEntriesListener(long commitIndex, Iterable<LogAppender> logAppenders) {
this.commitIndex = commitIndex;
for (LogAppender a : logAppenders) {
a.triggerHeartbeat();
replies.put(a.getFollowerId(), new HeartbeatAck(a));
}
}

CompletableFuture<Long> getFuture() {
Expand Down Expand Up @@ -159,8 +163,7 @@ synchronized void failAll(Exception e) {
private final AppendEntriesListeners appendEntriesListeners = new AppendEntriesListeners();
private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);

AppendEntriesListener addAppendEntriesListener(long commitIndex,
Function<Long, AppendEntriesListener> constructor) {
AppendEntriesListener addAppendEntriesListener(long commitIndex, Function<Long, AppendEntriesListener> constructor) {
if (commitIndex <= ackedCommitIndex.get()) {
return null;
}
Expand Down

0 comments on commit 4b7305a

Please sign in to comment.