Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline configurator for netty #30387

Draft
wants to merge 1 commit into
base: netty-transport-http
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev/com.ibm.ws.transport.http/bnd.bnd
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Private-Package: \
com.ibm.ws.tcpchannel.internal.resources, \
com.ibm.ws.genericbnf.internal, \
io.openliberty.http.options, \
io.openliberty.http.pipeline.configurators, \
io.openliberty.http.utils

-dsannotations: com.ibm.ws.http.channel.internal.inbound.HttpPipelineEventHandler,\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
import io.openliberty.http.pipeline.configurators.PipelineHandlerUtility;

/**
*
Expand Down Expand Up @@ -883,7 +884,7 @@ public void pushNewRequest(Http2PushBuilder pushBuilder) {
@Override
public void run() {
try {
((HttpDispatcherHandler) nettyContext.channel().pipeline().get(HttpPipelineInitializer.HTTP_DISPATCHER_HANDLER_NAME)).channelRead(nettyContext,
((HttpDispatcherHandler) nettyContext.channel().pipeline().get(PipelineHandlerUtility.HTTP_DISPATCHER_HANDLER_NAME)).channelRead(nettyContext,
newRequest);
} catch (Exception e) {

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2023 IBM Corporation and others.
* Copyright (c) 2023, 2024 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand All @@ -16,26 +16,34 @@

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.openliberty.http.pipeline.configurators.PipelineHandlerUtility;

/**
*
* This adapter handles outbound HTTP responses.
* If the response status indicates a protocol switch (e.g., WebSocket upgrade),
* this handler removes itself and certain HTTP/1.1-related handlers from the pipeline,
* preparing the pipeline for the upgraded protocol.
*/
public class TransportOutboundHandler extends ChannelOutboundHandlerAdapter {

HttpChannelConfig config;

FullHttpResponse fullResponse;
private final HttpChannelConfig config;

/**
* Constructs a TransportOutboundHandler with the given configuration.
*
* @param config The HttpChannelConfig containing server configuration settings.
*/
public TransportOutboundHandler(HttpChannelConfig config) {
Objects.requireNonNull(config);
this.config = config;
this.config = Objects.requireNonNull(config, "HttpChannelConfig cannot be null");
}

@Override
Expand All @@ -47,37 +55,65 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
HttpResponse response = (HttpResponse) msg;

final boolean isSwitching = response.status().equals(HttpResponseStatus.SWITCHING_PROTOCOLS);
ChannelFuture future = ctx.writeAndFlush(msg, promise);

ChannelFuture future = ctx.writeAndFlush(msg);

future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess() && isSwitching) {
future.addListener((ChannelFutureListener) f -> {

ctx.pipeline().remove(TransportOutboundHandler.class);
if (Objects.nonNull(ctx.pipeline().get(HttpServerCodec.class))) {
ctx.pipeline().remove(HttpServerCodec.class);
}

ctx.pipeline().remove("maxConnectionHandler");
ctx.pipeline().remove("chunkLoggingHandler");
ctx.pipeline().remove("chunkWriteHandler");
ctx.pipeline().remove(ByteBufferCodec.class);

if (ctx.pipeline().get(NettyServletUpgradeHandler.class) == null) {
if(f.isSuccess()){
System.out.println("MSP HTTP11CONFIG -> " + ctx.pipeline().names());
System.out.println("Switching: " + isSwitching);
}

NettyServletUpgradeHandler upgradeHandler = new NettyServletUpgradeHandler(ctx.channel());
ctx.pipeline().addLast(upgradeHandler);
}
if (f.isSuccess() && isSwitching) {
// On successful protocol switch, remove TransportOutboundHandler and HTTP codec
removeHandlerIfPresent(ctx.pipeline(), TransportOutboundHandler.class);
removeHandlerIfPresent(ctx.pipeline(), HttpServerCodec.class);
//TODO: set a PipelineHandlerUtility name to these.

// Remove other handlers that were part of the HTTP pipeline
removeHandlerIfPresent(ctx.pipeline(), PipelineHandlerUtility.MAX_CONNECTION_HANDLER_NAME);
removeHandlerIfPresent(ctx.pipeline(), PipelineHandlerUtility.CHUNK_LOGGING_HANDLER_NAME);
removeHandlerIfPresent(ctx.pipeline(), PipelineHandlerUtility.CHUNK_WRITE_HANDLER_NAME);
removeHandlerIfPresent(ctx.pipeline(), PipelineHandlerUtility.BYTE_BUFFER_CODEC_HANDLER_NAME);

// If NettyServletUpgradeHandler is not present, add it
if (ctx.pipeline().get(PipelineHandlerUtility.NETTY_SERVLET_UPGRADE_HANDLER_NAME) == null) {
NettyServletUpgradeHandler upgradeHandler = new NettyServletUpgradeHandler(ctx.channel());
ctx.pipeline().addLast(PipelineHandlerUtility.NETTY_SERVLET_UPGRADE_HANDLER_NAME, upgradeHandler);
}
System.out.println("MSP after switching-> " + ctx.pipeline().names());
} else{
System.out.println("MSP OUTBOUND NOT SUCCESS or not switching-> " + ctx.pipeline());

}
});
} else {
// For non-HttpResponse messages, just pass through
super.write(ctx, msg, promise);
}
}

else {
super.write(ctx, msg, promise);
/**
* Removes a handler by name if it exists in the pipeline.
*
* @param pipeline The ChannelPipeline to modify.
* @param handlerName The name of the handler to remove.
*/
private void removeHandlerIfPresent(ChannelPipeline pipeline, String handlerName) {
if (pipeline.context(handlerName) != null) {
pipeline.remove(handlerName);
}
}

}
/**
* Removes a handler by class if it exists in the pipeline.
*
* @param pipeline The ChannelPipeline to modify.
* @param handlerClass The class of the handler to remove.
*/
private void removeHandlerIfPresent(ChannelPipeline pipeline, Class<? extends ChannelHandler> handlerClass) {
if (pipeline.context(handlerClass) != null) {
pipeline.remove(handlerClass);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2023 IBM Corporation and others.
* Copyright (c) 2023, 2024 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand All @@ -14,6 +14,7 @@
import com.ibm.ws.http.netty.NettyHttpChannelConfig;
import com.ibm.ws.http.netty.pipeline.CRLFValidationHandler;
import com.ibm.ws.http.netty.pipeline.HttpPipelineInitializer;
import com.ibm.ws.http.netty.pipeline.inbound.HttpDispatcherHandler;
import com.ibm.ws.http.netty.pipeline.inbound.LibertyHttpObjectAggregator;
import com.ibm.ws.http.netty.pipeline.inbound.LibertyHttpRequestHandler;

Expand All @@ -24,9 +25,11 @@
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.openliberty.http.pipeline.configurators.PipelineHandlerUtility;

/**
* ALPN Handler for negotiating what protocol to use
* ALPN Handler for negotiating what protocol (HTTP/2 or HTTP/1.1) to use.
* This class checks the negotiated protocol and reconfigures the pipeline accordingly.
*/
public class LibertyNettyALPNHandler extends ApplicationProtocolNegotiationHandler {

Expand All @@ -35,51 +38,68 @@ public class LibertyNettyALPNHandler extends ApplicationProtocolNegotiationHandl
private final NettyHttpChannelConfig httpConfig;

/**
* Default to HTTP 2.0 for now
* Defaults to HTTP/1.1 if no protocol is negotiated.
*/
public LibertyNettyALPNHandler(NettyHttpChannelConfig httpConfig) {
super(ApplicationProtocolNames.HTTP_1_1);
this.httpConfig = httpConfig;
}

@Override
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(this, tc, "Configuring pipeline with HTTP 2 for incoming connection " + ctx.channel());
Tr.debug(this, tc, "Configuring pipeline with HTTP/2 for incoming connection " + ctx.channel());
}

// For HTTP/2 negotiation, we use a LibertyUpgradeCodec to build an Http2ConnectionHandler
LibertyUpgradeCodec codec = new LibertyUpgradeCodec(httpConfig, ctx.channel());
HttpToHttp2ConnectionHandler handler = codec.buildHttp2ConnectionHandler(httpConfig, ctx.channel());
// HTTP2 to HTTP 1.1 and back pipeline
ctx.pipeline().addAfter(HttpPipelineInitializer.HTTP_ALPN_HANDLER_NAME, null, handler);

// Insert the HTTP/2 handler after the ALPN handler
ctx.pipeline().addAfter(PipelineHandlerUtility.HTTP_ALPN_HANDLER_NAME, null, handler);

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(this, tc, "Configured pipeline with " + ctx.pipeline().names());
}
return;
}

if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(this, tc, "Configuring pipeline with HTTP 1.1 for incoming connection " + ctx.channel());
Tr.debug(this, tc, "Configuring pipeline with HTTP/1.1 for incoming connection " + ctx.channel());
}
ctx.pipeline().addAfter(HttpPipelineInitializer.HTTP_ALPN_HANDLER_NAME, HttpPipelineInitializer.NETTY_HTTP_SERVER_CODEC,
new HttpServerCodec(8192, Integer.MAX_VALUE, httpConfig.getIncomingBodyBufferSize()));
ctx.pipeline().addBefore(HttpPipelineInitializer.NETTY_HTTP_SERVER_CODEC, HttpPipelineInitializer.CRLF_VALIDATION_HANDLER, new CRLFValidationHandler());
ctx.pipeline().addAfter(HttpPipelineInitializer.NETTY_HTTP_SERVER_CODEC, HttpPipelineInitializer.HTTP_KEEP_ALIVE_HANDLER_NAME, new HttpServerKeepAliveHandler());
//TODO: this is a very large number, check best practice
ctx.pipeline().addAfter(HttpPipelineInitializer.HTTP_KEEP_ALIVE_HANDLER_NAME, HttpPipelineInitializer.HTTP_AGGREGATOR_HANDLER_NAME,
new LibertyHttpObjectAggregator(httpConfig.getMessageSizeLimit() == -1 ? HttpPipelineInitializer.maxContentLength : httpConfig.getMessageSizeLimit()));
ctx.pipeline().addAfter(HttpPipelineInitializer.HTTP_AGGREGATOR_HANDLER_NAME, HttpPipelineInitializer.HTTP_REQUEST_HANDLER_NAME, new LibertyHttpRequestHandler());
// Turn on half closure for H1

// Add the HTTP server codec after ALPN handler
ctx.pipeline().addAfter(
PipelineHandlerUtility.HTTP_ALPN_HANDLER_NAME,
PipelineHandlerUtility.NETTY_HTTP_SERVER_CODEC,
new HttpServerCodec(8192, Integer.MAX_VALUE, httpConfig.getIncomingBodyBufferSize())
);

// Add the dispatcher handler for HTTP/1.1 scenario
ctx.pipeline().addLast(PipelineHandlerUtility.HTTP_DISPATCHER_HANDLER_NAME, new HttpDispatcherHandler(httpConfig));

// Now reuse the existing utility methods to add all HTTP/1.1 handlers
// Pre-HTTP codec handlers (e.g., logging if enabled)
PipelineHandlerUtility.addPreHttpCodecHandlers(ctx.pipeline(), httpConfig);

// Pre-dispatcher handlers for HTTP/1.1 (keep-alive, aggregator, request handler)
PipelineHandlerUtility.addPreDispatcherHandlers(ctx.pipeline(), false, httpConfig);

// Allow half-closure for HTTP/1.1
ctx.channel().config().setOption(ChannelOption.ALLOW_HALF_CLOSURE, true);

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(this, tc, "Configured pipeline with " + ctx.pipeline().names());
}
return;
}

// If neither HTTP/2 nor HTTP/1.1 was negotiated, this is unexpected
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(this, tc, "Pipeline unconfigured for protocol " + protocol);
}
throw new IllegalStateException("unknown protocol: " + protocol);
throw new IllegalStateException("Unknown protocol: " + protocol);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
*/
public enum HttpOption implements EndpointOption {

// HTTP/1.1 Options
KEEP_ALIVE_ENABLED("keepAliveEnabled", false, Boolean.class, ConfigType.HTTP),
MAX_KEEP_ALIVE_REQUESTS("maxKeepAliveRequests", -1, Integer.class, ConfigType.HTTP),
PERSIST_TIMEOUT("persistTimeout", "30s", String.class, ConfigType.HTTP),
Expand All @@ -38,6 +39,8 @@ public enum HttpOption implements EndpointOption {
THROW_IOE_FOR_INBOUND_CONNECTIONS("ThrowIOEForInboundConnections", null, Boolean.class, ConfigType.HTTP),
DECOMPRESSION_RATIO_LIMIT("decompressionRatioLimit", 200, Integer.class, ConfigType.HTTP),
DECOMPRESSION_TOLERANCE("decompressionTolerance", 3, Integer.class, ConfigType.HTTP),

// HTTP/2.0 Options
HTTP2_CONNECTION_IDLE_TIMEOUT("http2ConnectionIdleTimeout", "0", String.class, ConfigType.HTTP2),
MAX_CONCURRENT_STREAMS("maxConcurrentStreams", 100, Integer.class, ConfigType.HTTP2),
MAX_FRAME_SIZE("maxFrameSize", 57344, Integer.class, ConfigType.HTTP2),
Expand Down
Loading