From ae97ff0e8423253fe02109cb2182231838d93c67 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Tue, 19 Sep 2023 22:43:53 -0700 Subject: [PATCH] Minor refactoring --- .../netty/HttpClientChannelInitializer.java | 18 ++++++++ .../ProxyConnectLBHttpConnectionFactory.java | 42 +++++++++++-------- .../CopyByteBufHandlerChannelInitializer.java | 7 ++-- .../internal/DefaultNettyConnection.java | 9 ++-- 4 files changed, 51 insertions(+), 25 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpClientChannelInitializer.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpClientChannelInitializer.java index 2502125c1e..d2bcb8bfd2 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpClientChannelInitializer.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpClientChannelInitializer.java @@ -23,15 +23,22 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelPipeline; import java.util.ArrayDeque; +import java.util.List; import java.util.Queue; import static java.lang.Math.min; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; final class HttpClientChannelInitializer implements ChannelInitializer { + private static final List> HANDLERS = unmodifiableList(asList( + HttpRequestEncoder.class, HttpResponseDecoder.class, CopyByteBufHandlerChannelInitializer.handlerClass())); + private final ChannelInitializer delegate; /** @@ -61,4 +68,15 @@ final class HttpClientChannelInitializer implements ChannelInitializer { public void init(final Channel channel) { delegate.init(channel); } + + /** + * A list of {@link ChannelHandler} classes added to the {@link ChannelPipeline} in reverse order + * (from last to first). + * + * @return A list of {@link ChannelHandler} classes added to the {@link ChannelPipeline} in reverse order + * (from last to first). + */ + static List> handlers() { + return HANDLERS; + } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ProxyConnectLBHttpConnectionFactory.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ProxyConnectLBHttpConnectionFactory.java index d69c7964ca..5e0f4924dc 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ProxyConnectLBHttpConnectionFactory.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ProxyConnectLBHttpConnectionFactory.java @@ -28,13 +28,14 @@ import io.servicetalk.tcp.netty.internal.ReadOnlyTcpClientConfig; import io.servicetalk.transport.api.ExecutionStrategy; import io.servicetalk.transport.api.TransportObserver; -import io.servicetalk.transport.netty.internal.CopyByteBufHandlerChannelInitializer; import io.servicetalk.transport.netty.internal.DefaultNettyConnection; import io.servicetalk.transport.netty.internal.DeferSslHandler; import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopConnectionObserver; import io.servicetalk.transport.netty.internal.StacklessClosedChannelException; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelPipeline; import javax.annotation.Nullable; @@ -147,7 +148,7 @@ private Single handshake( } return result.shareContextOnSubscribe(); }).flatMap(protocol -> { - final Single result; + final Single result; switch (protocol) { case AlpnIds.HTTP_1_1: // Nothing to do, HTTP/1.1 pipeline is already initialized @@ -156,22 +157,8 @@ private Single handshake( case AlpnIds.HTTP_2: final Channel channel = connection.nettyChannel(); assert channel.eventLoop().inEventLoop(); - // Remove HTTP/1.1 handlers: - channel.pipeline().remove(HttpRequestEncoder.class); - channel.pipeline().remove(HttpResponseDecoder.class); - channel.pipeline().remove(CopyByteBufHandlerChannelInitializer.handlerClass()); - channel.pipeline().remove(DefaultNettyConnection.handlerClass()); - // Initialize HTTP/2: - final H2ProtocolConfig h2Config = config.h2Config(); - assert h2Config != null; - final ReadOnlyTcpClientConfig tcpConfig = config.tcpConfig(); - result = H2ClientParentConnectionContext.initChannel(channel, executionContext, - h2Config, reqRespFactoryFunc.apply(HTTP_2_0), tcpConfig.flushStrategy(), - tcpConfig.idleTimeoutMs(), tcpConfig.sslConfig(), - new H2ClientParentChannelInitializer(h2Config), - // FIXME: propagate real observer - NoopConnectionObserver.INSTANCE, config.allowDropTrailersReadFromTransport()) - .cast(FilterableStreamingHttpConnection.class); + removeH1Handlers(channel); + result = initializeH2Connection(channel); break; default: result = unknownAlpnProtocol(protocol); @@ -181,6 +168,25 @@ private Single handshake( }); } + private static void removeH1Handlers(final Channel channel) { + final ChannelPipeline pipeline = channel.pipeline(); + pipeline.remove(DefaultNettyConnection.handlerClass()); + for (Class handlerClass : HttpClientChannelInitializer.handlers()) { + pipeline.remove(handlerClass); + } + } + + private Single initializeH2Connection(final Channel channel) { + final H2ProtocolConfig h2Config = config.h2Config(); + assert h2Config != null; + final ReadOnlyTcpClientConfig tcpConfig = config.tcpConfig(); + return H2ClientParentConnectionContext.initChannel(channel, executionContext, h2Config, + reqRespFactoryFunc.apply(HTTP_2_0), tcpConfig.flushStrategy(), tcpConfig.idleTimeoutMs(), + tcpConfig.sslConfig(), new H2ClientParentChannelInitializer(h2Config), + // FIXME: propagate real observer + NoopConnectionObserver.INSTANCE, config.allowDropTrailersReadFromTransport()); + } + private static Single drainPropagateError( final StreamingHttpResponse response, final Throwable error) { return safeCompletePropagateError(response.messageBody().ignoreElements(), error); diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/CopyByteBufHandlerChannelInitializer.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/CopyByteBufHandlerChannelInitializer.java index 2eb183a35e..368964669b 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/CopyByteBufHandlerChannelInitializer.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/CopyByteBufHandlerChannelInitializer.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; @@ -56,13 +57,13 @@ public void init(final Channel channel) { } /** - * Return {@link Class} of the {@link ChannelInboundHandler} in case there is a need to remove the handler from the + * Return {@link Class} of the {@link ChannelHandler} in case there is a need to remove the handler from the * {@link ChannelPipeline}. * - * @return {@link Class} of the {@link ChannelInboundHandler} in case there is a need to remove the handler from the + * @return {@link Class} of the {@link ChannelHandler} in case there is a need to remove the handler from the * {@link ChannelPipeline}. */ - public static Class handlerClass() { + public static Class handlerClass() { return CopyByteBufHandler.class; } diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java index 89b900de32..61277fd6eb 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java @@ -52,8 +52,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; @@ -526,13 +526,14 @@ protected void handleSubscribe( } /** - * Return {@link Class} of the {@link ChannelInboundHandler} in case there is a need to remove the handler from the + * Return {@link Class} of the {@link ChannelHandler} in case there is a need to remove the handler from the * {@link ChannelPipeline}. * - * @return {@link Class} of the {@link ChannelInboundHandler} in case there is a need to remove the handler from the + * @return {@link Class} of the {@link ChannelHandler} in case there is a need to remove the handler from the * {@link ChannelPipeline}. */ - public static Class handlerClass() { + // FIXME: remove this method in a follow-up RP, after refactoring for ProxyConnectObserver is complete. + public static Class handlerClass() { return NettyToStChannelHandler.class; }