Skip to content

Commit

Permalink
Merge branch 'main' into bl_anderson/RR_move_selection_to_Host
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-anderson committed Oct 19, 2023
2 parents 24aacdc + 61973fe commit a1e136b
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,6 @@ private static final class DefaultH2ClientParentConnection extends AbstractH2Par
maxConcurrencyPublisher.ignoreElements().subscribe();
}

@Override
boolean hasSubscriber() {
return subscriber != null;
}

@Override
void tryCompleteSubscriber() {
if (subscriber != null) {
Expand All @@ -214,12 +209,15 @@ void tryCompleteSubscriber() {
}

@Override
void tryFailSubscriber(Throwable cause) {
boolean tryFailSubscriber(Throwable cause) {
if (subscriber != null) {
close(parentContext.nettyChannel(), cause);
Subscriber<? super H2ClientParentConnection> subscriberCopy = subscriber;
subscriber = null;
subscriberCopy.onError(cause);
return true;
} else {
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import io.servicetalk.transport.api.ConnectionContext;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.SslConfig;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.FlushStrategy;
import io.servicetalk.transport.netty.internal.FlushStrategyHolder;
import io.servicetalk.transport.netty.internal.NettyChannelListenableAsyncCloseable;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;
import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopConnectionObserver;
import io.servicetalk.transport.netty.internal.StacklessClosedChannelException;

import io.netty.channel.Channel;
Expand All @@ -44,6 +44,8 @@
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.SocketAddress;
import java.net.SocketOption;
Expand All @@ -56,12 +58,14 @@
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.netty.HttpExecutionContextUtils.channelExecutionContext;
import static io.servicetalk.http.netty.NettyHttp2ExceptionUtils.wrapIfNecessary;
import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.assignConnectionError;
import static io.servicetalk.transport.netty.internal.NettyPipelineSslUtils.extractSslSession;
import static io.servicetalk.transport.netty.internal.SocketOptionUtils.getOption;

class H2ParentConnectionContext extends NettyChannelListenableAsyncCloseable implements NettyConnectionContext,
HttpConnectionContext {

private static final Logger LOGGER = LoggerFactory.getLogger(H2ParentConnectionContext.class);

final FlushStrategyHolder flushStrategyHolder;
private final HttpExecutionContext executionContext;
private final SingleSource.Processor<Throwable, Throwable> transportError = newSingleProcessor();
Expand Down Expand Up @@ -162,7 +166,11 @@ protected final void doCloseAsyncGracefully() {
}, true);
}

private void notifyOnClosingImpl() { // For access from AbstractH2ParentConnection
/**
* This method is required to access notifyOnClosing() from AbstractH2ParentConnection, because usage of
* {@code parentContext.notifyOnClosing()} directly triggers {@link java.lang.IllegalAccessError}.
*/
private void notifyOnClosingImpl() {
notifyOnClosing();
}

Expand Down Expand Up @@ -191,11 +199,9 @@ abstract static class AbstractH2ParentConnection extends ChannelInboundHandlerAd
this.observer = observer;
}

abstract boolean hasSubscriber();

abstract void tryCompleteSubscriber();

abstract void tryFailSubscriber(Throwable cause);
abstract boolean tryFailSubscriber(Throwable cause);

/**
* Receive a settings frame and optionally handle the acknowledgement of the frame.
Expand Down Expand Up @@ -238,19 +244,22 @@ public final void handlerRemoved(ChannelHandlerContext ctx) {
private void doChannelClosed(final String method) {
parentContext.notifyOnClosingImpl();

if (hasSubscriber()) {
tryFailSubscriber(StacklessClosedChannelException.newInstance(H2ParentConnectionContext.class, method));
}
tryFailSubscriber(StacklessClosedChannelException.newInstance(H2ParentConnectionContext.class, method));
parentContext.keepAliveManager.channelClosed();
}

@Override
public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// The parent channel will be closed in case of exception after the cause is propagated to subscriber.
// In case users don't have offloading, there is a risk to retry on the same IO thread.
// We should notify LoadBalancer that this connection is closing to avoid retrying on the same connection.
parentContext.notifyOnClosingImpl();
cause = wrapIfNecessary(cause);
if (observer != NoopConnectionObserver.INSTANCE) {
assignConnectionError(ctx.channel(), cause);
}
parentContext.transportError.onSuccess(cause);
if (!tryFailSubscriber(cause)) {
LOGGER.debug("{} closing h2 parent channel on exception caught", parentContext.nettyChannel(), cause);
ChannelCloseUtils.close(ctx, cause);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,6 @@ private static final class DefaultH2ServerParentConnection extends AbstractH2Par
this.subscriber = requireNonNull(subscriber);
}

@Override
boolean hasSubscriber() {
return subscriber != null;
}

@Override
void tryCompleteSubscriber() {
if (subscriber != null) {
Expand All @@ -236,12 +231,15 @@ void tryCompleteSubscriber() {
}

@Override
void tryFailSubscriber(Throwable cause) {
boolean tryFailSubscriber(Throwable cause) {
if (subscriber != null) {
ChannelCloseUtils.close(parentContext.nettyChannel(), cause);
Subscriber<? super H2ServerParentConnectionContext> subscriberCopy = subscriber;
subscriber = null;
subscriberCopy.onError(cause);
return true;
} else {
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
Expand All @@ -37,6 +38,7 @@
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -71,9 +73,11 @@ class ConnectionClosedAfterIoExceptionTest {
void test(HttpProtocol protocol) throws Exception {
AtomicReference<FilterableStreamingHttpConnection> firstConnection = new AtomicReference<>();
BlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();
AtomicBoolean firstAttempt = new AtomicBoolean(true);
try (ServerContext serverContext = BuilderUtils.newServerBuilder(SERVER_CTX, protocol)
// Fail only the first connect attempt
.appendEarlyConnectionAcceptor(conn -> errors.isEmpty() ? failed(DELIBERATE_EXCEPTION) : completed())
.appendEarlyConnectionAcceptor(conn -> firstAttempt.compareAndSet(true, false) ?
failed(DELIBERATE_EXCEPTION) : completed())
.listenBlockingAndAwait((ctx, request, responseFactory) -> responseFactory.ok());
BlockingHttpClient client = BuilderUtils.newClientBuilder(serverContext, CLIENT_CTX, protocol)
.appendConnectionFactoryFilter(original -> new DelegatingConnectionFactory<InetSocketAddress,
Expand All @@ -86,15 +90,21 @@ public Single<FilterableStreamingHttpConnection> newConnection(InetSocketAddress
}
})
.appendClientFilter(new RetryingHttpRequesterFilter.Builder()
.retryRetryableExceptions((metaData, t) -> {
errors.add((Throwable) t);
return BackOffPolicy.ofImmediateBounded();
})
.retryOther((metaData, t) -> {
errors.add(t);
return BackOffPolicy.ofImmediateBounded();
})
.build())
.buildBlocking()) {

HttpResponse response = client.request(client.get("/"));
assertThat(response.status(), is(OK));
Assertions.assertDoesNotThrow(() -> {
HttpResponse response = client.request(client.get("/"));
assertThat(response.status(), is(OK));
});

assertThat("Unexpected number of errors, likely retried more than expected", errors, hasSize(1));
assertThat("Did not propagate original IoException", errors.poll(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
}

// We expect this to timeout, because we have not completed the outstanding request.
assertFalse(onServerCloseLatch.await(30, MILLISECONDS));
assertFalse(onServerCloseLatch.await(300, MILLISECONDS));

requestBody.onComplete();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ void expiringAHostDoesntRaceWithConnectionAdding() throws Exception {
return null;
}).toFuture();

// Note that this will send a `EXPIRED` event and the host will remain, but be marked unhealthy.
sendServiceDiscoveryEvents(downEvent("address-1"));
f.get();

Expand All @@ -157,7 +158,10 @@ void expiringAHostDoesntRaceWithConnectionAdding() throws Exception {
// Confirm host is expired:
ExecutionException ex = assertThrows(ExecutionException.class,
() -> lb.selectConnection(alwaysNewConnectionFilter(), null).toFuture().get());
assertThat(ex.getCause(), instanceOf(NoAvailableHostException.class));

// We expect a `NoActiveHostException` because the host exists, but was marked
// as unhealthy because it is expired.
assertThat(ex.getCause(), instanceOf(NoActiveHostException.class));

lb.closeAsyncGracefully().toFuture().get();
verify(connection.get(), times(1)).closeAsyncGracefully();
Expand Down

0 comments on commit a1e136b

Please sign in to comment.