diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java index dc3e69c731..3502d5ed97 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java @@ -198,11 +198,6 @@ private static final class DefaultH2ClientParentConnection extends AbstractH2Par maxConcurrencyPublisher.ignoreElements().subscribe(); } - @Override - boolean hasSubscriber() { - return subscriber != null; - } - @Override void tryCompleteSubscriber() { if (subscriber != null) { @@ -214,12 +209,15 @@ void tryCompleteSubscriber() { } @Override - void tryFailSubscriber(Throwable cause) { + boolean tryFailSubscriber(Throwable cause) { if (subscriber != null) { close(parentContext.nettyChannel(), cause); Subscriber subscriberCopy = subscriber; subscriber = null; subscriberCopy.onError(cause); + return true; + } else { + return false; } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java index d041778814..b45303ef31 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java @@ -25,11 +25,11 @@ import io.servicetalk.transport.api.ConnectionContext; import io.servicetalk.transport.api.ConnectionObserver; import io.servicetalk.transport.api.SslConfig; +import io.servicetalk.transport.netty.internal.ChannelCloseUtils; import io.servicetalk.transport.netty.internal.FlushStrategy; import io.servicetalk.transport.netty.internal.FlushStrategyHolder; import io.servicetalk.transport.netty.internal.NettyChannelListenableAsyncCloseable; import io.servicetalk.transport.netty.internal.NettyConnectionContext; -import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopConnectionObserver; import io.servicetalk.transport.netty.internal.StacklessClosedChannelException; import io.netty.channel.Channel; @@ -44,6 +44,8 @@ import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.ssl.SslCloseCompletionEvent; import io.netty.handler.ssl.SslHandshakeCompletionEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.SocketAddress; import java.net.SocketOption; @@ -56,12 +58,14 @@ import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0; import static io.servicetalk.http.netty.HttpExecutionContextUtils.channelExecutionContext; import static io.servicetalk.http.netty.NettyHttp2ExceptionUtils.wrapIfNecessary; -import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.assignConnectionError; import static io.servicetalk.transport.netty.internal.NettyPipelineSslUtils.extractSslSession; import static io.servicetalk.transport.netty.internal.SocketOptionUtils.getOption; class H2ParentConnectionContext extends NettyChannelListenableAsyncCloseable implements NettyConnectionContext, HttpConnectionContext { + + private static final Logger LOGGER = LoggerFactory.getLogger(H2ParentConnectionContext.class); + final FlushStrategyHolder flushStrategyHolder; private final HttpExecutionContext executionContext; private final SingleSource.Processor transportError = newSingleProcessor(); @@ -162,7 +166,11 @@ protected final void doCloseAsyncGracefully() { }, true); } - private void notifyOnClosingImpl() { // For access from AbstractH2ParentConnection + /** + * This method is required to access notifyOnClosing() from AbstractH2ParentConnection, because usage of + * {@code parentContext.notifyOnClosing()} directly triggers {@link java.lang.IllegalAccessError}. + */ + private void notifyOnClosingImpl() { notifyOnClosing(); } @@ -191,11 +199,9 @@ abstract static class AbstractH2ParentConnection extends ChannelInboundHandlerAd this.observer = observer; } - abstract boolean hasSubscriber(); - abstract void tryCompleteSubscriber(); - abstract void tryFailSubscriber(Throwable cause); + abstract boolean tryFailSubscriber(Throwable cause); /** * Receive a settings frame and optionally handle the acknowledgement of the frame. @@ -238,19 +244,22 @@ public final void handlerRemoved(ChannelHandlerContext ctx) { private void doChannelClosed(final String method) { parentContext.notifyOnClosingImpl(); - if (hasSubscriber()) { - tryFailSubscriber(StacklessClosedChannelException.newInstance(H2ParentConnectionContext.class, method)); - } + tryFailSubscriber(StacklessClosedChannelException.newInstance(H2ParentConnectionContext.class, method)); parentContext.keepAliveManager.channelClosed(); } @Override public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // The parent channel will be closed in case of exception after the cause is propagated to subscriber. + // In case users don't have offloading, there is a risk to retry on the same IO thread. + // We should notify LoadBalancer that this connection is closing to avoid retrying on the same connection. + parentContext.notifyOnClosingImpl(); cause = wrapIfNecessary(cause); - if (observer != NoopConnectionObserver.INSTANCE) { - assignConnectionError(ctx.channel(), cause); - } parentContext.transportError.onSuccess(cause); + if (!tryFailSubscriber(cause)) { + LOGGER.debug("{} closing h2 parent channel on exception caught", parentContext.nettyChannel(), cause); + ChannelCloseUtils.close(ctx, cause); + } } @Override diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java index 2e6551500a..b33c291027 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java @@ -220,11 +220,6 @@ private static final class DefaultH2ServerParentConnection extends AbstractH2Par this.subscriber = requireNonNull(subscriber); } - @Override - boolean hasSubscriber() { - return subscriber != null; - } - @Override void tryCompleteSubscriber() { if (subscriber != null) { @@ -236,12 +231,15 @@ void tryCompleteSubscriber() { } @Override - void tryFailSubscriber(Throwable cause) { + boolean tryFailSubscriber(Throwable cause) { if (subscriber != null) { ChannelCloseUtils.close(parentContext.nettyChannel(), cause); Subscriber subscriberCopy = subscriber; subscriber = null; subscriberCopy.onError(cause); + return true; + } else { + return false; } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionClosedAfterIoExceptionTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionClosedAfterIoExceptionTest.java index 1a5fe76b0f..a8aa15064d 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionClosedAfterIoExceptionTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionClosedAfterIoExceptionTest.java @@ -28,6 +28,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -37,6 +38,7 @@ import java.nio.channels.ClosedChannelException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -71,9 +73,11 @@ class ConnectionClosedAfterIoExceptionTest { void test(HttpProtocol protocol) throws Exception { AtomicReference firstConnection = new AtomicReference<>(); BlockingQueue errors = new LinkedBlockingQueue<>(); + AtomicBoolean firstAttempt = new AtomicBoolean(true); try (ServerContext serverContext = BuilderUtils.newServerBuilder(SERVER_CTX, protocol) // Fail only the first connect attempt - .appendEarlyConnectionAcceptor(conn -> errors.isEmpty() ? failed(DELIBERATE_EXCEPTION) : completed()) + .appendEarlyConnectionAcceptor(conn -> firstAttempt.compareAndSet(true, false) ? + failed(DELIBERATE_EXCEPTION) : completed()) .listenBlockingAndAwait((ctx, request, responseFactory) -> responseFactory.ok()); BlockingHttpClient client = BuilderUtils.newClientBuilder(serverContext, CLIENT_CTX, protocol) .appendConnectionFactoryFilter(original -> new DelegatingConnectionFactory newConnection(InetSocketAddress } }) .appendClientFilter(new RetryingHttpRequesterFilter.Builder() + .retryRetryableExceptions((metaData, t) -> { + errors.add((Throwable) t); + return BackOffPolicy.ofImmediateBounded(); + }) .retryOther((metaData, t) -> { errors.add(t); return BackOffPolicy.ofImmediateBounded(); @@ -93,8 +101,10 @@ public Single newConnection(InetSocketAddress .build()) .buildBlocking()) { - HttpResponse response = client.request(client.get("/")); - assertThat(response.status(), is(OK)); + Assertions.assertDoesNotThrow(() -> { + HttpResponse response = client.request(client.get("/")); + assertThat(response.status(), is(OK)); + }); assertThat("Unexpected number of errors, likely retried more than expected", errors, hasSize(1)); assertThat("Did not propagate original IoException", errors.poll(), diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java index a41602bd61..1d80cb5949 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java @@ -1350,7 +1350,7 @@ public Single handle(final HttpServiceContext ctx, } // We expect this to timeout, because we have not completed the outstanding request. - assertFalse(onServerCloseLatch.await(30, MILLISECONDS)); + assertFalse(onServerCloseLatch.await(300, MILLISECONDS)); requestBody.onComplete(); diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java index ce06d7a6ea..a83f809406 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java @@ -135,6 +135,7 @@ void expiringAHostDoesntRaceWithConnectionAdding() throws Exception { return null; }).toFuture(); + // Note that this will send a `EXPIRED` event and the host will remain, but be marked unhealthy. sendServiceDiscoveryEvents(downEvent("address-1")); f.get(); @@ -157,7 +158,10 @@ void expiringAHostDoesntRaceWithConnectionAdding() throws Exception { // Confirm host is expired: ExecutionException ex = assertThrows(ExecutionException.class, () -> lb.selectConnection(alwaysNewConnectionFilter(), null).toFuture().get()); - assertThat(ex.getCause(), instanceOf(NoAvailableHostException.class)); + + // We expect a `NoActiveHostException` because the host exists, but was marked + // as unhealthy because it is expired. + assertThat(ex.getCause(), instanceOf(NoActiveHostException.class)); lb.closeAsyncGracefully().toFuture().get(); verify(connection.get(), times(1)).closeAsyncGracefully();