From 5db88dc9631f14c772f495fa01a71cfc05803be0 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Tue, 7 May 2024 16:40:33 -0700 Subject: [PATCH] Fix test failures. --- .../ratis/server/impl/LeaderElection.java | 49 +++++++------------ 1 file changed, 17 insertions(+), 32 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 34deb66748..2f672c17ea 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -42,15 +42,16 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.Set; import java.util.stream.Collectors; @@ -130,7 +131,6 @@ public String toString() { } static class Executor { - private final ExecutorCompletionService service; private final ExecutorService executor; private final AtomicInteger count = new AtomicInteger(); @@ -139,19 +139,14 @@ static class Executor { Preconditions.assertTrue(size > 0); executor = Executors.newCachedThreadPool(r -> Daemon.newBuilder().setName(name + "-" + count.incrementAndGet()).setRunnable(r).build()); - service = new ExecutorCompletionService<>(executor); } ExecutorService getExecutor() { return executor; } - void submit(Callable task) { - service.submit(task); - } - - Future poll(TimeDuration waitTime) throws InterruptedException { - return service.poll(waitTime.getDuration(), waitTime.getUnit()); + Future submit(Callable task) { + return executor.submit(task); } } @@ -290,10 +285,9 @@ private ResultAndTerm submitRequestAndWaitResult(Phase phase, RaftConfigurationI r = new ResultAndTerm(Result.PASSED, electionTerm); } else { final TermIndex lastEntry = server.getState().getLastEntry(); - final Executor voteExecutor = server.getLeaderElectionExecutor(); - final Map submitted = submitRequests( - phase, electionTerm, lastEntry, others, voteExecutor); - r = waitForResults(phase, electionTerm, submitted, conf, voteExecutor); + final List> submitted = submitRequests( + phase, electionTerm, lastEntry, others, server.getLeaderElectionExecutor()); + r = waitForResults(phase, electionTerm, submitted, conf); } return r; @@ -345,14 +339,13 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException, } } - private Map submitRequests(Phase phase, long electionTerm, TermIndex lastEntry, + private List> submitRequests(Phase phase, long electionTerm, TermIndex lastEntry, Collection others, Executor voteExecutor) { - final Map submitted = new HashMap<>(); + final List> submitted = new ArrayList<>(others.size()); for (final RaftPeer peer : others) { final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto( server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase == Phase.PRE_VOTE); - submitted.put(r.getServerRequest().getCallId(), r); - voteExecutor.submit(() -> server.getServerRpc().requestVote(r)); + submitted.add(voteExecutor.submit(() -> server.getServerRpc().requestVote(r))); } return submitted; } @@ -366,18 +359,17 @@ private Set getHigherPriorityPeers(RaftConfiguration conf) { .collect(Collectors.toSet()); } - private ResultAndTerm waitForResults(Phase phase, long electionTerm, Map submitted, - RaftConfigurationImpl conf, Executor voteExecutor) throws InterruptedException { + private ResultAndTerm waitForResults(Phase phase, long electionTerm, List> submitted, + RaftConfigurationImpl conf) throws InterruptedException { final Timestamp timeout = Timestamp.currentTime().addTime(server.getRandomElectionTimeout()); final Map responses = new HashMap<>(); final List exceptions = new ArrayList<>(); - int waitForNum = submitted.size(); Collection votedPeers = new ArrayList<>(); Collection rejectedPeers = new ArrayList<>(); Set higherPriorityPeers = getHigherPriorityPeers(conf); final boolean singleMode = conf.isSingleMode(server.getId()); - while (waitForNum > 0 && shouldRun(electionTerm)) { + for(Iterator> i = submitted.iterator(); i.hasNext() && shouldRun(electionTerm); ) { final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n); if (waitTime.isNonPositive()) { if (conf.hasMajority(votedPeers, server.getId())) { @@ -393,16 +385,8 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, Map future = voteExecutor.poll(waitTime); - if (future == null) { - continue; // poll timeout, continue to return Result.TIMEOUT - } - - final RequestVoteReplyProto r = future.get(); - final RequestVoteRequestProto removed = submitted.remove(r.getServerReply().getCallId()); - if (removed == null) { - continue; // submitted in previous elections; ignore it. - } + final Future future = i.next(); + final RequestVoteReplyProto r = future.get(waitTime.getDuration(), waitTime.getUnit()); final RaftPeerId replierId = RaftPeerId.valueOf(r.getServerReply().getReplyId()); final RequestVoteReplyProto previous = responses.putIfAbsent(replierId, r); @@ -446,8 +430,9 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, Map this + " got exception when requesting votes", e); exceptions.add(e); + } catch (TimeoutException e) { + // get timeout, continue to return Result.TIMEOUT } - waitForNum--; } // received all the responses if (conf.hasMajority(votedPeers, server.getId())) {