Skip to content

Commit

Permalink
More idel feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
daschl committed Oct 3, 2023
1 parent 3f194cd commit 135653d
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,14 @@ public HttpExecutionStrategy executionStrategy() {
final StreamingHttpRequestResponseFactory reqRespFactory = defaultReqRespFactory(roConfig,
executionContext.bufferAllocator());

final StreamingHttpConnectionFilterFactory connectionFilterFactory =
StreamingHttpConnectionFilterFactory connectionFilterFactory =
ctx.builder.addIdleTimeoutConnectionFilter ?
appendConnectionFilter(ctx.builder.connectionFilterFactory, DEFAULT_IDLE_TIMEOUT_FILTER) :
ctx.builder.connectionFilterFactory;

connectionFilterFactory = appendConnectionFilter(connectionFilterFactory,
HttpMessageDiscardWatchdogClientFilter.INSTANCE);

if (roConfig.isH2PriorKnowledge() &&
// Direct connection or HTTP proxy
(!roConfig.hasProxy() || sslContext == null)) {
Expand Down Expand Up @@ -317,10 +321,6 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc(
ctx.builder.retryingHttpRequesterFilter);
}

// This filter cleans up tracked and discarded message payloads.
currClientFilterFactory = appendFilter(currClientFilterFactory,
HttpMessageDiscardWatchdogClientFilter.INSTANCE);

// Internal retries must be one of the last filters in the chain.
currClientFilterFactory = appendFilter(currClientFilterFactory, InternalRetryingHttpClientFilter.INSTANCE);
FilterableStreamingHttpClient wrappedClient =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpClientFilterFactory;
import io.servicetalk.http.api.StreamingHttpConnectionFilter;
import io.servicetalk.http.api.StreamingHttpConnectionFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequester;
import io.servicetalk.http.api.StreamingHttpResponse;
Expand All @@ -38,7 +41,7 @@
* Filter which tracks HTTP responses and makes sure that if an exception is raised during filter pipeline
* processing message payload bodies are cleaned up.
*/
final class HttpMessageDiscardWatchdogClientFilter implements StreamingHttpClientFilterFactory {
final class HttpMessageDiscardWatchdogClientFilter implements StreamingHttpConnectionFilterFactory {

private static final ContextMap.Key<AtomicReference<Publisher<?>>> MESSAGE_PUBLISHER_KEY = ContextMap.Key
.newKey(HttpMessageDiscardWatchdogClientFilter.class.getName() + ".messagePublisher",
Expand All @@ -49,7 +52,7 @@ final class HttpMessageDiscardWatchdogClientFilter implements StreamingHttpClien
/**
* Instance of {@link HttpMessageDiscardWatchdogClientFilter}.
*/
static final StreamingHttpClientFilterFactory INSTANCE = new HttpMessageDiscardWatchdogClientFilter();
static final StreamingHttpConnectionFilterFactory INSTANCE = new HttpMessageDiscardWatchdogClientFilter();

/**
* Instance of {@link HttpLifecycleObserverRequesterFilter} with the cleaner implementation.
Expand All @@ -61,12 +64,11 @@ private HttpMessageDiscardWatchdogClientFilter() {
}

@Override
public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client) {
return new StreamingHttpClientFilter(client) {
public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnection connection) {
return new StreamingHttpConnectionFilter(connection) {
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final StreamingHttpRequest request) {
return delegate.request(request).map(response -> {
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
return delegate().request(request).map(response -> {
// always write the buffer publisher into the request context. When a downstream subscriber
// arrives, mark the message as subscribed explicitly (having a message present and no
// subscription is an indicator that it must be freed later on).
Expand All @@ -85,10 +87,7 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
}

return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> {
final AtomicReference<?> maybePublisher = request.context().get(MESSAGE_PUBLISHER_KEY);
if (maybePublisher != null) {
maybePublisher.set(null);
}
reference.set(null);
return HttpMessageDiscardWatchdogServiceFilter.NoopSubscriber.INSTANCE;
}));
});
Expand All @@ -108,18 +107,21 @@ public StreamingHttpClientFilter create(final FilterableStreamingHttpClient clie
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final StreamingHttpRequest request) {
return delegate.request(request).onErrorResume(originalThrowable -> Single.defer(() -> {
return delegate.request(request).onErrorResume(originalThrowable -> {
final AtomicReference<?> maybePublisher = request.context().get(MESSAGE_PUBLISHER_KEY);
if (maybePublisher != null) {
Publisher<?> message = (Publisher<?>) maybePublisher.get();
if (message != null) {
// No-one subscribed to the message (or there is none), so if there is a message
// proactively clean it up.
return message.ignoreElements().concat(Single.failed(originalThrowable));
return message
.ignoreElements()
.concat(Single.<StreamingHttpResponse>failed(originalThrowable))
.shareContextOnSubscribe();
}
}
return Single.failed(originalThrowable);
}));
return Single.<StreamingHttpResponse>failed(originalThrowable).shareContextOnSubscribe();
});
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,7 @@ public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
}

return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> {
final AtomicReference<?> maybePublisher = request.context().get(MESSAGE_PUBLISHER_KEY);
if (maybePublisher != null) {
maybePublisher.set(null);
}
reference.set(null);
return NoopSubscriber.INSTANCE;
}));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ public Single<StreamingHttpResponse> apply(final StreamingHttpRequester requeste
public String toString() {
return "Throws exception in filter which drops message";
}
}, DeliberateException.class),
Arguments.of(new ResponseTransformer() {
@Override
public Single<StreamingHttpResponse> apply(final StreamingHttpRequester requester,
final StreamingHttpRequest request) {
return requester
.request(request)
.flatMap(dropped -> Single.failed(new DeliberateException()));
}

@Override
public String toString() {
return "Returns a failed Single which drops message";
}
}, DeliberateException.class)
);
}
Expand Down

0 comments on commit 135653d

Please sign in to comment.