Skip to content

Commit

Permalink
Revert "RATIS-2080. Reuse LeaderElection executor. (apache#1082)"
Browse files Browse the repository at this point in the history
This reverts commit 8c9c801.
  • Loading branch information
szetszwo committed May 10, 2024
1 parent 0814b89 commit 33cb586
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,15 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -131,22 +130,28 @@ public String toString() {
}

static class Executor {
private final ExecutorCompletionService<RequestVoteReplyProto> service;
private final ExecutorService executor;

private final AtomicInteger count = new AtomicInteger();

Executor(Object name, int size) {
Preconditions.assertTrue(size > 0);
executor = Executors.newCachedThreadPool(r ->
executor = Executors.newFixedThreadPool(size, r ->
Daemon.newBuilder().setName(name + "-" + count.incrementAndGet()).setRunnable(r).build());
service = new ExecutorCompletionService<>(executor);
}

ExecutorService getExecutor() {
return executor;
void shutdown() {
executor.shutdown();
}

Future<RequestVoteReplyProto> submit(Callable<RequestVoteReplyProto> task) {
return executor.submit(task);
void submit(Callable<RequestVoteReplyProto> task) {
service.submit(task);
}

Future<RequestVoteReplyProto> poll(TimeDuration waitTime) throws InterruptedException {
return service.poll(waitTime.getDuration(), waitTime.getUnit());
}
}

Expand Down Expand Up @@ -285,9 +290,13 @@ private ResultAndTerm submitRequestAndWaitResult(Phase phase, RaftConfigurationI
r = new ResultAndTerm(Result.PASSED, electionTerm);
} else {
final TermIndex lastEntry = server.getState().getLastEntry();
final List<Future<RequestVoteReplyProto>> submitted = submitRequests(
phase, electionTerm, lastEntry, others, server.getLeaderElectionExecutor());
r = waitForResults(phase, electionTerm, submitted, conf);
final Executor voteExecutor = new Executor(this, others.size());
try {
final int submitted = submitRequests(phase, electionTerm, lastEntry, others, voteExecutor);
r = waitForResults(phase, electionTerm, submitted, conf, voteExecutor);
} finally {
voteExecutor.shutdown();
}
}

return r;
Expand Down Expand Up @@ -339,13 +348,14 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException,
}
}

private List<Future<RequestVoteReplyProto>> submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
Collection<RaftPeer> others, Executor voteExecutor) {
final List<Future<RequestVoteReplyProto>> submitted = new ArrayList<>(others.size());
int submitted = 0;
for (final RaftPeer peer : others) {
final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase == Phase.PRE_VOTE);
submitted.add(voteExecutor.submit(() -> server.getServerRpc().requestVote(r)));
voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
submitted++;
}
return submitted;
}
Expand All @@ -359,17 +369,18 @@ private Set<RaftPeerId> getHigherPriorityPeers(RaftConfiguration conf) {
.collect(Collectors.toSet());
}

private ResultAndTerm waitForResults(Phase phase, long electionTerm, List<Future<RequestVoteReplyProto>> submitted,
RaftConfigurationImpl conf) throws InterruptedException {
private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitted,
RaftConfigurationImpl conf, Executor voteExecutor) throws InterruptedException {
final Timestamp timeout = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
final List<Exception> exceptions = new ArrayList<>();
int waitForNum = submitted;
Collection<RaftPeerId> votedPeers = new ArrayList<>();
Collection<RaftPeerId> rejectedPeers = new ArrayList<>();
Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf);
final boolean singleMode = conf.isSingleMode(server.getId());

for(Iterator<Future<RequestVoteReplyProto>> i = submitted.iterator(); i.hasNext() && shouldRun(electionTerm); ) {
while (waitForNum > 0 && shouldRun(electionTerm)) {
final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
if (waitTime.isNonPositive()) {
if (conf.hasMajority(votedPeers, server.getId())) {
Expand All @@ -385,9 +396,12 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, List<Future
}

try {
final Future<RequestVoteReplyProto> future = i.next();
final RequestVoteReplyProto r = future.get(waitTime.getDuration(), waitTime.getUnit());
final Future<RequestVoteReplyProto> future = voteExecutor.poll(waitTime);
if (future == null) {
continue; // poll timeout, continue to return Result.TIMEOUT
}

final RequestVoteReplyProto r = future.get();
final RaftPeerId replierId = RaftPeerId.valueOf(r.getServerReply().getReplyId());
final RequestVoteReplyProto previous = responses.putIfAbsent(replierId, r);
if (previous != null) {
Expand Down Expand Up @@ -430,9 +444,8 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, List<Future
} catch(ExecutionException e) {
LogUtils.infoOrTrace(LOG, () -> this + " got exception when requesting votes", e);
exceptions.add(e);
} catch (TimeoutException e) {
// get timeout, continue to return Result.TIMEOUT
}
waitForNum--;
}
// received all the responses
if (conf.hasMajority(votedPeers, server.getId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ public long[] getFollowerNextIndices() {

private final ExecutorService serverExecutor;
private final ExecutorService clientExecutor;
private final MemoizedSupplier<LeaderElection.Executor> leaderElectionExecutor;

private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
private final ThreadGroup threadGroup;
Expand Down Expand Up @@ -282,17 +281,14 @@ public long[] getFollowerNextIndices() {
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);

final RaftGroupMemberId memberId = getMemberId();
this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.serverCached(properties),
RaftServerConfigKeys.ThreadPool.serverSize(properties),
memberId + "-server");
id + "-server");
this.clientExecutor = ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.clientCached(properties),
RaftServerConfigKeys.ThreadPool.clientSize(properties),
memberId + "-client");
this.leaderElectionExecutor = MemoizedSupplier.valueOf(
() -> new LeaderElection.Executor(memberId + "-election" , group.getPeers().size()));
id + "-client");
}

private long getCommitIndex(RaftPeerId id) {
Expand Down Expand Up @@ -538,20 +534,12 @@ public void close() {
try {
ConcurrentUtils.shutdownAndWait(clientExecutor);
} catch (Exception e) {
LOG.warn("{}: Failed to shutdown clientExecutor", getMemberId(), e);
LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", e);
}
try {
ConcurrentUtils.shutdownAndWait(serverExecutor);
} catch (Exception e) {
LOG.warn("{}: Failed to shutdown serverExecutor", getMemberId(), e);
}

if (leaderElectionExecutor.isInitialized()) {
try {
ConcurrentUtils.shutdownAndWait(leaderElectionExecutor.get().getExecutor());
} catch (Exception e) {
LOG.warn("{}: Failed to shutdown leaderElectionExecutor", getMemberId(), e);
}
LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", e);
}
});
}
Expand Down Expand Up @@ -1537,10 +1525,6 @@ ExecutorService getServerExecutor() {
return serverExecutor;
}

LeaderElection.Executor getLeaderElectionExecutor() {
return leaderElectionExecutor.get();
}

private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(RaftPeerId leaderId, long callId,
TermIndex previous, ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws IOException {
final AppendEntriesRequestProto proto = requestRef.get();
Expand Down

0 comments on commit 33cb586

Please sign in to comment.