From 1f10ce02aecef1e5ad8ae74d95dcc3b529328684 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Tue, 7 May 2024 14:19:01 -0700 Subject: [PATCH] iRATIS-2080. Reuse LeaderElection executor. --- .../ratis/server/impl/LeaderElection.java | 16 +++++-------- .../ratis/server/impl/RaftServerImpl.java | 24 +++++++++++++++---- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index d738c87578..11352f0efb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -137,13 +137,13 @@ static class Executor { Executor(Object name, int size) { Preconditions.assertTrue(size > 0); - executor = Executors.newFixedThreadPool(size, r -> + executor = Executors.newCachedThreadPool(r -> Daemon.newBuilder().setName(name + "-" + count.incrementAndGet()).setRunnable(r).build()); service = new ExecutorCompletionService<>(executor); } - void shutdown() { - executor.shutdown(); + ExecutorService getExecutor() { + return executor; } void submit(Callable task) { @@ -290,13 +290,9 @@ private ResultAndTerm submitRequestAndWaitResult(Phase phase, RaftConfigurationI r = new ResultAndTerm(Result.PASSED, electionTerm); } else { final TermIndex lastEntry = server.getState().getLastEntry(); - final Executor voteExecutor = new Executor(this, others.size()); - try { - final int submitted = submitRequests(phase, electionTerm, lastEntry, others, voteExecutor); - r = waitForResults(phase, electionTerm, submitted, conf, voteExecutor); - } finally { - voteExecutor.shutdown(); - } + final Executor voteExecutor = server.getLeaderElectionExecutor(); + final int submitted = submitRequests(phase, electionTerm, lastEntry, others, voteExecutor); + r = waitForResults(phase, electionTerm, submitted, conf, voteExecutor); } return r; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 4512a2c223..c6a27ba8c0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -243,6 +243,7 @@ public long[] getFollowerNextIndices() { private final ExecutorService serverExecutor; private final ExecutorService clientExecutor; + private final MemoizedSupplier leaderElectionExecutor; private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true); private final ThreadGroup threadGroup; @@ -281,14 +282,17 @@ public long[] getFollowerNextIndices() { this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this); this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties); + final RaftGroupMemberId memberId = getMemberId(); this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax( RaftServerConfigKeys.ThreadPool.serverCached(properties), RaftServerConfigKeys.ThreadPool.serverSize(properties), - id + "-server"); + memberId + "-server"); this.clientExecutor = ConcurrentUtils.newThreadPoolWithMax( RaftServerConfigKeys.ThreadPool.clientCached(properties), RaftServerConfigKeys.ThreadPool.clientSize(properties), - id + "-client"); + memberId + "-client"); + this.leaderElectionExecutor = MemoizedSupplier.valueOf( + () -> new LeaderElection.Executor(memberId + "-election" , group.getPeers().size())); } private long getCommitIndex(RaftPeerId id) { @@ -534,12 +538,20 @@ public void close() { try { ConcurrentUtils.shutdownAndWait(clientExecutor); } catch (Exception e) { - LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", e); + LOG.warn("{}: Failed to shutdown clientExecutor", getMemberId(), e); } try { ConcurrentUtils.shutdownAndWait(serverExecutor); } catch (Exception e) { - LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", e); + LOG.warn("{}: Failed to shutdown serverExecutor", getMemberId(), e); + } + + if (leaderElectionExecutor.isInitialized()) { + try { + ConcurrentUtils.shutdownAndWait(leaderElectionExecutor.get().getExecutor()); + } catch (Exception e) { + LOG.warn("{}: Failed to shutdown leaderElectionExecutor", getMemberId(), e); + } } }); } @@ -1525,6 +1537,10 @@ ExecutorService getServerExecutor() { return serverExecutor; } + LeaderElection.Executor getLeaderElectionExecutor() { + return leaderElectionExecutor.get(); + } + private CompletableFuture appendEntriesAsync(RaftPeerId leaderId, long callId, TermIndex previous, ReferenceCountedObject requestRef) throws IOException { final AppendEntriesRequestProto proto = requestRef.get();