Skip to content

Commit

Permalink
Minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
idelpivnitskiy committed Sep 25, 2023
1 parent a2f68d6 commit ae97ff0
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,22 @@

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

import static java.lang.Math.min;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;

final class HttpClientChannelInitializer implements ChannelInitializer {

private static final List<Class<? extends ChannelHandler>> HANDLERS = unmodifiableList(asList(
HttpRequestEncoder.class, HttpResponseDecoder.class, CopyByteBufHandlerChannelInitializer.handlerClass()));

private final ChannelInitializer delegate;

/**
Expand Down Expand Up @@ -61,4 +68,15 @@ final class HttpClientChannelInitializer implements ChannelInitializer {
public void init(final Channel channel) {
delegate.init(channel);
}

/**
* A list of {@link ChannelHandler} classes added to the {@link ChannelPipeline} in reverse order
* (from last to first).
*
* @return A list of {@link ChannelHandler} classes added to the {@link ChannelPipeline} in reverse order
* (from last to first).
*/
static List<Class<? extends ChannelHandler>> handlers() {
return HANDLERS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@
import io.servicetalk.tcp.netty.internal.ReadOnlyTcpClientConfig;
import io.servicetalk.transport.api.ExecutionStrategy;
import io.servicetalk.transport.api.TransportObserver;
import io.servicetalk.transport.netty.internal.CopyByteBufHandlerChannelInitializer;
import io.servicetalk.transport.netty.internal.DefaultNettyConnection;
import io.servicetalk.transport.netty.internal.DeferSslHandler;
import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopConnectionObserver;
import io.servicetalk.transport.netty.internal.StacklessClosedChannelException;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -147,7 +148,7 @@ private Single<FilterableStreamingHttpConnection> handshake(
}
return result.shareContextOnSubscribe();
}).flatMap(protocol -> {
final Single<FilterableStreamingHttpConnection> result;
final Single<? extends FilterableStreamingHttpConnection> result;
switch (protocol) {
case AlpnIds.HTTP_1_1:
// Nothing to do, HTTP/1.1 pipeline is already initialized
Expand All @@ -156,22 +157,8 @@ private Single<FilterableStreamingHttpConnection> handshake(
case AlpnIds.HTTP_2:
final Channel channel = connection.nettyChannel();
assert channel.eventLoop().inEventLoop();
// Remove HTTP/1.1 handlers:
channel.pipeline().remove(HttpRequestEncoder.class);
channel.pipeline().remove(HttpResponseDecoder.class);
channel.pipeline().remove(CopyByteBufHandlerChannelInitializer.handlerClass());
channel.pipeline().remove(DefaultNettyConnection.handlerClass());
// Initialize HTTP/2:
final H2ProtocolConfig h2Config = config.h2Config();
assert h2Config != null;
final ReadOnlyTcpClientConfig tcpConfig = config.tcpConfig();
result = H2ClientParentConnectionContext.initChannel(channel, executionContext,
h2Config, reqRespFactoryFunc.apply(HTTP_2_0), tcpConfig.flushStrategy(),
tcpConfig.idleTimeoutMs(), tcpConfig.sslConfig(),
new H2ClientParentChannelInitializer(h2Config),
// FIXME: propagate real observer
NoopConnectionObserver.INSTANCE, config.allowDropTrailersReadFromTransport())
.cast(FilterableStreamingHttpConnection.class);
removeH1Handlers(channel);
result = initializeH2Connection(channel);
break;
default:
result = unknownAlpnProtocol(protocol);
Expand All @@ -181,6 +168,25 @@ private Single<FilterableStreamingHttpConnection> handshake(
});
}

private static void removeH1Handlers(final Channel channel) {
final ChannelPipeline pipeline = channel.pipeline();
pipeline.remove(DefaultNettyConnection.handlerClass());
for (Class<? extends ChannelHandler> handlerClass : HttpClientChannelInitializer.handlers()) {
pipeline.remove(handlerClass);
}
}

private Single<? extends FilterableStreamingHttpConnection> initializeH2Connection(final Channel channel) {
final H2ProtocolConfig h2Config = config.h2Config();
assert h2Config != null;
final ReadOnlyTcpClientConfig tcpConfig = config.tcpConfig();
return H2ClientParentConnectionContext.initChannel(channel, executionContext, h2Config,
reqRespFactoryFunc.apply(HTTP_2_0), tcpConfig.flushStrategy(), tcpConfig.idleTimeoutMs(),
tcpConfig.sslConfig(), new H2ClientParentChannelInitializer(h2Config),
// FIXME: propagate real observer
NoopConnectionObserver.INSTANCE, config.allowDropTrailersReadFromTransport());
}

private static Single<FilterableStreamingHttpConnection> drainPropagateError(
final StreamingHttpResponse response, final Throwable error) {
return safeCompletePropagateError(response.messageBody().ignoreElements(), error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
Expand Down Expand Up @@ -56,13 +57,13 @@ public void init(final Channel channel) {
}

/**
* Return {@link Class} of the {@link ChannelInboundHandler} in case there is a need to remove the handler from the
* Return {@link Class} of the {@link ChannelHandler} in case there is a need to remove the handler from the
* {@link ChannelPipeline}.
*
* @return {@link Class} of the {@link ChannelInboundHandler} in case there is a need to remove the handler from the
* @return {@link Class} of the {@link ChannelHandler} in case there is a need to remove the handler from the
* {@link ChannelPipeline}.
*/
public static Class<? extends ChannelInboundHandler> handlerClass() {
public static Class<? extends ChannelHandler> handlerClass() {
return CopyByteBufHandler.class;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
Expand Down Expand Up @@ -526,13 +526,14 @@ protected void handleSubscribe(
}

/**
* Return {@link Class} of the {@link ChannelInboundHandler} in case there is a need to remove the handler from the
* Return {@link Class} of the {@link ChannelHandler} in case there is a need to remove the handler from the
* {@link ChannelPipeline}.
*
* @return {@link Class} of the {@link ChannelInboundHandler} in case there is a need to remove the handler from the
* @return {@link Class} of the {@link ChannelHandler} in case there is a need to remove the handler from the
* {@link ChannelPipeline}.
*/
public static Class<? extends ChannelInboundHandler> handlerClass() {
// FIXME: remove this method in a follow-up RP, after refactoring for ProxyConnectObserver is complete.
public static Class<? extends ChannelHandler> handlerClass() {
return NettyToStChannelHandler.class;
}

Expand Down

0 comments on commit ae97ff0

Please sign in to comment.