diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index cfafdda3ab..7d384f601a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -49,6 +49,7 @@ import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.TimeDuration; @@ -120,7 +121,7 @@ public synchronized void close() { isClosed = true; ConcurrentUtils.parallelForEachAsync(map.entrySet(), entry -> close(entry.getKey(), entry.getValue()), - executor); + executor.get()); } private void close(RaftGroupId groupId, CompletableFuture future) { @@ -192,8 +193,8 @@ String toString(RaftGroupId groupId, CompletableFuture f) { private final DataStreamServerRpc dataStreamServerRpc; private final ImplMap impls = new ImplMap(); - private final ExecutorService implExecutor; - private final ExecutorService executor; + private final MemoizedSupplier implExecutor; + private final MemoizedSupplier executor; private final JvmPauseMonitor pauseMonitor; private final ThreadGroup threadGroup; @@ -213,11 +214,12 @@ String toString(RaftGroupId groupId, CompletableFuture f) { this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc(); - this.implExecutor = ConcurrentUtils.newSingleThreadExecutor(id + "-groupManagement"); - this.executor = ConcurrentUtils.newThreadPoolWithMax( + this.implExecutor = MemoizedSupplier.valueOf( + () -> ConcurrentUtils.newSingleThreadExecutor(id + "-groupManagement")); + this.executor = MemoizedSupplier.valueOf(() -> ConcurrentUtils.newThreadPoolWithMax( RaftServerConfigKeys.ThreadPool.proxyCached(properties), RaftServerConfigKeys.ThreadPool.proxySize(properties), - id + "-impl"); + id + "-impl")); final TimeDuration sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties); final TimeDuration rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties); @@ -253,7 +255,7 @@ void initGroups(RaftGroup group, StartupOption option) { .map(Arrays::stream).orElse(Stream.empty()) .filter(File::isDirectory) .forEach(sub -> initGroupDir(sub, shouldAdd)), - executor).join(); + executor.get()).join(); raftGroup.ifPresent(g -> addGroup(g, option)); } @@ -290,7 +292,7 @@ private CompletableFuture newRaftServerImpl(RaftGroup group, Sta } catch(IOException e) { throw new CompletionException(getId() + ": Failed to initialize server for " + group, e); } - }, implExecutor); + }, implExecutor.get()); } private static String getIdStringFrom(RaftServerRpc rpc) { @@ -399,7 +401,7 @@ public void start() throws IOException { } private void startImpl() throws IOException { - ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, executor).join(); + ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, executor.get()).join(); LOG.info("{}: start RPC server", getId()); getServerRpc().start(); @@ -410,18 +412,6 @@ private void startImpl() throws IOException { @Override public void close() { - try { - ConcurrentUtils.shutdownAndWait(implExecutor); - } catch (Exception ignored) { - LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored); - } - - try { - ConcurrentUtils.shutdownAndWait(executor); - } catch (Exception ignored) { - LOG.warn(getId() + ": Failed to shutdown executor", ignored); - } - lifeCycle.checkStateAndClose(() -> { LOG.info("{}: close", getId()); impls.close(); @@ -437,6 +427,18 @@ public void close() { } catch (IOException ignored) { LOG.warn(getId() + ": Failed to close " + SupportedDataStreamType.NETTY + " server", ignored); } + + try { + ConcurrentUtils.shutdownAndWait(implExecutor.get()); + } catch (Exception ignored) { + LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored); + } + + try { + ConcurrentUtils.shutdownAndWait(executor.get()); + } catch (Exception ignored) { + LOG.warn(getId() + ": Failed to shutdown executor", ignored); + } }); pauseMonitor.stop(); } @@ -510,7 +512,7 @@ private CompletableFuture groupAddAsync( throw new CompletionException(e); } return newImpl.newSuccessReply(request); - }, implExecutor) + }, implExecutor.get()) .whenComplete((raftClientReply, throwable) -> { if (throwable != null) { if (!(throwable.getCause() instanceof AlreadyExistsException)) {