Skip to content

Commit

Permalink
Close proxies early and shutdown executors.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Dec 1, 2023
1 parent ca54cda commit ae21773
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -255,8 +256,8 @@ public Set<ClientInvocationId> remove(ChannelId channelId) {

private final StreamMap streams = new StreamMap();
private final ChannelMap channels;
private final Executor requestExecutor;
private final Executor writeExecutor;
private final ExecutorService requestExecutor;
private final ExecutorService writeExecutor;

private final NettyServerStreamRpcMetrics nettyServerStreamRpcMetrics;

Expand All @@ -277,6 +278,11 @@ public Set<ClientInvocationId> remove(ChannelId channelId) {
this.nettyServerStreamRpcMetrics = metrics;
}

void shutdown() {
requestExecutor.shutdown();
writeExecutor.shutdown();
}

private CompletableFuture<DataStream> stream(RaftClientRequest request, StateMachine stateMachine) {
final RequestMetrics metrics = getMetrics().newRequestMetrics(RequestType.STATE_MACHINE_STREAM);
final Timekeeper.Context context = metrics.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,18 @@ public InetSocketAddress getInetSocketAddress() {

@Override
public void close() {
try {
proxies.close();
} catch (Exception e) {
LOG.error(this + ": Failed to close proxies.", e);
}

try {
requests.shutdown();
} catch (Exception e) {
LOG.error(this + ": Failed to shutdown request service.", e);
}

try {
channelFuture.channel().close().sync();
bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
Expand All @@ -314,7 +326,6 @@ public void close() {
LOG.error(this + ": Interrupted close()", e);
}

proxies.close();
}

@Override
Expand Down

0 comments on commit ae21773

Please sign in to comment.