Skip to content

Commit

Permalink
Fix test failures.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed May 7, 2024
1 parent f1f273f commit 5db88dc
Showing 1 changed file with 17 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -130,7 +131,6 @@ public String toString() {
}

static class Executor {
private final ExecutorCompletionService<RequestVoteReplyProto> service;
private final ExecutorService executor;

private final AtomicInteger count = new AtomicInteger();
Expand All @@ -139,19 +139,14 @@ static class Executor {
Preconditions.assertTrue(size > 0);
executor = Executors.newCachedThreadPool(r ->
Daemon.newBuilder().setName(name + "-" + count.incrementAndGet()).setRunnable(r).build());
service = new ExecutorCompletionService<>(executor);
}

ExecutorService getExecutor() {
return executor;
}

void submit(Callable<RequestVoteReplyProto> task) {
service.submit(task);
}

Future<RequestVoteReplyProto> poll(TimeDuration waitTime) throws InterruptedException {
return service.poll(waitTime.getDuration(), waitTime.getUnit());
Future<RequestVoteReplyProto> submit(Callable<RequestVoteReplyProto> task) {
return executor.submit(task);
}
}

Expand Down Expand Up @@ -290,10 +285,9 @@ private ResultAndTerm submitRequestAndWaitResult(Phase phase, RaftConfigurationI
r = new ResultAndTerm(Result.PASSED, electionTerm);
} else {
final TermIndex lastEntry = server.getState().getLastEntry();
final Executor voteExecutor = server.getLeaderElectionExecutor();
final Map<Long, RequestVoteRequestProto> submitted = submitRequests(
phase, electionTerm, lastEntry, others, voteExecutor);
r = waitForResults(phase, electionTerm, submitted, conf, voteExecutor);
final List<Future<RequestVoteReplyProto>> submitted = submitRequests(
phase, electionTerm, lastEntry, others, server.getLeaderElectionExecutor());
r = waitForResults(phase, electionTerm, submitted, conf);
}

return r;
Expand Down Expand Up @@ -345,14 +339,13 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException,
}
}

private Map<Long, RequestVoteRequestProto> submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
private List<Future<RequestVoteReplyProto>> submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
Collection<RaftPeer> others, Executor voteExecutor) {
final Map<Long, RequestVoteRequestProto> submitted = new HashMap<>();
final List<Future<RequestVoteReplyProto>> submitted = new ArrayList<>(others.size());
for (final RaftPeer peer : others) {
final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase == Phase.PRE_VOTE);
submitted.put(r.getServerRequest().getCallId(), r);
voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
submitted.add(voteExecutor.submit(() -> server.getServerRpc().requestVote(r)));
}
return submitted;
}
Expand All @@ -366,18 +359,17 @@ private Set<RaftPeerId> getHigherPriorityPeers(RaftConfiguration conf) {
.collect(Collectors.toSet());
}

private ResultAndTerm waitForResults(Phase phase, long electionTerm, Map<Long, RequestVoteRequestProto> submitted,
RaftConfigurationImpl conf, Executor voteExecutor) throws InterruptedException {
private ResultAndTerm waitForResults(Phase phase, long electionTerm, List<Future<RequestVoteReplyProto>> submitted,
RaftConfigurationImpl conf) throws InterruptedException {
final Timestamp timeout = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
final List<Exception> exceptions = new ArrayList<>();
int waitForNum = submitted.size();
Collection<RaftPeerId> votedPeers = new ArrayList<>();
Collection<RaftPeerId> rejectedPeers = new ArrayList<>();
Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf);
final boolean singleMode = conf.isSingleMode(server.getId());

while (waitForNum > 0 && shouldRun(electionTerm)) {
for(Iterator<Future<RequestVoteReplyProto>> i = submitted.iterator(); i.hasNext() && shouldRun(electionTerm); ) {
final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
if (waitTime.isNonPositive()) {
if (conf.hasMajority(votedPeers, server.getId())) {
Expand All @@ -393,16 +385,8 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, Map<Long, R
}

try {
final Future<RequestVoteReplyProto> future = voteExecutor.poll(waitTime);
if (future == null) {
continue; // poll timeout, continue to return Result.TIMEOUT
}

final RequestVoteReplyProto r = future.get();
final RequestVoteRequestProto removed = submitted.remove(r.getServerReply().getCallId());
if (removed == null) {
continue; // submitted in previous elections; ignore it.
}
final Future<RequestVoteReplyProto> future = i.next();
final RequestVoteReplyProto r = future.get(waitTime.getDuration(), waitTime.getUnit());

final RaftPeerId replierId = RaftPeerId.valueOf(r.getServerReply().getReplyId());
final RequestVoteReplyProto previous = responses.putIfAbsent(replierId, r);
Expand Down Expand Up @@ -446,8 +430,9 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, Map<Long, R
} catch(ExecutionException e) {
LogUtils.infoOrTrace(LOG, () -> this + " got exception when requesting votes", e);
exceptions.add(e);
} catch (TimeoutException e) {
// get timeout, continue to return Result.TIMEOUT
}
waitForNum--;
}
// received all the responses
if (conf.hasMajority(votedPeers, server.getId())) {
Expand Down

0 comments on commit 5db88dc

Please sign in to comment.