Skip to content

Commit

Permalink
iRATIS-2080. Reuse LeaderElection executor.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed May 7, 2024
1 parent ac05d64 commit 1f10ce0
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ static class Executor {

Executor(Object name, int size) {
Preconditions.assertTrue(size > 0);
executor = Executors.newFixedThreadPool(size, r ->
executor = Executors.newCachedThreadPool(r ->
Daemon.newBuilder().setName(name + "-" + count.incrementAndGet()).setRunnable(r).build());
service = new ExecutorCompletionService<>(executor);
}

void shutdown() {
executor.shutdown();
ExecutorService getExecutor() {
return executor;
}

void submit(Callable<RequestVoteReplyProto> task) {
Expand Down Expand Up @@ -290,13 +290,9 @@ private ResultAndTerm submitRequestAndWaitResult(Phase phase, RaftConfigurationI
r = new ResultAndTerm(Result.PASSED, electionTerm);
} else {
final TermIndex lastEntry = server.getState().getLastEntry();
final Executor voteExecutor = new Executor(this, others.size());
try {
final int submitted = submitRequests(phase, electionTerm, lastEntry, others, voteExecutor);
r = waitForResults(phase, electionTerm, submitted, conf, voteExecutor);
} finally {
voteExecutor.shutdown();
}
final Executor voteExecutor = server.getLeaderElectionExecutor();
final int submitted = submitRequests(phase, electionTerm, lastEntry, others, voteExecutor);
r = waitForResults(phase, electionTerm, submitted, conf, voteExecutor);
}

return r;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ public long[] getFollowerNextIndices() {

private final ExecutorService serverExecutor;
private final ExecutorService clientExecutor;
private final MemoizedSupplier<LeaderElection.Executor> leaderElectionExecutor;

private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
private final ThreadGroup threadGroup;
Expand Down Expand Up @@ -281,14 +282,17 @@ public long[] getFollowerNextIndices() {
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);

final RaftGroupMemberId memberId = getMemberId();
this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.serverCached(properties),
RaftServerConfigKeys.ThreadPool.serverSize(properties),
id + "-server");
memberId + "-server");
this.clientExecutor = ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.clientCached(properties),
RaftServerConfigKeys.ThreadPool.clientSize(properties),
id + "-client");
memberId + "-client");
this.leaderElectionExecutor = MemoizedSupplier.valueOf(
() -> new LeaderElection.Executor(memberId + "-election" , group.getPeers().size()));
}

private long getCommitIndex(RaftPeerId id) {
Expand Down Expand Up @@ -534,12 +538,20 @@ public void close() {
try {
ConcurrentUtils.shutdownAndWait(clientExecutor);
} catch (Exception e) {
LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", e);
LOG.warn("{}: Failed to shutdown clientExecutor", getMemberId(), e);
}
try {
ConcurrentUtils.shutdownAndWait(serverExecutor);
} catch (Exception e) {
LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", e);
LOG.warn("{}: Failed to shutdown serverExecutor", getMemberId(), e);
}

if (leaderElectionExecutor.isInitialized()) {
try {
ConcurrentUtils.shutdownAndWait(leaderElectionExecutor.get().getExecutor());
} catch (Exception e) {
LOG.warn("{}: Failed to shutdown leaderElectionExecutor", getMemberId(), e);
}
}
});
}
Expand Down Expand Up @@ -1525,6 +1537,10 @@ ExecutorService getServerExecutor() {
return serverExecutor;
}

LeaderElection.Executor getLeaderElectionExecutor() {
return leaderElectionExecutor.get();
}

private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(RaftPeerId leaderId, long callId,
TermIndex previous, ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws IOException {
final AppendEntriesRequestProto proto = requestRef.get();
Expand Down

0 comments on commit 1f10ce0

Please sign in to comment.