Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http-netty: let RetryingHttpRequesterFilter return responses on failure #3048

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
515dd29
http-netty: let RetryingHttpRequesterFilter return responses on failure
bryce-anderson Aug 22, 2024
5b7f014
Add a test
bryce-anderson Aug 22, 2024
b08742d
make linters happy
bryce-anderson Aug 22, 2024
4b4b60c
Merge branch 'main' into bl_anderson/RetryingHttpRequesterCanReturnRe…
bryce-anderson Oct 8, 2024
3493dc4
Some of idels feedback
bryce-anderson Oct 8, 2024
e80e98e
Remove the second wrapper
bryce-anderson Oct 8, 2024
6ba45bc
Merge branch 'main' into bl_anderson/RetryingHttpRequesterCanReturnRe…
bryce-anderson Oct 11, 2024
6b45aa8
Merge branch 'main' into bl_anderson/RetryingHttpRequesterCanReturnRe…
bryce-anderson Oct 28, 2024
49d272d
Keep the response body
bryce-anderson Oct 28, 2024
f25458c
Merge branch 'main' into bl_anderson/RetryingHttpRequesterCanReturnRe…
bryce-anderson Oct 31, 2024
8d07708
Make sure we drain the response if the backoff Completable is cancelled
bryce-anderson Oct 31, 2024
3594f3a
Merge branch 'main' into bl_anderson/RetryingHttpRequesterCanReturnRe…
bryce-anderson Dec 2, 2024
0a47202
Feedback
bryce-anderson Dec 2, 2024
ea561aa
Even better log messages
bryce-anderson Dec 2, 2024
a4d023c
Merge remote-tracking branch 'origin/main' into bl_anderson/RetryingH…
bryce-anderson Dec 20, 2024
428a2d1
Michaels feedback
bryce-anderson Dec 20, 2024
f005576
feedback
bryce-anderson Jan 3, 2025
a628a89
Test for leaks
bryce-anderson Jan 3, 2025
cdb48b8
Always bubble up exceptions in lambdas
bryce-anderson Jan 4, 2025
7a51338
Enhance code comment
bryce-anderson Jan 4, 2025
d5b8c3d
log unexpected types at level warn
bryce-anderson Jan 4, 2025
b2f940d
Another better code comment
bryce-anderson Jan 4, 2025
0a7172e
More feedback
bryce-anderson Jan 8, 2025
74225d2
Clarify javadoc comments
bryce-anderson Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.servicetalk.client.api.LoadBalancerReadyEvent;
import io.servicetalk.client.api.NoAvailableHostException;
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.BiIntFunction;
import io.servicetalk.concurrent.api.Completable;
Expand Down Expand Up @@ -91,15 +93,16 @@ public final class RetryingHttpRequesterFilter
implements StreamingHttpClientFilterFactory, ExecutionStrategyInfluencer<HttpExecutionStrategy> {
static final int DEFAULT_MAX_TOTAL_RETRIES = 4;
private static final RetryingHttpRequesterFilter DISABLE_AUTO_RETRIES =
new RetryingHttpRequesterFilter(true, false, false, 1, null,
new RetryingHttpRequesterFilter(true, false, false, false, 1, null,
(__, ___) -> NO_RETRIES, null);
private static final RetryingHttpRequesterFilter DISABLE_ALL_RETRIES =
new RetryingHttpRequesterFilter(false, true, false, 0, null,
new RetryingHttpRequesterFilter(false, true, false, false, 0, null,
(__, ___) -> NO_RETRIES, null);

private final boolean waitForLb;
private final boolean ignoreSdErrors;
private final boolean mayReplayRequestPayload;
private final boolean returnFailedResponses;
private final int maxTotalRetries;
@Nullable
private final Function<HttpResponseMetaData, HttpResponseException> responseMapper;
Expand All @@ -109,13 +112,14 @@ public final class RetryingHttpRequesterFilter

RetryingHttpRequesterFilter(
final boolean waitForLb, final boolean ignoreSdErrors, final boolean mayReplayRequestPayload,
final int maxTotalRetries,
final boolean returnFailedResponses, final int maxTotalRetries,
@Nullable final Function<HttpResponseMetaData, HttpResponseException> responseMapper,
final BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy> retryFor,
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
@Nullable final RetryCallbacks onRequestRetry) {
this.waitForLb = waitForLb;
this.ignoreSdErrors = ignoreSdErrors;
this.mayReplayRequestPayload = mayReplayRequestPayload;
this.returnFailedResponses = returnFailedResponses;
this.maxTotalRetries = maxTotalRetries;
this.responseMapper = responseMapper;
this.retryFor = retryFor;
Expand Down Expand Up @@ -190,11 +194,13 @@ public Completable apply(final int count, final Throwable t) {
!(lbEvent instanceof LoadBalancerReadyEvent &&
((LoadBalancerReadyEvent) lbEvent).isReady()))
.ignoreElements();
return applyRetryCallbacks(
return applyRetryCallbacksAndDraining(
sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t);
}

final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t);
// Unwrap a WrappedResponseException before asking retryFor for a back-off policy.
final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData,
t instanceof WrappedResponseException ? ((WrappedResponseException) t).exception : t);
if (backOffPolicy != NO_RETRIES) {
final int offsetCount = count - lbNotReadyCount;
Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t);
Expand All @@ -203,15 +209,46 @@ public Completable apply(final int count, final Throwable t) {
retryWhen = retryWhen.concat(executor.timer(constant));
}

return applyRetryCallbacks(retryWhen, count, t);
return applyRetryCallbacksAndDraining(retryWhen, count, t);
}

return failed(t);
}

Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) {
return retryCallbacks == null ? completable :
completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t));
Completable applyRetryCallbacksAndDraining(final Completable completable, final int retryCount,
final Throwable t) {
if (retryCallbacks == null && !(t instanceof WrappedResponseException)) {
// No wrap necessary.
return completable;
}
return completable.liftSync(subscriber -> new CompletableSource.Subscriber() {
@Override
public void onSubscribe(Cancellable cancellable) {
subscriber.onSubscribe(cancellable);
}

@Override
public void onComplete() {
try {
if (retryCallbacks != null) {
retryCallbacks.beforeRetry(retryCount, requestMetaData, t);
}
if (t instanceof WrappedResponseException) {
drainResponse(((WrappedResponseException) t).response).subscribe();
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
}
} finally {
subscriber.onComplete();
}
}

@Override
public void onError(Throwable tt) {
// If we're retrying due to a wrapped response it's because the users want the actual response,
// not an exception. Therefore, we return the wrapped response and let it get unwrapped at the
// end of the retry pipeline.
subscriber.onError(t instanceof WrappedResponseException ? t : tt);
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
}
});
}
}

Expand Down Expand Up @@ -258,19 +295,31 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
if (responseMapper != null) {
single = single.flatMap(resp -> {
final HttpResponseException exception = responseMapper.apply(resp);
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
return (exception != null ?
// Drain response payload body before discarding it:
resp.payloadBody().ignoreElements().onErrorComplete()
.concat(Single.<StreamingHttpResponse>failed(exception)) :
Single.succeeded(resp))
.shareContextOnSubscribe();
if (exception == null) {
return Single.succeeded(resp).shareContextOnSubscribe();
}
if (returnFailedResponses) {
return Single.failed(new WrappedResponseException(resp, exception));
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
} else {
return drainResponse(resp).concat(Single.<StreamingHttpResponse>failed(exception));
}
});
}

// 1. Metadata is shared across retries
// 2. Publisher state is restored to original state for each retry
// duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2).
return single.retryWhen(retryStrategy(request, executionContext(), true));
single = single.retryWhen(retryStrategy(request, executionContext(), true));
if (returnFailedResponses) {
single = single.onErrorResume(t -> {
if (t instanceof WrappedResponseException) {
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
return Single.succeeded(((WrappedResponseException) t).response);
} else {
return Single.failed(t);
}
});
}
return single;
}
}

Expand Down Expand Up @@ -719,6 +768,7 @@ public static final class Builder {

private int maxTotalRetries = DEFAULT_MAX_TOTAL_RETRIES;
private boolean retryExpectationFailed;
private boolean returnFailedResponses;

private BiFunction<HttpRequestMetaData, RetryableException, BackOffPolicy>
retryRetryableExceptions = (requestMetaData, e) -> BackOffPolicy.ofImmediateBounded();
Expand All @@ -745,6 +795,11 @@ public static final class Builder {
@Nullable
private RetryCallbacks onRequestRetry;

/**
 * Configures whether responses that were mapped to an {@link HttpResponseException} by the configured
 * {@code responseMapper} are returned to the caller as plain responses once retries are exhausted,
 * instead of propagating the mapped exception.
 * <p>
 * NOTE(review): this option only has an effect when used together with {@code responseMapper} and
 * {@code retryResponses} — confirm whether the builder should reject configurations where this is
 * {@code true} but the other two are left unset.
 *
 * @param returnFailedResponses {@code true} to return the original (failed) response after retries are
 * exhausted; {@code false} (default) to fail with the mapped {@link HttpResponseException}.
 * @return {@code this} builder, for call chaining.
 */
public Builder returnFailedResponses(final boolean returnFailedResponses) {
    this.returnFailedResponses = returnFailedResponses;
    return this;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm certain this can have a better name and clearly it needs docs before merging. Name suggestions welcome.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think this API is a bit awkward: first you must turn a response into an HttpResponseException and then it's going to be discarded. Alternatively, we could just have a different lambda to the tune of Function<Boolean, HttpResponseMetadata> shouldRetry.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now we don't have RS operators to achieve retries without mapping into exceptions. If we go the route of clean retry of response meta-data without mapping to exceptions, it's possible but will take longer.

The current rationale was that some users always want to map responses to exceptions, which is why we have an independent responseMapper. Then some users may want to retry those, so there is a 2nd method for them, retryResponses. We decided to put them next to each other on the same builder instead of offering 2 different filters because they are often used together.

I agree that having a 3rd method that works only if the other 2 are also configured is not intuitive. Alternatively, we can consider adding a retryResponses overload that takes a boolean to decide whether it needs to unwrap the original response or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of the boolean overload, which would signal that it needs to be configured "together". Alternatively, when building, we should at least check whether this value is set to true while the others are in their default state, and reject such a config.


/**
* By default, automatic retries wait for the associated {@link LoadBalancer} to be
* {@link LoadBalancerReadyEvent ready} before triggering a retry for requests. This behavior may add latency to
Expand Down Expand Up @@ -1054,7 +1109,30 @@ public RetryingHttpRequesterFilter build() {
return NO_RETRIES;
};
return new RetryingHttpRequesterFilter(waitForLb, ignoreSdErrors, mayReplayRequestPayload,
maxTotalRetries, responseMapper, allPredicate, onRequestRetry);
returnFailedResponses, maxTotalRetries, responseMapper, allPredicate, onRequestRetry);
}
}

/**
 * Returns a {@link Completable} that, when subscribed, consumes and discards the payload body of the
 * given response so its underlying resources are released. Payload errors are suppressed via
 * {@code onErrorComplete()} because the response is being discarded anyway.
 */
private static Completable drainResponse(StreamingHttpResponse resp) {
    return resp.payloadBody().ignoreElements().onErrorComplete();
}

/**
 * Internal carrier exception pairing a failed {@link StreamingHttpResponse} with the
 * {@link HttpResponseException} it was mapped to by the {@code responseMapper}. It travels through the
 * retry pipeline so that, when {@code returnFailedResponses} is enabled and retries are exhausted, the
 * original response can be unwrapped and returned to the caller instead of an exception.
 */
private static final class WrappedResponseException extends Exception {

    private static final long serialVersionUID = 3905983622734400759L;

    // The original response that was mapped to an exception; kept so it can be returned or drained later.
    final StreamingHttpResponse response;
    // The exception produced by responseMapper; used when asking the retry policy for a back-off.
    final HttpResponseException exception;

    WrappedResponseException(final StreamingHttpResponse response, final HttpResponseException exception) {
        this.response = response;
        this.exception = exception;
    }

    @Override
    public synchronized Throwable fillInStackTrace() {
        // just a carrier, the stack traces are not important.
        return this;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,21 +251,31 @@ private void assertRequestRetryingPred(final BlockingHttpClient client) {
assertThat("Unexpected calls to select.", (double) lbSelectInvoked.get(), closeTo(5.0, 1.0));
}

@Test
void testResponseMapper() {
@ParameterizedTest
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
@ValueSource(booleans = {true, false})
void testResponseMapper(final boolean returnFailedResponses) throws Exception {
AtomicInteger newConnectionCreated = new AtomicInteger();
AtomicInteger responseDrained = new AtomicInteger();
AtomicInteger onRequestRetryCounter = new AtomicInteger();
final int maxTotalRetries = 4;
final String retryMessage = "Retryable header";
normalClient = normalClientBuilder
.appendClientFilter(new Builder()
.returnFailedResponses(returnFailedResponses)
.maxTotalRetries(maxTotalRetries)
.responseMapper(metaData -> metaData.headers().contains(RETRYABLE_HEADER) ?
new HttpResponseException("Retryable header", metaData) : null)
new HttpResponseException(retryMessage, metaData) : null)
// Disable request retrying
.retryRetryableExceptions((requestMetaData, e) -> ofNoRetries())
// Retry only responses marked so
.retryResponses((requestMetaData, throwable) -> ofImmediate(maxTotalRetries - 1))
.retryResponses((requestMetaData, throwable) -> {
if (throwable instanceof HttpResponseException &&
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
retryMessage.equals(throwable.getMessage())) {
return ofImmediate(maxTotalRetries - 1);
} else {
throw new RuntimeException("Unexpected exception");
}
})
.onRequestRetry((count, req, t) ->
assertThat(onRequestRetryCounter.incrementAndGet(), is(count)))
.build())
Expand All @@ -281,9 +291,14 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
};
})
.buildBlocking();
HttpResponseException e = assertThrows(HttpResponseException.class,
() -> normalClient.request(normalClient.get("/")));
assertThat("Unexpected exception.", e, instanceOf(HttpResponseException.class));
if (returnFailedResponses) {
HttpResponse response = normalClient.request(normalClient.get("/"));
assertThat(response.status(), is(HttpResponseStatus.OK));
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
} else {
HttpResponseException e = assertThrows(HttpResponseException.class,
() -> normalClient.request(normalClient.get("/")));
assertThat("Unexpected exception.", e, instanceOf(HttpResponseException.class));
}
// The load balancer is allowed to be not ready one time, which is counted against total retry attempts but not
// against actual requests being issued.
assertThat("Unexpected calls to select.", lbSelectInvoked.get(), allOf(greaterThanOrEqualTo(maxTotalRetries),
Expand Down
Loading