Skip to content

Commit

Permalink
H2PriorKnowledgeFeatureParityTest.serverGracefulClose ConnectionAccep…
Browse files Browse the repository at this point in the history
…tor disable offloading (#2444)

Motivation:
H2PriorKnowledgeFeatureParityTest.serverGracefulClose installs a ConnectionAcceptor which
intercepts messages through the netty pipeline. However the filter is offloaded by default
and may miss events.

Modifications:
- Override `requiredOffloads` when creating the `ConnectionAcceptorFactory` to disable
  offloading.

Fixes:
#2378
  • Loading branch information
Scottmitch authored Nov 24, 2022
1 parent dd426a9 commit 40697bf
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;
import io.servicetalk.http.netty.NettyHttp2ExceptionUtils.H2StreamResetException;
import io.servicetalk.logging.api.LogLevel;
import io.servicetalk.transport.api.ConnectExecutionStrategy;
import io.servicetalk.transport.api.ConnectionAcceptor;
import io.servicetalk.transport.api.ConnectionAcceptorFactory;
import io.servicetalk.transport.api.ConnectionContext;
import io.servicetalk.transport.api.DelegatingConnectionAcceptor;
import io.servicetalk.transport.api.HostAndPort;
Expand Down Expand Up @@ -1843,11 +1846,23 @@ private InetSocketAddress bindHttpEchoServer(@Nullable StreamingHttpServiceFilte
}

if (connectionOnClosingLatch != null) {
serverBuilder.appendConnectionAcceptorFilter(original -> new DelegatingConnectionAcceptor(original) {
serverBuilder.appendConnectionAcceptorFilter(new ConnectionAcceptorFactory() {
@Override
public Completable accept(final ConnectionContext context) {
onGracefulClosureStarted(context, connectionOnClosingLatch);
return completed();
public ConnectionAcceptor create(final ConnectionAcceptor original) {
return new DelegatingConnectionAcceptor(original) {
@Override
public Completable accept(final ConnectionContext context) {
return Completable.defer(() -> {
onGracefulClosureStarted(context, connectionOnClosingLatch);
return completed().shareContextOnSubscribe();
});
}
};
}

@Override
public ConnectExecutionStrategy requiredOffloads() {
return ConnectExecutionStrategy.offloadNone();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.TestPublisher;
import io.servicetalk.concurrent.api.TestSubscription;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.transport.api.ConnectExecutionStrategy;
import io.servicetalk.transport.api.ConnectionAcceptor;
import io.servicetalk.transport.api.ConnectionAcceptorFactory;
import io.servicetalk.transport.api.ConnectionContext;
import io.servicetalk.transport.api.DelegatingConnectionAcceptor;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.netty.internal.ExecutionContextExtension;
import io.servicetalk.transport.netty.internal.FlushStrategy;
Expand Down Expand Up @@ -89,11 +95,26 @@ void updateFlushStrategy(HttpExecutionStrategy serverExecutionStrategy,

serverContext = HttpServers.forAddress(localAddress(0))
.ioExecutor(contextRule.ioExecutor())
.appendConnectionAcceptorFilter(original -> original.append(ctx -> {
customCancellableRef.set(
((NettyConnectionContext) ctx).updateFlushStrategy((__, ___) -> customStrategy));
return completed();
}))
.appendConnectionAcceptorFilter(new ConnectionAcceptorFactory() {
@Override
public ConnectionAcceptor create(ConnectionAcceptor original) {
return new DelegatingConnectionAcceptor(original) {
@Override
public Completable accept(final ConnectionContext context) {
return Completable.defer(() -> {
customCancellableRef.set(((NettyConnectionContext) context)
.updateFlushStrategy((__, ___) -> customStrategy));
return completed().shareContextOnSubscribe();
});
}
};
}

@Override
public ConnectExecutionStrategy requiredOffloads() {
return ConnectExecutionStrategy.offloadNone();
}
})
.executionStrategy(serverExecutionStrategy)
.listenStreaming((ctx, request, responseFactory) -> {
if (handledFirstRequest.compareAndSet(false, true)) {
Expand Down

0 comments on commit 40697bf

Please sign in to comment.