diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index e5e74e991c..2cd3bc1938 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -49,7 +49,6 @@ import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LifeCycle; -import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.ReferenceCountedObject; @@ -122,7 +121,7 @@ public synchronized void close() { isClosed = true; ConcurrentUtils.parallelForEachAsync(map.entrySet(), entry -> close(entry.getKey(), entry.getValue()), - executor.get()); + executor); } private void close(RaftGroupId groupId, CompletableFuture future) { @@ -195,8 +194,8 @@ String toString(RaftGroupId groupId, CompletableFuture f) { private final DataStreamServerRpc dataStreamServerRpc; private final ImplMap impls = new ImplMap(); - private final MemoizedSupplier implExecutor; - private final MemoizedSupplier executor; + private final ExecutorService implExecutor; + private final ExecutorService executor; private final JvmPauseMonitor pauseMonitor; private final ThreadGroup threadGroup; @@ -216,12 +215,11 @@ String toString(RaftGroupId groupId, CompletableFuture f) { this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc(); - this.implExecutor = MemoizedSupplier.valueOf( - () -> ConcurrentUtils.newSingleThreadExecutor(id + "-groupManagement")); - this.executor = MemoizedSupplier.valueOf(() -> ConcurrentUtils.newThreadPoolWithMax( + this.implExecutor = ConcurrentUtils.newSingleThreadExecutor(id + "-groupManagement"); + this.executor = ConcurrentUtils.newThreadPoolWithMax( RaftServerConfigKeys.ThreadPool.proxyCached(properties), RaftServerConfigKeys.ThreadPool.proxySize(properties), - id + "-impl")); + id + "-impl"); final TimeDuration sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties); final TimeDuration closeThreshold = RaftServerConfigKeys.closeThreshold(properties); @@ -257,7 +255,7 @@ void initGroups(RaftGroup group, StartupOption option) { .map(Arrays::stream).orElse(Stream.empty()) .filter(File::isDirectory) .forEach(sub -> initGroupDir(sub, shouldAdd)), - executor.get()).join(); + executor).join(); raftGroup.ifPresent(g -> addGroup(g, option)); } @@ -294,7 +292,7 @@ private CompletableFuture newRaftServerImpl(RaftGroup group, Sta } catch(IOException e) { throw new CompletionException(getId() + ": Failed to initialize server for " + group, e); } - }, implExecutor.get()); + }, implExecutor); } private static String getIdStringFrom(RaftServerRpc rpc) { @@ -403,7 +401,7 @@ public void start() throws IOException { } private void startImpl() throws IOException { - ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, executor.get()).join(); + ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, executor).join(); LOG.info("{}: start RPC server", getId()); getServerRpc().start(); @@ -414,6 +412,12 @@ private void startImpl() throws IOException { @Override public void close() { + try { + ConcurrentUtils.shutdownAndWait(implExecutor); + } catch (Exception ignored) { + LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored); + } + lifeCycle.checkStateAndClose(() -> { LOG.info("{}: close", getId()); impls.close(); @@ -431,13 +435,7 @@ public void close() { } try { - ConcurrentUtils.shutdownAndWait(implExecutor.get()); - } catch (Exception ignored) { - LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored); - } - - try { - ConcurrentUtils.shutdownAndWait(executor.get()); + ConcurrentUtils.shutdownAndWait(executor); } catch (Exception ignored) { LOG.warn(getId() + ": Failed to shutdown executor", ignored); } @@ -520,7 +518,7 @@ private CompletableFuture groupAddAsync( throw new CompletionException(e); } return newImpl.newSuccessReply(request); - }, implExecutor.get()) + }, implExecutor) .whenComplete((raftClientReply, throwable) -> { if (throwable != null) { if (!(throwable.getCause() instanceof AlreadyExistsException)) {