Skip to content

Commit

Permalink
Revert "RATIS-1892 Unify the lifetime of the RaftServerProxy thread p…
Browse files Browse the repository at this point in the history
…ool (apache#923)"

This reverts commit ba4ed31.
  • Loading branch information
adoroszlai committed May 18, 2024
1 parent 8c7c444 commit d5a9c7d
Showing 1 changed file with 17 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.ReferenceCountedObject;
Expand Down Expand Up @@ -122,7 +121,7 @@ public synchronized void close() {
isClosed = true;
ConcurrentUtils.parallelForEachAsync(map.entrySet(),
entry -> close(entry.getKey(), entry.getValue()),
executor.get());
executor);
}

private void close(RaftGroupId groupId, CompletableFuture<RaftServerImpl> future) {
Expand Down Expand Up @@ -195,8 +194,8 @@ String toString(RaftGroupId groupId, CompletableFuture<RaftServerImpl> f) {
private final DataStreamServerRpc dataStreamServerRpc;

private final ImplMap impls = new ImplMap();
private final MemoizedSupplier<ExecutorService> implExecutor;
private final MemoizedSupplier<ExecutorService> executor;
private final ExecutorService implExecutor;
private final ExecutorService executor;

private final JvmPauseMonitor pauseMonitor;
private final ThreadGroup threadGroup;
Expand All @@ -216,12 +215,11 @@ String toString(RaftGroupId groupId, CompletableFuture<RaftServerImpl> f) {

this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc();

this.implExecutor = MemoizedSupplier.valueOf(
() -> ConcurrentUtils.newSingleThreadExecutor(id + "-groupManagement"));
this.executor = MemoizedSupplier.valueOf(() -> ConcurrentUtils.newThreadPoolWithMax(
this.implExecutor = ConcurrentUtils.newSingleThreadExecutor(id + "-groupManagement");
this.executor = ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.proxyCached(properties),
RaftServerConfigKeys.ThreadPool.proxySize(properties),
id + "-impl"));
id + "-impl");

final TimeDuration sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
final TimeDuration closeThreshold = RaftServerConfigKeys.closeThreshold(properties);
Expand Down Expand Up @@ -257,7 +255,7 @@ void initGroups(RaftGroup group, StartupOption option) {
.map(Arrays::stream).orElse(Stream.empty())
.filter(File::isDirectory)
.forEach(sub -> initGroupDir(sub, shouldAdd)),
executor.get()).join();
executor).join();
raftGroup.ifPresent(g -> addGroup(g, option));
}

Expand Down Expand Up @@ -294,7 +292,7 @@ private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group, Sta
} catch(IOException e) {
throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
}
}, implExecutor.get());
}, implExecutor);
}

private static String getIdStringFrom(RaftServerRpc rpc) {
Expand Down Expand Up @@ -403,7 +401,7 @@ public void start() throws IOException {
}

private void startImpl() throws IOException {
ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, executor.get()).join();
ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, executor).join();

LOG.info("{}: start RPC server", getId());
getServerRpc().start();
Expand All @@ -414,6 +412,12 @@ private void startImpl() throws IOException {

@Override
public void close() {
try {
ConcurrentUtils.shutdownAndWait(implExecutor);
} catch (Exception ignored) {
LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored);
}

lifeCycle.checkStateAndClose(() -> {
LOG.info("{}: close", getId());
impls.close();
Expand All @@ -431,13 +435,7 @@ public void close() {
}

try {
ConcurrentUtils.shutdownAndWait(implExecutor.get());
} catch (Exception ignored) {
LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored);
}

try {
ConcurrentUtils.shutdownAndWait(executor.get());
ConcurrentUtils.shutdownAndWait(executor);
} catch (Exception ignored) {
LOG.warn(getId() + ": Failed to shutdown executor", ignored);
}
Expand Down Expand Up @@ -520,7 +518,7 @@ private CompletableFuture<RaftClientReply> groupAddAsync(
throw new CompletionException(e);
}
return newImpl.newSuccessReply(request);
}, implExecutor.get())
}, implExecutor)
.whenComplete((raftClientReply, throwable) -> {
if (throwable != null) {
if (!(throwable.getCause() instanceof AlreadyExistsException)) {
Expand Down

0 comments on commit d5a9c7d

Please sign in to comment.