From 1c9abf3e1b333f1bfbb3a0cc2f28c1c1151a439d Mon Sep 17 00:00:00 2001 From: OneSizeFitQuorum Date: Tue, 19 Sep 2023 20:04:23 +0800 Subject: [PATCH 1/2] enhance Signed-off-by: OneSizeFitQuorum --- .../apache/ratis/server/impl/RaftServerProxy.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index f93120b3ab..cfafdda3ab 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -416,6 +416,12 @@ public void close() { LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored); } + try { + ConcurrentUtils.shutdownAndWait(executor); + } catch (Exception ignored) { + LOG.warn(getId() + ": Failed to shutdown executor", ignored); + } + lifeCycle.checkStateAndClose(() -> { LOG.info("{}: close", getId()); impls.close(); @@ -431,12 +437,6 @@ public void close() { } catch (IOException ignored) { LOG.warn(getId() + ": Failed to close " + SupportedDataStreamType.NETTY + " server", ignored); } - - try { - ConcurrentUtils.shutdownAndWait(executor); - } catch (Exception ignored) { - LOG.warn(getId() + ": Failed to shutdown executor", ignored); - } }); pauseMonitor.stop(); } From e78c8b710f16bc964d825c4a69eb243178ede409 Mon Sep 17 00:00:00 2001 From: OneSizeFitQuorum Date: Wed, 20 Sep 2023 14:42:21 +0800 Subject: [PATCH 2/2] fix review Signed-off-by: OneSizeFitQuorum --- .../ratis/server/impl/RaftServerProxy.java | 46 ++++++++++--------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index cfafdda3ab..7d384f601a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -49,6 +49,7 @@ import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.TimeDuration; @@ -120,7 +121,7 @@ public synchronized void close() { isClosed = true; ConcurrentUtils.parallelForEachAsync(map.entrySet(), entry -> close(entry.getKey(), entry.getValue()), - executor); + executor.get()); } private void close(RaftGroupId groupId, CompletableFuture future) { @@ -192,8 +193,8 @@ String toString(RaftGroupId groupId, CompletableFuture f) { private final DataStreamServerRpc dataStreamServerRpc; private final ImplMap impls = new ImplMap(); - private final ExecutorService implExecutor; - private final ExecutorService executor; + private final MemoizedSupplier implExecutor; + private final MemoizedSupplier executor; private final JvmPauseMonitor pauseMonitor; private final ThreadGroup threadGroup; @@ -213,11 +214,12 @@ String toString(RaftGroupId groupId, CompletableFuture f) { this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc(); - this.implExecutor = ConcurrentUtils.newSingleThreadExecutor(id + "-groupManagement"); - this.executor = ConcurrentUtils.newThreadPoolWithMax( + this.implExecutor = MemoizedSupplier.valueOf( + () -> ConcurrentUtils.newSingleThreadExecutor(id + "-groupManagement")); + this.executor = MemoizedSupplier.valueOf(() -> ConcurrentUtils.newThreadPoolWithMax( RaftServerConfigKeys.ThreadPool.proxyCached(properties), RaftServerConfigKeys.ThreadPool.proxySize(properties), - id + "-impl"); + id + "-impl")); final TimeDuration sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties); final TimeDuration rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties); @@ -253,7 +255,7 @@ void initGroups(RaftGroup group, StartupOption option) { .map(Arrays::stream).orElse(Stream.empty()) .filter(File::isDirectory) .forEach(sub -> initGroupDir(sub, shouldAdd)), - executor).join(); + executor.get()).join(); raftGroup.ifPresent(g -> addGroup(g, option)); } @@ -290,7 +292,7 @@ private CompletableFuture newRaftServerImpl(RaftGroup group, Sta } catch(IOException e) { throw new CompletionException(getId() + ": Failed to initialize server for " + group, e); } - }, implExecutor); + }, implExecutor.get()); } private static String getIdStringFrom(RaftServerRpc rpc) { @@ -399,7 +401,7 @@ public void start() throws IOException { } private void startImpl() throws IOException { - ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, executor).join(); + ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, executor.get()).join(); LOG.info("{}: start RPC server", getId()); getServerRpc().start(); @@ -410,18 +412,6 @@ private void startImpl() throws IOException { @Override public void close() { - try { - ConcurrentUtils.shutdownAndWait(implExecutor); - } catch (Exception ignored) { - LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored); - } - - try { - ConcurrentUtils.shutdownAndWait(executor); - } catch (Exception ignored) { - LOG.warn(getId() + ": Failed to shutdown executor", ignored); - } - lifeCycle.checkStateAndClose(() -> { LOG.info("{}: close", getId()); impls.close(); @@ -437,6 +427,18 @@ public void close() { } catch (IOException ignored) { LOG.warn(getId() + ": Failed to close " + SupportedDataStreamType.NETTY + " server", ignored); } + + try { + ConcurrentUtils.shutdownAndWait(implExecutor.get()); + } catch (Exception ignored) { + LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored); + } + + try { + ConcurrentUtils.shutdownAndWait(executor.get()); + } catch (Exception ignored) { + LOG.warn(getId() + ": Failed to shutdown executor", ignored); + } }); pauseMonitor.stop(); } @@ -510,7 +512,7 @@ private CompletableFuture groupAddAsync( throw new CompletionException(e); } return newImpl.newSuccessReply(request); - }, implExecutor) + }, implExecutor.get()) .whenComplete((raftClientReply, throwable) -> { if (throwable != null) { if (!(throwable.getCause() instanceof AlreadyExistsException)) {