Skip to content

Commit

Permalink
ignore previous requests.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed May 7, 2024
1 parent 1f10ce0 commit f1f273f
Showing 1 changed file with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ private ResultAndTerm submitRequestAndWaitResult(Phase phase, RaftConfigurationI
} else {
final TermIndex lastEntry = server.getState().getLastEntry();
final Executor voteExecutor = server.getLeaderElectionExecutor();
final int submitted = submitRequests(phase, electionTerm, lastEntry, others, voteExecutor);
final Map<Long, RequestVoteRequestProto> submitted = submitRequests(
phase, electionTerm, lastEntry, others, voteExecutor);
r = waitForResults(phase, electionTerm, submitted, conf, voteExecutor);
}

Expand Down Expand Up @@ -344,14 +345,14 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException,
}
}

private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
private Map<Long, RequestVoteRequestProto> submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
Collection<RaftPeer> others, Executor voteExecutor) {
int submitted = 0;
final Map<Long, RequestVoteRequestProto> submitted = new HashMap<>();
for (final RaftPeer peer : others) {
final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase == Phase.PRE_VOTE);
submitted.put(r.getServerRequest().getCallId(), r);
voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
submitted++;
}
return submitted;
}
Expand All @@ -365,12 +366,12 @@ private Set<RaftPeerId> getHigherPriorityPeers(RaftConfiguration conf) {
.collect(Collectors.toSet());
}

private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitted,
private ResultAndTerm waitForResults(Phase phase, long electionTerm, Map<Long, RequestVoteRequestProto> submitted,
RaftConfigurationImpl conf, Executor voteExecutor) throws InterruptedException {
final Timestamp timeout = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
final List<Exception> exceptions = new ArrayList<>();
int waitForNum = submitted;
int waitForNum = submitted.size();
Collection<RaftPeerId> votedPeers = new ArrayList<>();
Collection<RaftPeerId> rejectedPeers = new ArrayList<>();
Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf);
Expand Down Expand Up @@ -398,6 +399,11 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt
}

final RequestVoteReplyProto r = future.get();
final RequestVoteRequestProto removed = submitted.remove(r.getServerReply().getCallId());
if (removed == null) {
continue; // submitted in previous elections; ignore it.
}

final RaftPeerId replierId = RaftPeerId.valueOf(r.getServerReply().getReplyId());
final RequestVoteReplyProto previous = responses.putIfAbsent(replierId, r);
if (previous != null) {
Expand Down

0 comments on commit f1f273f

Please sign in to comment.