Skip to content

Commit

Permalink
fix review
Browse files Browse the repository at this point in the history
Signed-off-by: OneSizeFitQuorum <[email protected]>
  • Loading branch information
OneSizeFitsQuorum committed Sep 20, 2023
1 parent 1c9abf3 commit e78c8b7
Showing 1 changed file with 24 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.TimeDuration;
Expand Down Expand Up @@ -120,7 +121,7 @@ public synchronized void close() {
isClosed = true;
ConcurrentUtils.parallelForEachAsync(map.entrySet(),
entry -> close(entry.getKey(), entry.getValue()),
executor);
executor.get());
}

private void close(RaftGroupId groupId, CompletableFuture<RaftServerImpl> future) {
Expand Down Expand Up @@ -192,8 +193,8 @@ String toString(RaftGroupId groupId, CompletableFuture<RaftServerImpl> f) {
private final DataStreamServerRpc dataStreamServerRpc;

private final ImplMap impls = new ImplMap();
private final ExecutorService implExecutor;
private final ExecutorService executor;
private final MemoizedSupplier<ExecutorService> implExecutor;
private final MemoizedSupplier<ExecutorService> executor;

private final JvmPauseMonitor pauseMonitor;
private final ThreadGroup threadGroup;
Expand All @@ -213,11 +214,12 @@ String toString(RaftGroupId groupId, CompletableFuture<RaftServerImpl> f) {

this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc();

this.implExecutor = ConcurrentUtils.newSingleThreadExecutor(id + "-groupManagement");
this.executor = ConcurrentUtils.newThreadPoolWithMax(
this.implExecutor = MemoizedSupplier.valueOf(
() -> ConcurrentUtils.newSingleThreadExecutor(id + "-groupManagement"));
this.executor = MemoizedSupplier.valueOf(() -> ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.proxyCached(properties),
RaftServerConfigKeys.ThreadPool.proxySize(properties),
id + "-impl");
id + "-impl"));

final TimeDuration sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
final TimeDuration rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
Expand Down Expand Up @@ -253,7 +255,7 @@ void initGroups(RaftGroup group, StartupOption option) {
.map(Arrays::stream).orElse(Stream.empty())
.filter(File::isDirectory)
.forEach(sub -> initGroupDir(sub, shouldAdd)),
executor).join();
executor.get()).join();
raftGroup.ifPresent(g -> addGroup(g, option));
}

Expand Down Expand Up @@ -290,7 +292,7 @@ private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group, Sta
} catch(IOException e) {
throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
}
}, implExecutor);
}, implExecutor.get());
}

private static String getIdStringFrom(RaftServerRpc rpc) {
Expand Down Expand Up @@ -399,7 +401,7 @@ public void start() throws IOException {
}

private void startImpl() throws IOException {
ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, executor).join();
ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, executor.get()).join();

LOG.info("{}: start RPC server", getId());
getServerRpc().start();
Expand All @@ -410,18 +412,6 @@ private void startImpl() throws IOException {

@Override
public void close() {
try {
ConcurrentUtils.shutdownAndWait(implExecutor);
} catch (Exception ignored) {
LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored);
}

try {
ConcurrentUtils.shutdownAndWait(executor);
} catch (Exception ignored) {
LOG.warn(getId() + ": Failed to shutdown executor", ignored);
}

lifeCycle.checkStateAndClose(() -> {
LOG.info("{}: close", getId());
impls.close();
Expand All @@ -437,6 +427,18 @@ public void close() {
} catch (IOException ignored) {
LOG.warn(getId() + ": Failed to close " + SupportedDataStreamType.NETTY + " server", ignored);
}

try {
ConcurrentUtils.shutdownAndWait(implExecutor.get());
} catch (Exception ignored) {
LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored);
}

try {
ConcurrentUtils.shutdownAndWait(executor.get());
} catch (Exception ignored) {
LOG.warn(getId() + ": Failed to shutdown executor", ignored);
}
});
pauseMonitor.stop();
}
Expand Down Expand Up @@ -510,7 +512,7 @@ private CompletableFuture<RaftClientReply> groupAddAsync(
throw new CompletionException(e);
}
return newImpl.newSuccessReply(request);
}, implExecutor)
}, implExecutor.get())
.whenComplete((raftClientReply, throwable) -> {
if (throwable != null) {
if (!(throwable.getCause() instanceof AlreadyExistsException)) {
Expand Down

0 comments on commit e78c8b7

Please sign in to comment.