Skip to content

Commit

Permalink
Add ConnectionObserver.ProxyConnectObserver
Browse files Browse the repository at this point in the history
Motivation:

After apple#2699 implemented HTTP proxy CONNECT with ALPN, it made observability
confusing. It broke the contract that only one `connectionEstablished` method can
be invoked, which happens when ALPN switches from HTTP/1.1 to HTTP/2 pipeline.
Also, for a proxy tunnel with HTTP/1.1 `ConnectionObserver` events were always confusing:
users had to wait for `handshakeComplete` after `connectionEstablished` to report that
the connection is "ready".

Modifications:

- Add `ConnectionObserver.ProxyConnectObserver` that reports events related to
`CONNECT` phase that starts right after `onTransportHandshakeComplete`;
- Refactor `PipelinedLBHttpConnectionFactory` to defer invocation of
`connectionEstablished`/`multiplexedConnectionEstablished` until after the handshake
is complete to make it consistent with non-proxied connections and make sure only
one of the "established" methods is invoked;
- Removed `ProxyConnectLBHttpConnectionFactoryTest` because it targeted code paths
that do not exist anymore. Instead, enhanced `HttpsProxyTest` to cover similar use-cases.
- Enhanced `ProxyTunnel` to allow behavior customization;

Result:

Enhanced observability for proxy tunnels, `ConnectionObserver` behavior is consistent
between proxied and non-proxied use-cases.
  • Loading branch information
idelpivnitskiy committed Sep 29, 2023
1 parent 055374e commit afe36ec
Show file tree
Hide file tree
Showing 23 changed files with 777 additions and 657 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2019, 2021-2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpConnectionContext;
import io.servicetalk.http.api.HttpEventKey;
import io.servicetalk.http.api.HttpExecutionContext;
Expand All @@ -38,7 +39,6 @@
import io.servicetalk.transport.netty.internal.FlushStrategy;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;

import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,7 +66,7 @@
import static java.util.Objects.requireNonNull;

abstract class AbstractStreamingHttpConnection<CC extends NettyConnectionContext>
implements NettyFilterableStreamingHttpConnection {
implements FilterableStreamingHttpConnection {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamingHttpConnection.class);
static final IgnoreConsumedEvent<Integer> ZERO_MAX_CONCURRENCY_EVENT = new IgnoreConsumedEvent<>(0);
Expand Down Expand Up @@ -168,7 +168,7 @@ public void cancel() {
}

@Override
public final Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
return defer(() -> {
Publisher<Object> flatRequest;
// See https://tools.ietf.org/html/rfc7230#section-3.3.3
Expand Down Expand Up @@ -260,11 +260,6 @@ public final StreamingHttpResponseFactory httpResponseFactory() {
return reqRespFactory;
}

@Override
public Channel nettyChannel() {
return connection.nettyChannel();
}

@Override
public final Completable onClose() {
return connectionContext.onClose();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ final class PipelinedLBHttpConnectionFactory<ResolvedAddress> extends AbstractLB
Single<FilterableStreamingHttpConnection> newFilterableConnection(final ResolvedAddress resolvedAddress,
final TransportObserver observer) {
assert config.h1Config() != null;
return buildStreaming(executionContext, resolvedAddress, config.tcpConfig(), config.h1Config(),
config.hasProxy(), observer)
return buildStreaming(executionContext, resolvedAddress, config, observer)
.map(conn -> new PipelinedStreamingHttpConnection(conn, config.h1Config(),
reqRespFactoryFunc.apply(HTTP_1_1), config.allowDropTrailersReadFromTransport()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.HttpHeadersFactory;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.netty.ProxyConnectException.RetryableProxyConnectException;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ConnectionObserver.ProxyConnectObserver;
import io.servicetalk.transport.netty.internal.ChannelInitializer;
import io.servicetalk.transport.netty.internal.CloseHandler.InboundDataEndEvent;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import static io.servicetalk.http.api.HttpHeaderNames.HOST;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.api.HttpRequestMetaDataFactory.newRequestMetaData;
import static io.servicetalk.http.api.HttpRequestMethod.CONNECT;
import static io.servicetalk.http.api.HttpResponseStatus.StatusClass.SUCCESSFUL_2XX;
import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.assignConnectionError;

/**
* A {@link Single} that adds a {@link ChannelHandler} into {@link ChannelPipeline} to perform HTTP/1.1 CONNECT
* exchange.
*/
final class ProxyConnectChannelSingle extends ChannelInitSingle<Channel> {

private final ConnectionObserver observer;
private final HttpHeadersFactory headersFactory;
private final String connectAddress;

ProxyConnectChannelSingle(final Channel channel,
final ChannelInitializer channelInitializer,
final ConnectionObserver observer,
final HttpHeadersFactory headersFactory,
final String connectAddress) {
super(channel, channelInitializer);
this.observer = observer;
this.headersFactory = headersFactory;
this.connectAddress = connectAddress;
assert !channel.config().isAutoRead();
}

@Override
protected ChannelHandler newChannelHandler(final Subscriber<? super Channel> subscriber) {
return new ProxyConnectHandler(observer, headersFactory, connectAddress, subscriber);
}

private static final class ProxyConnectHandler extends ChannelDuplexHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(ProxyConnectHandler.class);

private final ConnectionObserver observer;
private final HttpHeadersFactory headersFactory;
private final String connectAddress;
@Nullable
private Subscriber<? super Channel> subscriber;
@Nullable
private ProxyConnectObserver connectObserver;
@Nullable
private HttpResponseMetaData response;

private ProxyConnectHandler(final ConnectionObserver observer,
final HttpHeadersFactory headersFactory,
final String connectAddress,
final Subscriber<? super Channel> subscriber) {
this.observer = observer;
this.headersFactory = headersFactory;
this.connectAddress = connectAddress;
this.subscriber = subscriber;
}

@Override
public void handlerAdded(final ChannelHandlerContext ctx) {
if (ctx.channel().isActive()) {
sendConnectRequest(ctx);
}
}

@Override
public void channelActive(final ChannelHandlerContext ctx) {
sendConnectRequest(ctx);
ctx.fireChannelActive();
}

private void sendConnectRequest(final ChannelHandlerContext ctx) {
final HttpRequestMetaData request = newRequestMetaData(HTTP_1_1, CONNECT, connectAddress,
headersFactory.newHeaders()).addHeader(HOST, connectAddress);
connectObserver = observer.onProxyConnect(request);
ctx.writeAndFlush(request).addListener(f -> {
if (f.isSuccess()) {
ctx.read();
} else {
failSubscriber(ctx, new RetryableProxyConnectException(
"Failed to write CONNECT request", f.cause()));
}
});
}

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg instanceof HttpResponseMetaData) {
if (response != null) {
failSubscriber(ctx, new RetryableProxyConnectException(
"Received two responses for a single CONNECT request"));
return;
}
response = (HttpResponseMetaData) msg;
if (response.status().statusClass() != SUCCESSFUL_2XX) {
failSubscriber(ctx, new ProxyResponseException("Non-successful response '" + response.status() +
"' from proxy on CONNECT " + connectAddress, response.status()));
}
// We do not complete subscriber here because we need to wait for the HttpResponseDecoder state machine
// to complete. Completion will be signalled by InboundDataEndEvent. Any other messages before that are
// unexpected, see https://datatracker.ietf.org/doc/html/rfc9110#section-9.3.6
// It also helps to make sure we do not propagate InboundDataEndEvent after the next handlers are added
// to the pipeline, potentially causing changes in their state machine.
} else {
failSubscriber(ctx, new RetryableProxyConnectException(
"Received unexpected message in the pipeline of type: " + msg.getClass().getName()));
}
}

@Override
public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
if (subscriber != null) {
ctx.read(); // Keep requesting until finished
}
ctx.fireChannelReadComplete();
}

@Override
public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
if (evt != InboundDataEndEvent.INSTANCE || subscriber == null) {
ctx.fireUserEventTriggered(evt);
return;
}
assert response != null;
assert connectObserver != null;
connectObserver.proxyConnectComplete(response);
ctx.pipeline().remove(this);
final Channel channel = ctx.channel();
LOGGER.debug("{} Received successful response from proxy on CONNECT {}", channel, connectAddress);
final Subscriber<? super Channel> subscriberCopy = subscriber;
subscriber = null;
subscriberCopy.onSuccess(channel);
}

@Override
public void channelInactive(final ChannelHandlerContext ctx) {
if (subscriber != null) {
failSubscriber(ctx, new RetryableProxyConnectException(
"Connection closed before proxy CONNECT finished"));
return;
}
ctx.fireChannelInactive();
}

@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
if (subscriber != null) {
failSubscriber(ctx, new ProxyConnectException(
"Unexpected exception before proxy CONNECT finished", cause));
return;
}
ctx.fireExceptionCaught(cause);
}

private void failSubscriber(final ChannelHandlerContext ctx, final Throwable cause) {
assignConnectionError(ctx.channel(), cause);
if (subscriber != null) {
if (connectObserver != null) {
connectObserver.proxyConnectFailed(cause);
}
final SingleSource.Subscriber<? super Channel> subscriberCopy = subscriber;
subscriber = null;
subscriberCopy.onError(cause);
}
ctx.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.transport.api.RetryableException;

import java.io.IOException;

/**
* An exception while processing
* <a href="https://datatracker.ietf.org/doc/html/rfc9110#section-9.3.6">HTTP/1.1 CONNECT</a> request.
*/
public class ProxyConnectException extends IOException {

private static final long serialVersionUID = 4453075928788773272L;

ProxyConnectException(final String message) {
super(message);
}

ProxyConnectException(final String message, final Throwable cause) {
super(message, cause);
}

static final class RetryableProxyConnectException extends ProxyConnectException
implements RetryableException {

private static final long serialVersionUID = 5118637083568536242L;

RetryableProxyConnectException(final String message) {
super(message);
}

RetryableProxyConnectException(final String message, final Throwable cause) {
super(message, cause);
}
}
}
Loading

0 comments on commit afe36ec

Please sign in to comment.