From 515dd297c91a9cbed994ed402f39ebd428e6a7d6 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 22 Aug 2024 17:18:54 -0600 Subject: [PATCH 01/18] http-netty: let RetryingHttpRequesterFilter return responses on failure Motivation: Sometimes people just want to get the last failed response when the retry loop ends. However, right now we only yield the exceptions that where created. Users can't do this smuggling themselves in a generic way via the HttpResponseException because it could lead to resource leaks. Modifications: Let users simply return the last failed response when the retry loop exits unsuccessfully. --- .../netty/RetryingHttpRequesterFilter.java | 112 +++++++++++++++--- 1 file changed, 95 insertions(+), 17 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index d5431fae35..d919b182bc 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -21,6 +21,8 @@ import io.servicetalk.client.api.LoadBalancerReadyEvent; import io.servicetalk.client.api.NoAvailableHostException; import io.servicetalk.client.api.ServiceDiscoverer; +import io.servicetalk.concurrent.Cancellable; +import io.servicetalk.concurrent.CompletableSource; import io.servicetalk.concurrent.api.AsyncContext; import io.servicetalk.concurrent.api.BiIntFunction; import io.servicetalk.concurrent.api.Completable; @@ -91,15 +93,16 @@ public final class RetryingHttpRequesterFilter implements StreamingHttpClientFilterFactory, ExecutionStrategyInfluencer { static final int DEFAULT_MAX_TOTAL_RETRIES = 4; private static final RetryingHttpRequesterFilter DISABLE_AUTO_RETRIES = - new RetryingHttpRequesterFilter(true, false, false, 1, null, + new RetryingHttpRequesterFilter(true, false, false, false, 1, null, (__, ___) -> NO_RETRIES, null); private static final RetryingHttpRequesterFilter DISABLE_ALL_RETRIES = - new RetryingHttpRequesterFilter(false, true, false, 0, null, + new RetryingHttpRequesterFilter(false, true, false, false, 0, null, (__, ___) -> NO_RETRIES, null); private final boolean waitForLb; private final boolean ignoreSdErrors; private final boolean mayReplayRequestPayload; + private final boolean returnFailedResponses; private final int maxTotalRetries; @Nullable private final Function responseMapper; @@ -109,13 +112,14 @@ public final class RetryingHttpRequesterFilter RetryingHttpRequesterFilter( final boolean waitForLb, final boolean ignoreSdErrors, final boolean mayReplayRequestPayload, - final int maxTotalRetries, + final boolean returnFailedResponses, final int maxTotalRetries, @Nullable final Function responseMapper, final BiFunction retryFor, @Nullable final RetryCallbacks onRequestRetry) { this.waitForLb = waitForLb; this.ignoreSdErrors = ignoreSdErrors; this.mayReplayRequestPayload = mayReplayRequestPayload; + this.returnFailedResponses = returnFailedResponses; this.maxTotalRetries = maxTotalRetries; this.responseMapper = responseMapper; this.retryFor = retryFor; @@ -190,11 +194,13 @@ public Completable apply(final int count, final Throwable t) { !(lbEvent instanceof LoadBalancerReadyEvent && ((LoadBalancerReadyEvent) lbEvent).isReady())) .ignoreElements(); - return applyRetryCallbacks( + return applyRetryCallbacksAndDraining( sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t); } - final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t); + // Unwrap a WrappedResponseException before asking the policy for a policy. + final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, + t instanceof WrappedResponseException ? ((WrappedResponseException) t).exception : t); if (backOffPolicy != NO_RETRIES) { final int offsetCount = count - lbNotReadyCount; Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t); @@ -203,15 +209,46 @@ public Completable apply(final int count, final Throwable t) { retryWhen = retryWhen.concat(executor.timer(constant)); } - return applyRetryCallbacks(retryWhen, count, t); + return applyRetryCallbacksAndDraining(retryWhen, count, t); } return failed(t); } - Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) { - return retryCallbacks == null ? completable : - completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t)); + Completable applyRetryCallbacksAndDraining(final Completable completable, final int retryCount, + final Throwable t) { + if (retryCallbacks == null && !(t instanceof WrappedResponseException)) { + // No wrap necessary. + return completable; + } + return completable.liftSync(subscriber -> new CompletableSource.Subscriber() { + @Override + public void onSubscribe(Cancellable cancellable) { + subscriber.onSubscribe(cancellable); + } + + @Override + public void onComplete() { + try { + if (retryCallbacks != null) { + retryCallbacks.beforeRetry(retryCount, requestMetaData, t); + } + if (t instanceof WrappedResponseException) { + drainResponse(((WrappedResponseException) t).response).subscribe(); + } + } finally { + subscriber.onComplete(); + } + } + + @Override + public void onError(Throwable tt) { + // If we're retrying due to a wrapped response it's because the users want the actual response, + // not an exception. Therefore, we return the wrapped response and let it get unwrapped at the + // end of the retry pipeline. + subscriber.onError(t instanceof WrappedResponseException ? t : tt); + } + }); } } @@ -258,19 +295,31 @@ protected Single request(final StreamingHttpRequester del if (responseMapper != null) { single = single.flatMap(resp -> { final HttpResponseException exception = responseMapper.apply(resp); - return (exception != null ? - // Drain response payload body before discarding it: - resp.payloadBody().ignoreElements().onErrorComplete() - .concat(Single.failed(exception)) : - Single.succeeded(resp)) - .shareContextOnSubscribe(); + if (exception == null) { + return Single.succeeded(resp).shareContextOnSubscribe(); + } + if (returnFailedResponses) { + return Single.failed(new WrappedResponseException(resp, exception)); + } else { + return drainResponse(resp).concat(Single.failed(exception)); + } }); } // 1. Metadata is shared across retries // 2. Publisher state is restored to original state for each retry // duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2). - return single.retryWhen(retryStrategy(request, executionContext(), true)); + single = single.retryWhen(retryStrategy(request, executionContext(), true)); + if (returnFailedResponses) { + single = single.onErrorResume(t -> { + if (t instanceof WrappedResponseException) { + return Single.succeeded(((WrappedResponseException) t).response); + } else { + return Single.failed(t); + } + }); + } + return single; } } @@ -719,6 +768,7 @@ public static final class Builder { private int maxTotalRetries = DEFAULT_MAX_TOTAL_RETRIES; private boolean retryExpectationFailed; + private boolean returnFailedResponses; private BiFunction retryRetryableExceptions = (requestMetaData, e) -> BackOffPolicy.ofImmediateBounded(); @@ -745,6 +795,11 @@ public static final class Builder { @Nullable private RetryCallbacks onRequestRetry; + public Builder returnFailedResponses(final boolean returnFailedResponses) { + this.returnFailedResponses = returnFailedResponses; + return this; + } + /** * By default, automatic retries wait for the associated {@link LoadBalancer} to be * {@link LoadBalancerReadyEvent ready} before triggering a retry for requests. This behavior may add latency to @@ -1054,7 +1109,30 @@ public RetryingHttpRequesterFilter build() { return NO_RETRIES; }; return new RetryingHttpRequesterFilter(waitForLb, ignoreSdErrors, mayReplayRequestPayload, - maxTotalRetries, responseMapper, allPredicate, onRequestRetry); + returnFailedResponses, maxTotalRetries, responseMapper, allPredicate, onRequestRetry); + } + } + + private static Completable drainResponse(StreamingHttpResponse resp) { + return resp.payloadBody().ignoreElements().onErrorComplete(); + } + + private static final class WrappedResponseException extends Exception { + + private static final long serialVersionUID = 3905983622734400759L; + + final StreamingHttpResponse response; + final HttpResponseException exception; + + WrappedResponseException(final StreamingHttpResponse response, final HttpResponseException exception) { + this.response = response; + this.exception = exception; + } + + @Override + public synchronized Throwable fillInStackTrace() { + // just a carrier, the stack traces are not important. + return this; } } } From 5b7f01445a3590ab5ce0bd07a9fc50aadb18c4ed Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 22 Aug 2024 17:42:39 -0600 Subject: [PATCH 02/18] Add a test --- .../RetryingHttpRequesterFilterTest.java | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java index d7c5136bd3..bd09490638 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java @@ -65,6 +65,7 @@ import org.junit.jupiter.params.provider.ValueSource; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; @@ -251,21 +252,30 @@ private void assertRequestRetryingPred(final BlockingHttpClient client) { assertThat("Unexpected calls to select.", (double) lbSelectInvoked.get(), closeTo(5.0, 1.0)); } - @Test - void testResponseMapper() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testResponseMapper(final boolean returnFailedResponses) throws Exception { AtomicInteger newConnectionCreated = new AtomicInteger(); AtomicInteger responseDrained = new AtomicInteger(); AtomicInteger onRequestRetryCounter = new AtomicInteger(); final int maxTotalRetries = 4; normalClient = normalClientBuilder .appendClientFilter(new Builder() + .returnFailedResponses(returnFailedResponses) .maxTotalRetries(maxTotalRetries) .responseMapper(metaData -> metaData.headers().contains(RETRYABLE_HEADER) ? new HttpResponseException("Retryable header", metaData) : null) // Disable request retrying .retryRetryableExceptions((requestMetaData, e) -> ofNoRetries()) // Retry only responses marked so - .retryResponses((requestMetaData, throwable) -> ofImmediate(maxTotalRetries - 1)) + .retryResponses((requestMetaData, throwable) -> { + if (throwable instanceof HttpResponseException && + ((HttpResponseException) throwable).getMessage().equals("Retryable header")) { + return ofImmediate(maxTotalRetries - 1); + } else { + throw new RuntimeException("Unexpected exception type"); + } + }) .onRequestRetry((count, req, t) -> assertThat(onRequestRetryCounter.incrementAndGet(), is(count))) .build()) @@ -281,9 +291,14 @@ public Single request(final StreamingHttpRequest request) }; }) .buildBlocking(); - HttpResponseException e = assertThrows(HttpResponseException.class, - () -> normalClient.request(normalClient.get("/"))); - assertThat("Unexpected exception.", e, instanceOf(HttpResponseException.class)); + if (returnFailedResponses) { + HttpResponse response = normalClient.request(normalClient.get("/")); + assertThat(response.status(), is(HttpResponseStatus.OK)); + } else { + HttpResponseException e = assertThrows(HttpResponseException.class, + () -> normalClient.request(normalClient.get("/"))); + assertThat("Unexpected exception.", e, instanceOf(HttpResponseException.class)); + } // The load balancer is allowed to be not ready one time, which is counted against total retry attempts but not // against actual requests being issued. assertThat("Unexpected calls to select.", lbSelectInvoked.get(), allOf(greaterThanOrEqualTo(maxTotalRetries), From b08742de64d6b70b2964bef0bebdad9614a67269 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 22 Aug 2024 17:51:03 -0600 Subject: [PATCH 03/18] make linters happy --- .../http/netty/RetryingHttpRequesterFilterTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java index bd09490638..63e4f64efc 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java @@ -65,7 +65,6 @@ import org.junit.jupiter.params.provider.ValueSource; import java.net.InetSocketAddress; -import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; @@ -259,21 +258,22 @@ void testResponseMapper(final boolean returnFailedResponses) throws Exception { AtomicInteger responseDrained = new AtomicInteger(); AtomicInteger onRequestRetryCounter = new AtomicInteger(); final int maxTotalRetries = 4; + final String retryMessage = "Retryable header"; normalClient = normalClientBuilder .appendClientFilter(new Builder() .returnFailedResponses(returnFailedResponses) .maxTotalRetries(maxTotalRetries) .responseMapper(metaData -> metaData.headers().contains(RETRYABLE_HEADER) ? - new HttpResponseException("Retryable header", metaData) : null) + new HttpResponseException(retryMessage, metaData) : null) // Disable request retrying .retryRetryableExceptions((requestMetaData, e) -> ofNoRetries()) // Retry only responses marked so .retryResponses((requestMetaData, throwable) -> { if (throwable instanceof HttpResponseException && - ((HttpResponseException) throwable).getMessage().equals("Retryable header")) { + retryMessage.equals(throwable.getMessage())) { return ofImmediate(maxTotalRetries - 1); } else { - throw new RuntimeException("Unexpected exception type"); + throw new RuntimeException("Unexpected exception"); } }) .onRequestRetry((count, req, t) -> From 3493dc4b526af8c9445826fd5643eb483ddd6887 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 8 Oct 2024 13:03:02 -0600 Subject: [PATCH 04/18] Some of idels feedback --- .../netty/RetryingHttpRequesterFilter.java | 73 +++++-------------- 1 file changed, 20 insertions(+), 53 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index d919b182bc..71b640490b 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -194,13 +194,14 @@ public Completable apply(final int count, final Throwable t) { !(lbEvent instanceof LoadBalancerReadyEvent && ((LoadBalancerReadyEvent) lbEvent).isReady())) .ignoreElements(); - return applyRetryCallbacksAndDraining( + return applyRetryCallbacks( sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t); } // Unwrap a WrappedResponseException before asking the policy for a policy. final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, - t instanceof WrappedResponseException ? ((WrappedResponseException) t).exception : t); + returnFailedResponses && t instanceof WrappedResponseException ? + ((WrappedResponseException) t).exception : t); if (backOffPolicy != NO_RETRIES) { final int offsetCount = count - lbNotReadyCount; Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t); @@ -209,46 +210,16 @@ public Completable apply(final int count, final Throwable t) { retryWhen = retryWhen.concat(executor.timer(constant)); } - return applyRetryCallbacksAndDraining(retryWhen, count, t); + return applyRetryCallbacks(retryWhen, count, t); } return failed(t); } - Completable applyRetryCallbacksAndDraining(final Completable completable, final int retryCount, - final Throwable t) { - if (retryCallbacks == null && !(t instanceof WrappedResponseException)) { - // No wrap necessary. - return completable; - } - return completable.liftSync(subscriber -> new CompletableSource.Subscriber() { - @Override - public void onSubscribe(Cancellable cancellable) { - subscriber.onSubscribe(cancellable); - } - - @Override - public void onComplete() { - try { - if (retryCallbacks != null) { - retryCallbacks.beforeRetry(retryCount, requestMetaData, t); - } - if (t instanceof WrappedResponseException) { - drainResponse(((WrappedResponseException) t).response).subscribe(); - } - } finally { - subscriber.onComplete(); - } - } - - @Override - public void onError(Throwable tt) { - // If we're retrying due to a wrapped response it's because the users want the actual response, - // not an exception. Therefore, we return the wrapped response and let it get unwrapped at the - // end of the retry pipeline. - subscriber.onError(t instanceof WrappedResponseException ? t : tt); - } - }); + Completable applyRetryCallbacks(final Completable completable, final int retryCount, + final Throwable t) { + return retryCallbacks == null ? completable : + completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t)); } } @@ -295,14 +266,16 @@ protected Single request(final StreamingHttpRequester del if (responseMapper != null) { single = single.flatMap(resp -> { final HttpResponseException exception = responseMapper.apply(resp); + Single respSingle; if (exception == null) { - return Single.succeeded(resp).shareContextOnSubscribe(); - } - if (returnFailedResponses) { - return Single.failed(new WrappedResponseException(resp, exception)); + respSingle = Single.succeeded(resp); } else { - return drainResponse(resp).concat(Single.failed(exception)); + // Drain response payload body before packaging it + respSingle = resp.payloadBody().ignoreElements().onErrorComplete() + .concat(Single.failed( + returnFailedResponses ? new WrappedResponseException(resp, exception) : exception)); } + return respSingle.shareContextOnSubscribe(); }); } @@ -311,13 +284,11 @@ protected Single request(final StreamingHttpRequester del // duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2). single = single.retryWhen(retryStrategy(request, executionContext(), true)); if (returnFailedResponses) { - single = single.onErrorResume(t -> { - if (t instanceof WrappedResponseException) { - return Single.succeeded(((WrappedResponseException) t).response); - } else { - return Single.failed(t); - } - }); + single = single.onErrorResume(WrappedResponseException.class, t -> Single.succeeded( + // The previous message was already drained but we can't just 'set' it because it then + // does a weird flow control thing. Therefore, we cheat by transforming in a way that + // simply discards the original. + t.response.transformMessageBody(ignored -> Publisher.empty()))); } return single; } @@ -1113,10 +1084,6 @@ public RetryingHttpRequesterFilter build() { } } - private static Completable drainResponse(StreamingHttpResponse resp) { - return resp.payloadBody().ignoreElements().onErrorComplete(); - } - private static final class WrappedResponseException extends Exception { private static final long serialVersionUID = 3905983622734400759L; From e80e98ecb1fd2fd64a3549d836fe88b82987dda3 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 8 Oct 2024 14:28:49 -0600 Subject: [PATCH 05/18] Remove the second wrapper --- .../netty/RetryingHttpRequesterFilter.java | 58 +++++-------------- 1 file changed, 16 insertions(+), 42 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 71b640490b..3c10627c77 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -21,8 +21,6 @@ import io.servicetalk.client.api.LoadBalancerReadyEvent; import io.servicetalk.client.api.NoAvailableHostException; import io.servicetalk.client.api.ServiceDiscoverer; -import io.servicetalk.concurrent.Cancellable; -import io.servicetalk.concurrent.CompletableSource; import io.servicetalk.concurrent.api.AsyncContext; import io.servicetalk.concurrent.api.BiIntFunction; import io.servicetalk.concurrent.api.Completable; @@ -31,6 +29,7 @@ import io.servicetalk.concurrent.api.RetryStrategies; import io.servicetalk.concurrent.api.Single; import io.servicetalk.context.api.ContextMap; +import io.servicetalk.http.api.DefaultHttpHeadersFactory; import io.servicetalk.http.api.FilterableReservedStreamingHttpConnection; import io.servicetalk.http.api.FilterableStreamingHttpClient; import io.servicetalk.http.api.HttpExecutionStrategies; @@ -45,6 +44,7 @@ import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpRequester; import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.http.api.StreamingHttpResponses; import io.servicetalk.transport.api.ExecutionContext; import io.servicetalk.transport.api.ExecutionStrategyInfluencer; import io.servicetalk.transport.api.RetryableException; @@ -57,6 +57,7 @@ import java.util.function.UnaryOperator; import javax.annotation.Nullable; +import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; import static io.servicetalk.concurrent.api.Completable.completed; import static io.servicetalk.concurrent.api.Completable.failed; import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter; @@ -198,10 +199,7 @@ public Completable apply(final int count, final Throwable t) { sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t); } - // Unwrap a WrappedResponseException before asking the policy for a policy. - final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, - returnFailedResponses && t instanceof WrappedResponseException ? - ((WrappedResponseException) t).exception : t); + final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t); if (backOffPolicy != NO_RETRIES) { final int offsetCount = count - lbNotReadyCount; Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t); @@ -216,8 +214,7 @@ public Completable apply(final int count, final Throwable t) { return failed(t); } - Completable applyRetryCallbacks(final Completable completable, final int retryCount, - final Throwable t) { + Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) { return retryCallbacks == null ? completable : completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t)); } @@ -266,16 +263,12 @@ protected Single request(final StreamingHttpRequester del if (responseMapper != null) { single = single.flatMap(resp -> { final HttpResponseException exception = responseMapper.apply(resp); - Single respSingle; - if (exception == null) { - respSingle = Single.succeeded(resp); - } else { - // Drain response payload body before packaging it - respSingle = resp.payloadBody().ignoreElements().onErrorComplete() - .concat(Single.failed( - returnFailedResponses ? new WrappedResponseException(resp, exception) : exception)); - } - return respSingle.shareContextOnSubscribe(); + return (exception != null ? + // Drain response payload body before discarding it: + resp.payloadBody().ignoreElements().onErrorComplete() + .concat(Single.failed(exception)) : + Single.succeeded(resp)) + .shareContextOnSubscribe(); }); } @@ -284,11 +277,11 @@ protected Single request(final StreamingHttpRequester del // duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2). single = single.retryWhen(retryStrategy(request, executionContext(), true)); if (returnFailedResponses) { - single = single.onErrorResume(WrappedResponseException.class, t -> Single.succeeded( - // The previous message was already drained but we can't just 'set' it because it then - // does a weird flow control thing. Therefore, we cheat by transforming in a way that - // simply discards the original. - t.response.transformMessageBody(ignored -> Publisher.empty()))); + single = single.onErrorResume(HttpResponseException.class, t -> { + HttpResponseMetaData metaData = t.metaData(); + return Single.succeeded(StreamingHttpResponses.newResponse(metaData.status(), metaData.version(), + metaData.headers(), DEFAULT_ALLOCATOR, DefaultHttpHeadersFactory.INSTANCE)); + }); } return single; } @@ -1083,23 +1076,4 @@ public RetryingHttpRequesterFilter build() { returnFailedResponses, maxTotalRetries, responseMapper, allPredicate, onRequestRetry); } } - - private static final class WrappedResponseException extends Exception { - - private static final long serialVersionUID = 3905983622734400759L; - - final StreamingHttpResponse response; - final HttpResponseException exception; - - WrappedResponseException(final StreamingHttpResponse response, final HttpResponseException exception) { - this.response = response; - this.exception = exception; - } - - @Override - public synchronized Throwable fillInStackTrace() { - // just a carrier, the stack traces are not important. - return this; - } - } } From 49d272d21641b24deeabf9c4450c76dc2f8f8818 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Mon, 28 Oct 2024 14:26:45 -0600 Subject: [PATCH 06/18] Keep the response body --- .../netty/RetryingHttpRequesterFilter.java | 41 +++++++++++++------ 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 3c10627c77..3c5b1e647b 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -29,7 +29,6 @@ import io.servicetalk.concurrent.api.RetryStrategies; import io.servicetalk.concurrent.api.Single; import io.servicetalk.context.api.ContextMap; -import io.servicetalk.http.api.DefaultHttpHeadersFactory; import io.servicetalk.http.api.FilterableReservedStreamingHttpConnection; import io.servicetalk.http.api.FilterableStreamingHttpClient; import io.servicetalk.http.api.HttpExecutionStrategies; @@ -44,10 +43,10 @@ import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpRequester; import io.servicetalk.http.api.StreamingHttpResponse; -import io.servicetalk.http.api.StreamingHttpResponses; import io.servicetalk.transport.api.ExecutionContext; import io.servicetalk.transport.api.ExecutionStrategyInfluencer; import io.servicetalk.transport.api.RetryableException; +import io.servicetalk.utils.internal.ThrowableUtils; import java.io.IOException; import java.time.Duration; @@ -57,7 +56,6 @@ import java.util.function.UnaryOperator; import javax.annotation.Nullable; -import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; import static io.servicetalk.concurrent.api.Completable.completed; import static io.servicetalk.concurrent.api.Completable.failed; import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter; @@ -215,8 +213,17 @@ public Completable apply(final int count, final Throwable t) { } Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) { - return retryCallbacks == null ? completable : - completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t)); + Completable result = (retryCallbacks == null ? completable : + completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t))); + if (returnFailedResponses && t instanceof HttpResponseException && + ((HttpResponseException) t).metaData() instanceof StreamingHttpResponse) { + // If we succeed, we need to drain the response body before we continue. If we fail we want to + // surface the original exception and don't worry about draining since it will be returned to + // the user. + result = result.onErrorMap(backoffError -> ThrowableUtils.addSuppressed(t, backoffError)) + .concat(drain((StreamingHttpResponse) ((HttpResponseException) t).metaData())); + } + return result; } } @@ -263,12 +270,16 @@ protected Single request(final StreamingHttpRequester del if (responseMapper != null) { single = single.flatMap(resp -> { final HttpResponseException exception = responseMapper.apply(resp); - return (exception != null ? - // Drain response payload body before discarding it: - resp.payloadBody().ignoreElements().onErrorComplete() - .concat(Single.failed(exception)) : - Single.succeeded(resp)) - .shareContextOnSubscribe(); + Single response; + if (exception == null) { + response = Single.succeeded(resp); + } else { + response = Single.failed(exception); + if (!returnFailedResponses) { + response = drain(resp).concat(response); + } + } + return response.shareContextOnSubscribe(); }); } @@ -279,8 +290,8 @@ protected Single request(final StreamingHttpRequester del if (returnFailedResponses) { single = single.onErrorResume(HttpResponseException.class, t -> { HttpResponseMetaData metaData = t.metaData(); - return Single.succeeded(StreamingHttpResponses.newResponse(metaData.status(), metaData.version(), - metaData.headers(), DEFAULT_ALLOCATOR, DefaultHttpHeadersFactory.INSTANCE)); + return (metaData instanceof StreamingHttpResponse ? + Single.succeeded((StreamingHttpResponse) metaData) : Single.failed(t)); }); } return single; @@ -1076,4 +1087,8 @@ public RetryingHttpRequesterFilter build() { returnFailedResponses, maxTotalRetries, responseMapper, allPredicate, onRequestRetry); } } + + private static Completable drain(StreamingHttpResponse response) { + return response.payloadBody().ignoreElements().onErrorComplete(); + } } From 8d07708cccb8dea93d9883dfb07a82b5868b3828 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 31 Oct 2024 16:24:06 -0600 Subject: [PATCH 07/18] Make sure we drain the response if the backoff Completable is cancelled --- .../servicetalk/http/netty/RetryingHttpRequesterFilter.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 3c5b1e647b..14ed12ab9d 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -217,11 +217,15 @@ Completable applyRetryCallbacks(final Completable completable, final int retryCo completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t))); if (returnFailedResponses && t instanceof HttpResponseException && ((HttpResponseException) t).metaData() instanceof StreamingHttpResponse) { + StreamingHttpResponse response = (StreamingHttpResponse) ((HttpResponseException) t).metaData(); // If we succeed, we need to drain the response body before we continue. If we fail we want to // surface the original exception and don't worry about draining since it will be returned to // the user. result = result.onErrorMap(backoffError -> ThrowableUtils.addSuppressed(t, backoffError)) - .concat(drain((StreamingHttpResponse) ((HttpResponseException) t).metaData())); + // If we get cancelled we also need to drain the message body as there is no guarantee + // we'll ever receive a completion event, error or success. + .beforeCancel(() -> drain(response).subscribe()) + .concat(drain(response)); } return result; } From 0a47202031318c96557c8bddf206b3849e9d7cec Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Mon, 2 Dec 2024 12:47:36 -0700 Subject: [PATCH 08/18] Feedback --- .../netty/RetryingHttpRequesterFilter.java | 57 +++++++++++++------ .../RetryingHttpRequesterFilterTest.java | 3 +- 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 14ed12ab9d..23cf5921ed 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -48,6 +48,9 @@ import io.servicetalk.transport.api.RetryableException; import io.servicetalk.utils.internal.ThrowableUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; @@ -90,6 +93,8 @@ */ public final class RetryingHttpRequesterFilter implements StreamingHttpClientFilterFactory, ExecutionStrategyInfluencer { + + private static final Logger LOGGER = LoggerFactory.getLogger(RetryingHttpRequesterFilter.class); static final int DEFAULT_MAX_TOTAL_RETRIES = 4; private static final RetryingHttpRequesterFilter DISABLE_AUTO_RETRIES = new RetryingHttpRequesterFilter(true, false, false, false, 1, null, @@ -215,17 +220,25 @@ public Completable apply(final int count, final Throwable t) { Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) { Completable result = (retryCallbacks == null ? completable : completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t))); - if (returnFailedResponses && t instanceof HttpResponseException && - ((HttpResponseException) t).metaData() instanceof StreamingHttpResponse) { - StreamingHttpResponse response = (StreamingHttpResponse) ((HttpResponseException) t).metaData(); - // If we succeed, we need to drain the response body before we continue. If we fail we want to - // surface the original exception and don't worry about draining since it will be returned to - // the user. - result = result.onErrorMap(backoffError -> ThrowableUtils.addSuppressed(t, backoffError)) - // If we get cancelled we also need to drain the message body as there is no guarantee - // we'll ever receive a completion event, error or success. - .beforeCancel(() -> drain(response).subscribe()) - .concat(drain(response)); + if (returnFailedResponses) { + if (t instanceof HttpResponseException && + ((HttpResponseException) t).metaData() instanceof StreamingHttpResponse) { + StreamingHttpResponse response = (StreamingHttpResponse) ((HttpResponseException) t).metaData(); + // If we succeed, we need to drain the response body before we continue. If we fail we want to + // surface the original exception and don't worry about draining since it will be returned to + // the user. + result = result.onErrorMap(backoffError -> ThrowableUtils.addSuppressed(t, backoffError)) + // If we get cancelled we also need to drain the message body as there is no guarantee + // we'll ever receive a completion event, error or success. + .beforeCancel(() -> drain(response).subscribe()) + .concat(drain(response)); + } else if (LOGGER.isDebugEnabled()) { + Class exceptionClass = t.getClass(); + Class metadataClass = t instanceof HttpResponseException ? + ((HttpResponseException) t).metaData().getClass() : null; + LOGGER.debug("Couldn't unpack response due to unexpected dynamic types. " + + "Exception class: {}, metadataClass: {}", exceptionClass, metadataClass); + } } return result; } @@ -774,11 +787,6 @@ public static final class Builder { @Nullable private RetryCallbacks onRequestRetry; - public Builder returnFailedResponses(final boolean returnFailedResponses) { - this.returnFailedResponses = returnFailedResponses; - return this; - } - /** * By default, automatic retries wait for the associated {@link LoadBalancer} to be * {@link LoadBalancerReadyEvent ready} before triggering a retry for requests. This behavior may add latency to @@ -835,6 +843,23 @@ public Builder maxTotalRetries(final int maxRetries) { * @return {@code this} */ public Builder responseMapper(final Function mapper) { + return responseMapper(mapper, false); + } + + /** + * Selectively map a {@link HttpResponseMetaData response} to an {@link HttpResponseException} that can match a + * retry behaviour through {@link #retryResponses(BiFunction)}. + * + * @param mapper a {@link Function} that maps a {@link HttpResponseMetaData} to an + * {@link HttpResponseException} or returns {@code null} if there is no mapping for response meta-data. The + * mapper should return {@code null} if no retry is needed or if it cannot be determined that a retry is needed. + * @param returnFailedResponses whether to unwrap the response defined by the {@link HttpResponseException} + * meta-data in the case that the response is not retried. + * @return {@code this} + */ + public Builder responseMapper(final Function mapper, + boolean returnFailedResponses) { + this.returnFailedResponses = returnFailedResponses; this.responseMapper = requireNonNull(mapper); return this; } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java index 63e4f64efc..d2894f92e0 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java @@ -261,10 +261,9 @@ void testResponseMapper(final boolean returnFailedResponses) throws Exception { final String retryMessage = "Retryable header"; normalClient = normalClientBuilder .appendClientFilter(new Builder() - .returnFailedResponses(returnFailedResponses) .maxTotalRetries(maxTotalRetries) .responseMapper(metaData -> metaData.headers().contains(RETRYABLE_HEADER) ? - new HttpResponseException(retryMessage, metaData) : null) + new HttpResponseException(retryMessage, metaData) : null, returnFailedResponses) // Disable request retrying .retryRetryableExceptions((requestMetaData, e) -> ofNoRetries()) // Retry only responses marked so From ea561aa40bf466459dccf29620e089caab2c0d4e Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Mon, 2 Dec 2024 15:00:52 -0700 Subject: [PATCH 09/18] Even better log messages --- .../http/netty/RetryingHttpRequesterFilter.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 23cf5921ed..1e11c233f1 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -95,6 +95,7 @@ public final class RetryingHttpRequesterFilter implements StreamingHttpClientFilterFactory, ExecutionStrategyInfluencer { private static final Logger LOGGER = LoggerFactory.getLogger(RetryingHttpRequesterFilter.class); + static final int DEFAULT_MAX_TOTAL_RETRIES = 4; private static final RetryingHttpRequesterFilter DISABLE_AUTO_RETRIES = new RetryingHttpRequesterFilter(true, false, false, false, 1, null, @@ -233,11 +234,14 @@ Completable applyRetryCallbacks(final Completable completable, final int retryCo .beforeCancel(() -> drain(response).subscribe()) .concat(drain(response)); } else if (LOGGER.isDebugEnabled()) { - Class exceptionClass = t.getClass(); - Class metadataClass = t instanceof HttpResponseException ? - ((HttpResponseException) t).metaData().getClass() : null; - LOGGER.debug("Couldn't unpack response due to unexpected dynamic types. " + - "Exception class: {}, metadataClass: {}", exceptionClass, metadataClass); + if (!(t instanceof HttpResponseException)) { + LOGGER.debug("Couldn't unpack response due to unexpected dynamic types. Required " + + "exception of type HttpResponseException, found {}", t.getClass()); + } else { + LOGGER.debug("Couldn't unpack response due to unexpected dynamic types. Required " + + "meta-data of type StreamingHttpResponse, found {}", + ((HttpResponseException) t).metaData().getClass()); + } } } return result; From 428a2d12c8a5987fd24fc9cd051d1c82bf4c7bb2 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 20 Dec 2024 12:12:51 -0700 Subject: [PATCH 10/18] Michaels feedback --- .../servicetalk/http/netty/RetryingHttpRequesterFilter.java | 6 ++++-- .../http/netty/RetryingHttpRequesterFilterTest.java | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 1e11c233f1..350dc34b4b 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -844,6 +844,8 @@ public Builder maxTotalRetries(final int maxRetries) { * @param mapper a {@link Function} that maps a {@link HttpResponseMetaData} to an * {@link HttpResponseException} or returns {@code null} if there is no mapping for response meta-data. The * mapper should return {@code null} if no retry is needed or if it cannot be determined that a retry is needed. + * In the case that the request cannot be retried, the {@link HttpResponseException} will be returned via the + * error pathway. * @return {@code this} */ public Builder responseMapper(final Function mapper) { @@ -858,11 +860,11 @@ public Builder responseMapper(final Function mapper, - boolean returnFailedResponses) { + final boolean returnFailedResponses) { this.returnFailedResponses = returnFailedResponses; this.responseMapper = requireNonNull(mapper); return this; diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java index d2894f92e0..1e07a3209d 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java @@ -251,7 +251,7 @@ private void assertRequestRetryingPred(final BlockingHttpClient client) { assertThat("Unexpected calls to select.", (double) lbSelectInvoked.get(), closeTo(5.0, 1.0)); } - @ParameterizedTest + @ParameterizedTest(name = "{displayName} [{index}]: returnFailedResponses={0}") @ValueSource(booleans = {true, false}) void testResponseMapper(final boolean returnFailedResponses) throws Exception { AtomicInteger newConnectionCreated = new AtomicInteger(); From f005576b6e4b412aedd10fcc181f71486c788df1 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 3 Jan 2025 12:58:53 -0700 Subject: [PATCH 11/18] feedback --- .../netty/RetryingHttpRequesterFilter.java | 69 +++++++++++-------- .../RetryingHttpRequesterFilterTest.java | 24 +++---- 2 files changed, 50 insertions(+), 43 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 350dc34b4b..9a08885b98 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -107,7 +107,7 @@ public final class RetryingHttpRequesterFilter private final boolean waitForLb; private final boolean ignoreSdErrors; private final boolean mayReplayRequestPayload; - private final boolean returnFailedResponses; + private final boolean returnOriginalResponses; private final int maxTotalRetries; @Nullable private final Function responseMapper; @@ -117,14 +117,14 @@ public final class RetryingHttpRequesterFilter RetryingHttpRequesterFilter( final boolean waitForLb, final boolean ignoreSdErrors, final boolean mayReplayRequestPayload, - final boolean returnFailedResponses, final int maxTotalRetries, + final boolean returnOriginalResponses, final int maxTotalRetries, @Nullable final Function responseMapper, final BiFunction retryFor, @Nullable final RetryCallbacks onRequestRetry) { this.waitForLb = waitForLb; this.ignoreSdErrors = ignoreSdErrors; this.mayReplayRequestPayload = mayReplayRequestPayload; - this.returnFailedResponses = returnFailedResponses; + this.returnOriginalResponses = returnOriginalResponses; this.maxTotalRetries = maxTotalRetries; this.responseMapper = responseMapper; this.retryFor = retryFor; @@ -221,7 +221,7 @@ public Completable apply(final int count, final Throwable t) { Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) { Completable result = (retryCallbacks == null ? completable : completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t))); - if (returnFailedResponses) { + if (returnOriginalResponses) { if (t instanceof HttpResponseException && ((HttpResponseException) t).metaData() instanceof StreamingHttpResponse) { StreamingHttpResponse response = (StreamingHttpResponse) ((HttpResponseException) t).metaData(); @@ -233,12 +233,12 @@ Completable applyRetryCallbacks(final Completable completable, final int retryCo // we'll ever receive a completion event, error or success. .beforeCancel(() -> drain(response).subscribe()) .concat(drain(response)); - } else if (LOGGER.isDebugEnabled()) { + } else { if (!(t instanceof HttpResponseException)) { LOGGER.debug("Couldn't unpack response due to unexpected dynamic types. Required " + "exception of type HttpResponseException, found {}", t.getClass()); } else { - LOGGER.debug("Couldn't unpack response due to unexpected dynamic types. Required " + + LOGGER.info("Couldn't unpack response due to unexpected dynamic types. Required " + "meta-data of type StreamingHttpResponse, found {}", ((HttpResponseException) t).metaData().getClass()); } @@ -290,13 +290,18 @@ protected Single request(final StreamingHttpRequester del if (responseMapper != null) { single = single.flatMap(resp -> { - final HttpResponseException exception = responseMapper.apply(resp); + HttpResponseException exception = null; + try { + exception = responseMapper.apply(resp); + } catch (Throwable t) { + LOGGER.warn("Failed to map the response. Proceeding with original response.", t); + } Single response; if (exception == null) { response = Single.succeeded(resp); } else { response = Single.failed(exception); - if (!returnFailedResponses) { + if (!returnOriginalResponses) { response = drain(resp).concat(response); } } @@ -308,7 +313,7 @@ protected Single request(final StreamingHttpRequester del // 2. Publisher state is restored to original state for each retry // duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2). single = single.retryWhen(retryStrategy(request, executionContext(), true)); - if (returnFailedResponses) { + if (returnOriginalResponses) { single = single.onErrorResume(HttpResponseException.class, t -> { HttpResponseMetaData metaData = t.metaData(); return (metaData instanceof StreamingHttpResponse ? @@ -764,7 +769,7 @@ public static final class Builder { private int maxTotalRetries = DEFAULT_MAX_TOTAL_RETRIES; private boolean retryExpectationFailed; - private boolean returnFailedResponses; + private boolean returnOriginalResponses; private BiFunction retryRetryableExceptions = (requestMetaData, e) -> BackOffPolicy.ofImmediateBounded(); @@ -842,30 +847,12 @@ public Builder maxTotalRetries(final int maxRetries) { * retry behaviour through {@link #retryResponses(BiFunction)}. * * @param mapper a {@link Function} that maps a {@link HttpResponseMetaData} to an - * {@link HttpResponseException} or returns {@code null} if there is no mapping for response meta-data. The - * mapper should return {@code null} if no retry is needed or if it cannot be determined that a retry is needed. + * {@link HttpResponseException} or returns {@code null} if there is no mapping for response meta-data. * In the case that the request cannot be retried, the {@link HttpResponseException} will be returned via the * error pathway. * @return {@code this} */ public Builder responseMapper(final Function mapper) { - return responseMapper(mapper, false); - } - - /** - * Selectively map a {@link HttpResponseMetaData response} to an {@link HttpResponseException} that can match a - * retry behaviour through {@link #retryResponses(BiFunction)}. - * - * @param mapper a {@link Function} that maps a {@link HttpResponseMetaData} to an - * {@link HttpResponseException} or returns {@code null} if there is no mapping for response meta-data. The - * mapper should return {@code null} if no retry is needed or if it cannot be determined that a retry is needed. - * @param returnFailedResponses whether to unwrap the response defined by the {@link HttpResponseException} - * meta-data in the case that the request is not retried. - * @return {@code this} - */ - public Builder responseMapper(final Function mapper, - final boolean returnFailedResponses) { - this.returnFailedResponses = returnFailedResponses; this.responseMapper = requireNonNull(mapper); return this; } @@ -987,7 +974,29 @@ public Builder retryDelayedRetries(// FIXME: 0.43 - remove deprecated method */ public Builder retryResponses( final BiFunction mapper) { + return retryResponses(mapper, false); + } + + /** + * The retrying-filter will evaluate {@link HttpResponseException} that resulted from the + * {@link #responseMapper(Function)}, and support different retry behaviour according to the + * {@link HttpRequestMetaData request} and the {@link HttpResponseMetaData response}. + *

+ * To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}. + *

+ * It's important that this {@link Function} doesn't block to avoid performance impacts. + * + * @param mapper The mapper to map the {@link HttpRequestMetaData} and the + * {@link DelayedRetry delayed-exception} to a {@link BackOffPolicy}. + * @param returnOriginalResponses whether to unwrap the response defined by the {@link HttpResponseException} + * meta-data in the case that the request is not retried. + * @return {@code this}. + */ + public Builder retryResponses( + final BiFunction mapper, + final boolean returnOriginalResponses) { this.retryResponses = requireNonNull(mapper); + this.returnOriginalResponses = returnOriginalResponses; return this; } @@ -1119,7 +1128,7 @@ public RetryingHttpRequesterFilter build() { return NO_RETRIES; }; return new RetryingHttpRequesterFilter(waitForLb, ignoreSdErrors, mayReplayRequestPayload, - returnFailedResponses, maxTotalRetries, responseMapper, allPredicate, onRequestRetry); + returnOriginalResponses, maxTotalRetries, responseMapper, allPredicate, onRequestRetry); } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java index 1e07a3209d..435d05fe40 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java @@ -65,6 +65,7 @@ import org.junit.jupiter.params.provider.ValueSource; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; @@ -104,6 +105,7 @@ class RetryingHttpRequesterFilterTest { private static final String RETRYABLE_HEADER = "RETRYABLE"; + private static final String RESPONSE_BODY = "ok"; private final ServerContext svcCtx; private final SingleAddressHttpClientBuilder normalClientBuilder; @@ -119,7 +121,8 @@ class RetryingHttpRequesterFilterTest { RetryingHttpRequesterFilterTest() throws Exception { svcCtx = forAddress(localAddress(0)) .listenBlockingAndAwait((ctx, request, responseFactory) -> responseFactory.ok() - .addHeader(RETRYABLE_HEADER, "yes")); + .addHeader(RETRYABLE_HEADER, "yes") + .payloadBody(ctx.executionContext().bufferAllocator().fromAscii(RESPONSE_BODY))); failingConnClientBuilder = forSingleAddress(serverHostAndPort(svcCtx)) .loadBalancerFactory(new DefaultHttpLoadBalancerFactory<>(new InspectingLoadBalancerFactory<>())) .appendConnectionFactoryFilter(ClosingConnectionFactory::new); @@ -251,9 +254,9 @@ private void assertRequestRetryingPred(final BlockingHttpClient client) { assertThat("Unexpected calls to select.", (double) lbSelectInvoked.get(), closeTo(5.0, 1.0)); } - @ParameterizedTest(name = "{displayName} [{index}]: returnFailedResponses={0}") + @ParameterizedTest(name = "{displayName} [{index}]: returnOriginalResponses={0}") @ValueSource(booleans = {true, false}) - void testResponseMapper(final boolean returnFailedResponses) throws Exception { + void testResponseMapper(final boolean returnOriginalResponses) throws Exception { AtomicInteger newConnectionCreated = new AtomicInteger(); AtomicInteger responseDrained = new AtomicInteger(); AtomicInteger onRequestRetryCounter = new AtomicInteger(); @@ -263,18 +266,12 @@ void testResponseMapper(final boolean returnFailedResponses) throws Exception { .appendClientFilter(new Builder() .maxTotalRetries(maxTotalRetries) .responseMapper(metaData -> metaData.headers().contains(RETRYABLE_HEADER) ? - new HttpResponseException(retryMessage, metaData) : null, returnFailedResponses) + new HttpResponseException(retryMessage, metaData) : null) // Disable request retrying .retryRetryableExceptions((requestMetaData, e) -> ofNoRetries()) // Retry only responses marked so - .retryResponses((requestMetaData, throwable) -> { - if (throwable instanceof HttpResponseException && - retryMessage.equals(throwable.getMessage())) { - return ofImmediate(maxTotalRetries - 1); - } else { - throw new RuntimeException("Unexpected exception"); - } - }) + .retryResponses((requestMetaData, throwable) -> ofImmediate(maxTotalRetries - 1), + returnOriginalResponses) .onRequestRetry((count, req, t) -> assertThat(onRequestRetryCounter.incrementAndGet(), is(count))) .build()) @@ -290,9 +287,10 @@ public Single request(final StreamingHttpRequest request) }; }) .buildBlocking(); - if (returnFailedResponses) { + if (returnOriginalResponses) { HttpResponse response = normalClient.request(normalClient.get("/")); assertThat(response.status(), is(HttpResponseStatus.OK)); + assertThat(response.payloadBody().toString(StandardCharsets.US_ASCII), equalTo(RESPONSE_BODY)); } else { HttpResponseException e = assertThrows(HttpResponseException.class, () -> normalClient.request(normalClient.get("/"))); From a628a89eaa152c2df733802b67be4aee40d77aa8 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 3 Jan 2025 16:30:21 -0700 Subject: [PATCH 12/18] Test for leaks --- .../netty/RetryingHttpRequesterFilter.java | 44 ++++++---- .../RetryingHttpRequesterFilterTest.java | 83 +++++++++++++++++++ 2 files changed, 111 insertions(+), 16 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 9a08885b98..182b65fa48 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -203,7 +203,13 @@ public Completable apply(final int count, final Throwable t) { sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t); } - final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t); + final BackOffPolicy backOffPolicy; + try { + backOffPolicy = retryFor.apply(requestMetaData, t); + } catch (Throwable tt) { + LOGGER.warn("Unexpected exception when computing backoff policy.", tt); + return failed(ThrowableUtils.addSuppressed(t, tt)); + } if (backOffPolicy != NO_RETRIES) { final int offsetCount = count - lbNotReadyCount; Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t); @@ -222,9 +228,8 @@ Completable applyRetryCallbacks(final Completable completable, final int retryCo Completable result = (retryCallbacks == null ? completable : completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t))); if (returnOriginalResponses) { - if (t instanceof HttpResponseException && - ((HttpResponseException) t).metaData() instanceof StreamingHttpResponse) { - StreamingHttpResponse response = (StreamingHttpResponse) ((HttpResponseException) t).metaData(); + final StreamingHttpResponse response = extractStreamingResponse(t); + if (response != null) { // If we succeed, we need to drain the response body before we continue. If we fail we want to // surface the original exception and don't worry about draining since it will be returned to // the user. @@ -233,15 +238,6 @@ Completable applyRetryCallbacks(final Completable completable, final int retryCo // we'll ever receive a completion event, error or success. .beforeCancel(() -> drain(response).subscribe()) .concat(drain(response)); - } else { - if (!(t instanceof HttpResponseException)) { - LOGGER.debug("Couldn't unpack response due to unexpected dynamic types. Required " + - "exception of type HttpResponseException, found {}", t.getClass()); - } else { - LOGGER.info("Couldn't unpack response due to unexpected dynamic types. Required " + - "meta-data of type StreamingHttpResponse, found {}", - ((HttpResponseException) t).metaData().getClass()); - } } } return result; @@ -290,11 +286,12 @@ protected Single request(final StreamingHttpRequester del if (responseMapper != null) { single = single.flatMap(resp -> { - HttpResponseException exception = null; + final HttpResponseException exception; try { exception = responseMapper.apply(resp); } catch (Throwable t) { - LOGGER.warn("Failed to map the response. Proceeding with original response.", t); + LOGGER.warn("Failed to map the response.", t); + return drain(resp).toSingle().flatMap(ignored -> Single.failed(t)); } Single response; if (exception == null) { @@ -1115,7 +1112,7 @@ public RetryingHttpRequesterFilter build() { if (retryResponses != null && throwable instanceof HttpResponseException) { final BackOffPolicy backOffPolicy = - retryResponses.apply(requestMetaData, (HttpResponseException) throwable); + retryResponses.apply(requestMetaData, (HttpResponseException) throwable); if (backOffPolicy != NO_RETRIES) { return backOffPolicy; } @@ -1135,4 +1132,19 @@ public RetryingHttpRequesterFilter build() { private static Completable drain(StreamingHttpResponse response) { return response.payloadBody().ignoreElements().onErrorComplete(); } + + @Nullable + private static StreamingHttpResponse extractStreamingResponse(Throwable t) { + if (t instanceof HttpResponseException) { + HttpResponseException responseException = (HttpResponseException) t; + if (responseException.metaData() instanceof StreamingHttpResponse) { + return (StreamingHttpResponse) responseException.metaData(); + } else { + LOGGER.info("Couldn't unpack response due to unexpected dynamic types. Required " + + "meta-data of type StreamingHttpResponse, found {}", + responseException.metaData().getClass()); + } + } + return null; + } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java index 435d05fe40..8caf1b41a3 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java @@ -62,6 +62,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.net.InetSocketAddress; @@ -69,6 +71,7 @@ import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; +import java.util.stream.Stream; import javax.annotation.Nullable; import static io.servicetalk.concurrent.api.Single.defer; @@ -307,6 +310,86 @@ public Single request(final StreamingHttpRequest request) assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1)); } + private enum LambdaException { + RESPONSE_MAPPER, + RETRY_RETRYABLE_EXCEPTIONS, + RETRY_RESPONSES, + ON_REQUEST_RETRY + } + + private static Stream lambdaExceptions() { + return Stream.of(true, false).flatMap(returnOriginalResponses -> + Stream.of(LambdaException.values()) + .map(lambda -> Arguments.of(returnOriginalResponses, lambda))); + } + + @ParameterizedTest(name = "{displayName} [{index}]: returnOriginalResponses={0}, thrower={1}") + @MethodSource("lambdaExceptions") + void lambdaExceptions(final boolean returnOriginalResponses, final LambdaException thrower) throws Exception { + + final AtomicInteger newConnectionCreated = new AtomicInteger(); + final AtomicInteger responsesReceived = new AtomicInteger(); + final AtomicInteger responseDrained = new AtomicInteger(); + final AtomicInteger onRequestRetryCounter = new AtomicInteger(); + final String retryMessage = "Retryable header"; + normalClient = normalClientBuilder + .appendClientFilter(new Builder() + .maxTotalRetries(4) + .responseMapper(metaData -> { + if (thrower == LambdaException.RESPONSE_MAPPER) { + throw new RuntimeException("responseMapper"); + } + return metaData.headers().contains(RETRYABLE_HEADER) ? + new HttpResponseException(retryMessage, metaData) : null; + }) + // Disable request retrying + .retryRetryableExceptions((requestMetaData, e) -> { + if (thrower == LambdaException.RETRY_RETRYABLE_EXCEPTIONS) { + throw new RuntimeException("retryRetryableExceptions"); + } + return ofNoRetries(); + }) + // Retry only responses marked so + .retryResponses((requestMetaData, throwable) -> { + if (thrower == LambdaException.RETRY_RESPONSES) { + throw new RuntimeException("retryResponses"); + } + return ofImmediate(3); + }, returnOriginalResponses) + .onRequestRetry((count, req, t) -> { + if (thrower == LambdaException.ON_REQUEST_RETRY) { + throw new RuntimeException("onRequestRetryThrows"); + } + assertThat(onRequestRetryCounter.incrementAndGet(), is(count)); + }) + .build()) + .appendConnectionFilter(c -> { + newConnectionCreated.incrementAndGet(); + return new StreamingHttpConnectionFilter(c) { + @Override + public Single request(final StreamingHttpRequest request) { + return delegate().request(request) + .map(response -> { + responsesReceived.incrementAndGet(); + return response.transformPayloadBody(payload -> payload + .whenFinally(responseDrained::incrementAndGet)); + }); + } + }; + }) + .buildBlocking(); + // TODO: we don't really want to accept different behavior but it seems like the retry operator will swallow + // exceptions that we attempt to bubble up through the retry-strategy failure channel. + if (returnOriginalResponses && thrower != LambdaException.RESPONSE_MAPPER) { + normalClient.request(normalClient.get("/")); + } else { + assertThrows(Exception.class, () -> normalClient.request(normalClient.get("/"))); + } + assertThat("Response payload body was not drained on every mapping", responseDrained.get(), + is(responsesReceived.get())); + assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1)); + } + @Test void singleInstanceFilter() { Assertions.assertThrows(IllegalStateException.class, () -> forResolvedAddress(localAddress(8888)) From cdb48b8230ed00b4b5828700113cb1677022aa15 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 3 Jan 2025 17:31:56 -0700 Subject: [PATCH 13/18] Always bubble up expcetions in lambdas --- .../netty/RetryingHttpRequesterFilter.java | 9 ++- .../RetryingHttpRequesterFilterTest.java | 56 ++++++------------- 2 files changed, 26 insertions(+), 39 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 182b65fa48..1de85c89d3 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -208,7 +208,14 @@ public Completable apply(final int count, final Throwable t) { backOffPolicy = retryFor.apply(requestMetaData, t); } catch (Throwable tt) { LOGGER.warn("Unexpected exception when computing backoff policy.", tt); - return failed(ThrowableUtils.addSuppressed(t, tt)); + Completable result = failed(ThrowableUtils.addSuppressed(tt, t)); + if (returnOriginalResponses) { + StreamingHttpResponse response = extractStreamingResponse(t); + if (response != null) { + result = drain(response).concat(result); + } + } + return result; } if (backOffPolicy != NO_RETRIES) { final int offsetCount = count - lbNotReadyCount; diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java index 8caf1b41a3..4e270d0391 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java @@ -310,25 +310,22 @@ public Single request(final StreamingHttpRequest request) assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1)); } - private enum LambdaException { + private enum ExceptionSource { RESPONSE_MAPPER, - RETRY_RETRYABLE_EXCEPTIONS, - RETRY_RESPONSES, - ON_REQUEST_RETRY + RETRY_RESPONSES } private static Stream lambdaExceptions() { return Stream.of(true, false).flatMap(returnOriginalResponses -> - Stream.of(LambdaException.values()) + Stream.of(ExceptionSource.values()) .map(lambda -> Arguments.of(returnOriginalResponses, lambda))); } @ParameterizedTest(name = "{displayName} [{index}]: returnOriginalResponses={0}, thrower={1}") @MethodSource("lambdaExceptions") - void lambdaExceptions(final boolean returnOriginalResponses, final LambdaException thrower) throws Exception { - + void lambdaExceptions(final boolean returnOriginalResponses, final ExceptionSource thrower) { final AtomicInteger newConnectionCreated = new AtomicInteger(); - final AtomicInteger responsesReceived = new AtomicInteger(); + final AtomicInteger requestsInitiated = new AtomicInteger(); final AtomicInteger responseDrained = new AtomicInteger(); final AtomicInteger onRequestRetryCounter = new AtomicInteger(); final String retryMessage = "Retryable header"; @@ -336,57 +333,40 @@ void lambdaExceptions(final boolean returnOriginalResponses, final LambdaExcepti .appendClientFilter(new Builder() .maxTotalRetries(4) .responseMapper(metaData -> { - if (thrower == LambdaException.RESPONSE_MAPPER) { + if (thrower == ExceptionSource.RESPONSE_MAPPER) { throw new RuntimeException("responseMapper"); } return metaData.headers().contains(RETRYABLE_HEADER) ? new HttpResponseException(retryMessage, metaData) : null; }) - // Disable request retrying - .retryRetryableExceptions((requestMetaData, e) -> { - if (thrower == LambdaException.RETRY_RETRYABLE_EXCEPTIONS) { - throw new RuntimeException("retryRetryableExceptions"); - } - return ofNoRetries(); - }) // Retry only responses marked so .retryResponses((requestMetaData, throwable) -> { - if (thrower == LambdaException.RETRY_RESPONSES) { + if (thrower == ExceptionSource.RETRY_RESPONSES) { throw new RuntimeException("retryResponses"); } return ofImmediate(3); }, returnOriginalResponses) - .onRequestRetry((count, req, t) -> { - if (thrower == LambdaException.ON_REQUEST_RETRY) { - throw new RuntimeException("onRequestRetryThrows"); - } - assertThat(onRequestRetryCounter.incrementAndGet(), is(count)); - }) + .onRequestRetry((count, req, t) -> + assertThat(onRequestRetryCounter.incrementAndGet(), is(count))) .build()) .appendConnectionFilter(c -> { newConnectionCreated.incrementAndGet(); return new StreamingHttpConnectionFilter(c) { @Override public Single request(final StreamingHttpRequest request) { - return delegate().request(request) - .map(response -> { - responsesReceived.incrementAndGet(); - return response.transformPayloadBody(payload -> payload - .whenFinally(responseDrained::incrementAndGet)); - }); + return Single.defer(() -> { + requestsInitiated.incrementAndGet(); + return delegate().request(request) + .map(response -> response.transformPayloadBody(payload -> payload + .whenFinally(responseDrained::incrementAndGet))); + }); } }; }) .buildBlocking(); - // TODO: we don't really want to accept different behavior but it seems like the retry operator will swallow - // exceptions that we attempt to bubble up through the retry-strategy failure channel. - if (returnOriginalResponses && thrower != LambdaException.RESPONSE_MAPPER) { - normalClient.request(normalClient.get("/")); - } else { - assertThrows(Exception.class, () -> normalClient.request(normalClient.get("/"))); - } - assertThat("Response payload body was not drained on every mapping", responseDrained.get(), - is(responsesReceived.get())); + assertThrows(Exception.class, () -> normalClient.request(normalClient.get("/"))); + assertThat("Response payload body was not drained on every mapping", responseDrained.get(), is(1)); + assertThat("Multiple requests initiated", requestsInitiated.get(), is(1)); assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1)); } From 7a5133867755646d045672799b1de1a1cc979dea Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 3 Jan 2025 17:38:44 -0700 Subject: [PATCH 14/18] Enhance code comment --- .../io/servicetalk/http/netty/RetryingHttpRequesterFilter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 1de85c89d3..e8be30c45f 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -242,7 +242,8 @@ Completable applyRetryCallbacks(final Completable completable, final int retryCo // the user. result = result.onErrorMap(backoffError -> ThrowableUtils.addSuppressed(t, backoffError)) // If we get cancelled we also need to drain the message body as there is no guarantee - // we'll ever receive a completion event, error or success. + // we'll ever receive a completion event, error or success. This is okay to do since + // the subscriber has signaled they're no longer interested in the response. .beforeCancel(() -> drain(response).subscribe()) .concat(drain(response)); } From d5b8c3dd0f7d296331ef34aa2cb47da681b88c74 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 3 Jan 2025 17:40:31 -0700 Subject: [PATCH 15/18] log unexpected types at level warn --- .../io/servicetalk/http/netty/RetryingHttpRequesterFilter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index e8be30c45f..b245495137 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -1148,7 +1148,7 @@ private static StreamingHttpResponse extractStreamingResponse(Throwable t) { if (responseException.metaData() instanceof StreamingHttpResponse) { return (StreamingHttpResponse) responseException.metaData(); } else { - LOGGER.info("Couldn't unpack response due to unexpected dynamic types. Required " + + LOGGER.warn("Couldn't unpack response due to unexpected dynamic types. Required " + "meta-data of type StreamingHttpResponse, found {}", responseException.metaData().getClass()); } From b2f940d8c7c3e71ce7baeab5cfdb38e28efff869 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 3 Jan 2025 19:49:18 -0700 Subject: [PATCH 16/18] Another better code comment --- .../servicetalk/http/netty/RetryingHttpRequesterFilter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index b245495137..98a614d699 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -237,9 +237,9 @@ Completable applyRetryCallbacks(final Completable completable, final int retryCo if (returnOriginalResponses) { final StreamingHttpResponse response = extractStreamingResponse(t); if (response != null) { - // If we succeed, we need to drain the response body before we continue. If we fail we want to - // surface the original exception and don't worry about draining since it will be returned to - // the user. + // If we succeed, we need to drain the response body before we continue. The retry completable + // fails we want to surface the original exception and don't worry about draining since the + // response will be returned to the user. result = result.onErrorMap(backoffError -> ThrowableUtils.addSuppressed(t, backoffError)) // If we get cancelled we also need to drain the message body as there is no guarantee // we'll ever receive a completion event, error or success. This is okay to do since From 0a7172e19e7b17b71e7a3cfe233e9757f75977c8 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 8 Jan 2025 09:33:33 -0700 Subject: [PATCH 17/18] More feedback --- .../netty/RetryingHttpRequesterFilter.java | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 98a614d699..ccb7d08742 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -203,11 +203,22 @@ public Completable apply(final int count, final Throwable t) { sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t); } - final BackOffPolicy backOffPolicy; try { - backOffPolicy = retryFor.apply(requestMetaData, t); + BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t); + if (backOffPolicy != NO_RETRIES) { + final int offsetCount = count - lbNotReadyCount; + Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t); + if (t instanceof DelayedRetryException) { + final Duration constant = ((DelayedRetryException) t).delay(); + retryWhen = retryWhen.concat(executor.timer(constant)); + } + + return applyRetryCallbacks(retryWhen, count, t); + } } catch (Throwable tt) { - LOGGER.warn("Unexpected exception when computing backoff policy.", tt); + LOGGER.error("Unexpected exception when computing and applying backoff policy for {}({}). " + + "User-defined functions should not throw.", + RetryingHttpRequesterFilter.class.getName(), t.getMessage(), tt); Completable result = failed(ThrowableUtils.addSuppressed(tt, t)); if (returnOriginalResponses) { StreamingHttpResponse response = extractStreamingResponse(t); @@ -217,16 +228,6 @@ public Completable apply(final int count, final Throwable t) { } return result; } - if (backOffPolicy != NO_RETRIES) { - final int offsetCount = count - lbNotReadyCount; - Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t); - if (t instanceof DelayedRetryException) { - final Duration constant = ((DelayedRetryException) t).delay(); - retryWhen = retryWhen.concat(executor.timer(constant)); - } - - return applyRetryCallbacks(retryWhen, count, t); - } return failed(t); } @@ -298,8 +299,9 @@ protected Single request(final StreamingHttpRequester del try { exception = responseMapper.apply(resp); } catch (Throwable t) { - LOGGER.warn("Failed to map the response.", t); - return drain(resp).toSingle().flatMap(ignored -> Single.failed(t)); + LOGGER.error("Unexpected exception when mapping response ({}) to an exception. User-defined " + + "functions should not throw.", resp.status(), t); + return drain(resp).concat(Single.failed(t)); } Single response; if (exception == null) { From 74225d2f9127f536693094a697a5daf586b83f65 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 8 Jan 2025 09:46:13 -0700 Subject: [PATCH 18/18] Clarify javadoc comments --- .../netty/RetryingHttpRequesterFilter.java | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index ccb7d08742..3a82dcb06f 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -857,6 +857,10 @@ public Builder maxTotalRetries(final int maxRetries) { * {@link HttpResponseException} or returns {@code null} if there is no mapping for response meta-data. * In the case that the request cannot be retried, the {@link HttpResponseException} will be returned via the * error pathway. + *

+ * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link Function} doesn't throw exceptions. + * * @return {@code this} */ public Builder responseMapper(final Function mapper) { @@ -869,7 +873,8 @@ public Builder responseMapper(final Function * To disable retries you can return {@link BackOffPolicy#ofNoRetries()} from the {@code mapper}. *

- * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. * * @param mapper The mapper to map the {@link HttpRequestMetaData} and the * {@link RetryableException} to a {@link BackOffPolicy}. @@ -892,7 +897,8 @@ public Builder retryRetryableExceptions( *

* To disable retries you can return {@link BackOffPolicy#ofNoRetries()} from the {@code mapper}. *

- * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. * * @param mapper The mapper to map the {@link HttpRequestMetaData} and the * {@link IOException} to a {@link BackOffPolicy}. @@ -930,7 +936,8 @@ public Builder retryExpectationFailed(boolean retryExpectationFailed) { * To disable retries and proceed evaluating other retry functions you can return, * {@link BackOffPolicy#ofNoRetries()} from the passed {@code mapper}. *

- * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. * * @param mapper The mapper to map the {@link HttpRequestMetaData} and the * {@link DelayedRetryException delayed-exception} to a {@link BackOffPolicy}. @@ -952,7 +959,8 @@ public Builder retryDelayedRetryExceptions( *

* To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}. *

- * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. * * @param mapper The mapper to map the {@link HttpRequestMetaData} and the * {@link DelayedRetry delayed-exception} to a {@link BackOffPolicy}. @@ -973,7 +981,8 @@ public Builder retryDelayedRetries(// FIXME: 0.43 - remove deprecated method *

* To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}. *

- * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. * * @param mapper The mapper to map the {@link HttpRequestMetaData} and the * {@link DelayedRetry delayed-exception} to a {@link BackOffPolicy}. @@ -991,7 +1000,8 @@ public Builder retryResponses( *

* To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}. *

- * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. * * @param mapper The mapper to map the {@link HttpRequestMetaData} and the * {@link DelayedRetry delayed-exception} to a {@link BackOffPolicy}. @@ -1013,7 +1023,8 @@ public Builder retryResponses( *

* To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}. *

- * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. * * @param mapper {@link BiFunction} that checks whether a given combination of * {@link HttpRequestMetaData meta-data} and {@link Throwable cause} should be retried, producing a