Skip to content

Commit

Permalink
RATIS-1948. NettyClientStreamRpc.Connection.scheduleReconnect should …
Browse files Browse the repository at this point in the history
…check isClosed.
  • Loading branch information
szetszwo committed Dec 1, 2023
1 parent dd36de2 commit 011586d
Showing 1 changed file with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ private EventLoopGroup getWorkerGroup() {
}

private ChannelFuture connect() {
if (isClosed()) {
return null;
}
return new Bootstrap()
.group(getWorkerGroup())
.channel(NettyUtils.getSocketChannelClass(getWorkerGroup()))
Expand All @@ -194,6 +197,9 @@ public void operationComplete(ChannelFuture future) {
}

void scheduleReconnect(String message, Throwable cause) {
if (isClosed()) {
return;
}
LOG.warn("{}: {}; schedule reconnecting to {} in {}", this, message, address, RECONNECT);
if (cause != null) {
LOG.warn("", cause);
Expand Down Expand Up @@ -260,6 +266,8 @@ synchronized boolean write(DataStreamRequest request) {

synchronized boolean shouldFlush(boolean force, int countMin, SizeInBytes bytesMin) {
if (force || count >= countMin || bytes >= bytesMin.getSize()) {
LOG.debug("flush: force? {}, (count, bytes)=({}, {}), min=({}, {})",
force, count, bytes, countMin, bytesMin);
count = 0;
bytes = 0;
return true;
Expand Down Expand Up @@ -329,9 +337,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (!connection.isClosed()) {
connection.scheduleReconnect("channel is inactive", null);
}
connection.scheduleReconnect("channel is inactive", null);
}
};
}
Expand Down Expand Up @@ -449,7 +455,6 @@ public CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request)
@Override
public void close() {
final boolean flush = outstandingRequests.shouldFlush(true, 0, SizeInBytes.ZERO);
LOG.debug("flush? {}", flush);
if (flush) {
Optional.ofNullable(connection.getChannelUninterruptibly())
.map(c -> c.writeAndFlush(EMPTY_BYTE_BUFFER))
Expand Down

0 comments on commit 011586d

Please sign in to comment.