Skip to content

Commit

Permalink
[ISSUE #7330] Fix channel connect issue for goaway (#7467)
Browse files Browse the repository at this point in the history
* add waitChannelFuture for goaway

* add body for retry channel
  • Loading branch information
drpmma authored Oct 16, 2023
1 parent 3a035d7 commit d73b601
Showing 1 changed file with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -716,20 +716,25 @@ private Channel createChannel(final String addr) throws InterruptedException {
}

if (cw != null) {
ChannelFuture channelFuture = cw.getChannelFuture();
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
LOGGER.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString());
}
return waitChannelFuture(addr, cw);
}

return null;
}

private Channel waitChannelFuture(String addr, ChannelWrapper cw) {
ChannelFuture channelFuture = cw.getChannelFuture();
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
LOGGER.warn("createChannel: connect remote host[{}] failed, {}", addr, channelFuture.toString());
}
} else {
LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
}

return null;
}

Expand Down Expand Up @@ -818,8 +823,14 @@ public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
Channel retryChannel = channelWrapper.getChannel();
if (channel != retryChannel) {
retryRequest.setBody(request.getBody());
Channel retryChannel;
if (channelWrapper.isOK()) {
retryChannel = channelWrapper.getChannel();
} else {
retryChannel = waitChannelFuture(channelWrapper.getChannelAddress(), channelWrapper);
}
if (retryChannel != null && channel != retryChannel) {
return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration);
}
}
Expand Down Expand Up @@ -994,6 +1005,10 @@ public void updateLastResponseTime() {
this.lastResponseTime = System.currentTimeMillis();
}

public String getChannelAddress() {
return channelAddress;
}

public boolean reconnect() {
if (lock.writeLock().tryLock()) {
try {
Expand Down

0 comments on commit d73b601

Please sign in to comment.