Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http-netty: let RetryingHttpRequesterFilter return responses on failure #3048

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
515dd29
http-netty: let RetryingHttpRequesterFilter return responses on failure
bryce-anderson Aug 22, 2024
5b7f014
Add a test
bryce-anderson Aug 22, 2024
b08742d
make linters happy
bryce-anderson Aug 22, 2024
4b4b60c
Merge branch 'main' into bl_anderson/RetryingHttpRequesterCanReturnRe…
bryce-anderson Oct 8, 2024
3493dc4
Some of idels feedback
bryce-anderson Oct 8, 2024
e80e98e
Remove the second wrapper
bryce-anderson Oct 8, 2024
6ba45bc
Merge branch 'main' into bl_anderson/RetryingHttpRequesterCanReturnRe…
bryce-anderson Oct 11, 2024
6b45aa8
Merge branch 'main' into bl_anderson/RetryingHttpRequesterCanReturnRe…
bryce-anderson Oct 28, 2024
49d272d
Keep the response body
bryce-anderson Oct 28, 2024
f25458c
Merge branch 'main' into bl_anderson/RetryingHttpRequesterCanReturnRe…
bryce-anderson Oct 31, 2024
8d07708
Make sure we drain the response if the backoff Completable is cancelled
bryce-anderson Oct 31, 2024
3594f3a
Merge branch 'main' into bl_anderson/RetryingHttpRequesterCanReturnRe…
bryce-anderson Dec 2, 2024
0a47202
Feedback
bryce-anderson Dec 2, 2024
ea561aa
Even better log messages
bryce-anderson Dec 2, 2024
a4d023c
Merge remote-tracking branch 'origin/main' into bl_anderson/RetryingH…
bryce-anderson Dec 20, 2024
428a2d1
Michaels feedback
bryce-anderson Dec 20, 2024
f005576
feedback
bryce-anderson Jan 3, 2025
a628a89
Test for leaks
bryce-anderson Jan 3, 2025
cdb48b8
Always bubble up expcetions in lambdas
bryce-anderson Jan 4, 2025
7a51338
Enhance code comment
bryce-anderson Jan 4, 2025
d5b8c3d
log unexpected types at level warn
bryce-anderson Jan 4, 2025
b2f940d
Another better code comment
bryce-anderson Jan 4, 2025
0a7172e
More feedback
bryce-anderson Jan 8, 2025
74225d2
Clarify javadoc comments
bryce-anderson Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.ExecutionStrategyInfluencer;
import io.servicetalk.transport.api.RetryableException;
import io.servicetalk.utils.internal.ThrowableUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -89,17 +93,21 @@
*/
public final class RetryingHttpRequesterFilter
implements StreamingHttpClientFilterFactory, ExecutionStrategyInfluencer<HttpExecutionStrategy> {

private static final Logger LOGGER = LoggerFactory.getLogger(RetryingHttpRequesterFilter.class);

static final int DEFAULT_MAX_TOTAL_RETRIES = 4;
private static final RetryingHttpRequesterFilter DISABLE_AUTO_RETRIES =
new RetryingHttpRequesterFilter(true, false, false, 1, null,
new RetryingHttpRequesterFilter(true, false, false, false, 1, null,
(__, ___) -> NO_RETRIES, null);
private static final RetryingHttpRequesterFilter DISABLE_ALL_RETRIES =
new RetryingHttpRequesterFilter(false, true, false, 0, null,
new RetryingHttpRequesterFilter(false, true, false, false, 0, null,
(__, ___) -> NO_RETRIES, null);

private final boolean waitForLb;
private final boolean ignoreSdErrors;
private final boolean mayReplayRequestPayload;
private final boolean returnOriginalResponses;
private final int maxTotalRetries;
@Nullable
private final Function<HttpResponseMetaData, HttpResponseException> responseMapper;
Expand All @@ -109,13 +117,14 @@ public final class RetryingHttpRequesterFilter

RetryingHttpRequesterFilter(
final boolean waitForLb, final boolean ignoreSdErrors, final boolean mayReplayRequestPayload,
final int maxTotalRetries,
final boolean returnOriginalResponses, final int maxTotalRetries,
@Nullable final Function<HttpResponseMetaData, HttpResponseException> responseMapper,
final BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy> retryFor,
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
@Nullable final RetryCallbacks onRequestRetry) {
this.waitForLb = waitForLb;
this.ignoreSdErrors = ignoreSdErrors;
this.mayReplayRequestPayload = mayReplayRequestPayload;
this.returnOriginalResponses = returnOriginalResponses;
this.maxTotalRetries = maxTotalRetries;
this.responseMapper = responseMapper;
this.retryFor = retryFor;
Expand Down Expand Up @@ -194,24 +203,53 @@ public Completable apply(final int count, final Throwable t) {
sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t);
}

final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t);
if (backOffPolicy != NO_RETRIES) {
final int offsetCount = count - lbNotReadyCount;
Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t);
if (t instanceof DelayedRetryException) {
final Duration constant = ((DelayedRetryException) t).delay();
retryWhen = retryWhen.concat(executor.timer(constant));
}
try {
BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t);
if (backOffPolicy != NO_RETRIES) {
final int offsetCount = count - lbNotReadyCount;
Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t);
if (t instanceof DelayedRetryException) {
final Duration constant = ((DelayedRetryException) t).delay();
retryWhen = retryWhen.concat(executor.timer(constant));
}

return applyRetryCallbacks(retryWhen, count, t);
return applyRetryCallbacks(retryWhen, count, t);
}
} catch (Throwable tt) {
LOGGER.error("Unexpected exception when computing and applying backoff policy for {}({}). " +
"User-defined functions should not throw.",
RetryingHttpRequesterFilter.class.getName(), t.getMessage(), tt);
Completable result = failed(ThrowableUtils.addSuppressed(tt, t));
if (returnOriginalResponses) {
StreamingHttpResponse response = extractStreamingResponse(t);
if (response != null) {
result = drain(response).concat(result);
}
}
return result;
}

return failed(t);
}

Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) {
return retryCallbacks == null ? completable :
completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t));
Completable result = (retryCallbacks == null ? completable :
completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t)));
if (returnOriginalResponses) {
final StreamingHttpResponse response = extractStreamingResponse(t);
if (response != null) {
// If we succeed, we need to drain the response body before we continue. The retry completable
// fails we want to surface the original exception and don't worry about draining since the
// response will be returned to the user.
result = result.onErrorMap(backoffError -> ThrowableUtils.addSuppressed(t, backoffError))
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
// If we get cancelled we also need to drain the message body as there is no guarantee
// we'll ever receive a completion event, error or success. This is okay to do since
// the subscriber has signaled they're no longer interested in the response.
.beforeCancel(() -> drain(response).subscribe())
.concat(drain(response));
}
}
return result;
}
}

Expand Down Expand Up @@ -257,20 +295,39 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del

if (responseMapper != null) {
single = single.flatMap(resp -> {
final HttpResponseException exception = responseMapper.apply(resp);
return (exception != null ?
// Drain response payload body before discarding it:
resp.payloadBody().ignoreElements().onErrorComplete()
.concat(Single.<StreamingHttpResponse>failed(exception)) :
Single.succeeded(resp))
.shareContextOnSubscribe();
final HttpResponseException exception;
try {
exception = responseMapper.apply(resp);
} catch (Throwable t) {
LOGGER.error("Unexpected exception when mapping response ({}) to an exception. User-defined " +
"functions should not throw.", resp.status(), t);
return drain(resp).concat(Single.failed(t));
}
Single<StreamingHttpResponse> response;
if (exception == null) {
response = Single.succeeded(resp);
} else {
response = Single.failed(exception);
if (!returnOriginalResponses) {
response = drain(resp).concat(response);
}
}
return response.shareContextOnSubscribe();
});
}

// 1. Metadata is shared across retries
// 2. Publisher state is restored to original state for each retry
// duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2).
return single.retryWhen(retryStrategy(request, executionContext(), true));
single = single.retryWhen(retryStrategy(request, executionContext(), true));
if (returnOriginalResponses) {
single = single.onErrorResume(HttpResponseException.class, t -> {
HttpResponseMetaData metaData = t.metaData();
return (metaData instanceof StreamingHttpResponse ?
Single.succeeded((StreamingHttpResponse) metaData) : Single.failed(t));
});
}
return single;
}
}

Expand Down Expand Up @@ -719,6 +776,7 @@ public static final class Builder {

private int maxTotalRetries = DEFAULT_MAX_TOTAL_RETRIES;
private boolean retryExpectationFailed;
private boolean returnOriginalResponses;

private BiFunction<HttpRequestMetaData, RetryableException, BackOffPolicy>
retryRetryableExceptions = (requestMetaData, e) -> BackOffPolicy.ofImmediateBounded();
Expand Down Expand Up @@ -796,8 +854,13 @@ public Builder maxTotalRetries(final int maxRetries) {
* retry behaviour through {@link #retryResponses(BiFunction)}.
*
* @param mapper a {@link Function} that maps a {@link HttpResponseMetaData} to an
* {@link HttpResponseException} or returns {@code null} if there is no mapping for response meta-data. The
* mapper should return {@code null} if no retry is needed or if it cannot be determined that a retry is needed.
* {@link HttpResponseException} or returns {@code null} if there is no mapping for response meta-data.
* In the case that the request cannot be retried, the {@link HttpResponseException} will be returned via the
* error pathway.
* <p>
* <strong>It's important that this {@link Function} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link Function} doesn't throw exceptions.</strong>
*
* @return {@code this}
*/
public Builder responseMapper(final Function<HttpResponseMetaData, HttpResponseException> mapper) {
Expand All @@ -810,7 +873,8 @@ public Builder responseMapper(final Function<HttpResponseMetaData, HttpResponseE
* <p>
* To disable retries you can return {@link BackOffPolicy#ofNoRetries()} from the {@code mapper}.
* <p>
* <strong>It's important that this {@link Function} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link BiFunction} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link BiFunction} doesn't throw exceptions.</strong>
*
* @param mapper The mapper to map the {@link HttpRequestMetaData} and the
* {@link RetryableException} to a {@link BackOffPolicy}.
Expand All @@ -833,7 +897,8 @@ public Builder retryRetryableExceptions(
* <p>
* To disable retries you can return {@link BackOffPolicy#ofNoRetries()} from the {@code mapper}.
* <p>
* <strong>It's important that this {@link Function} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link BiFunction} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link BiFunction} doesn't throw exceptions.</strong>
*
* @param mapper The mapper to map the {@link HttpRequestMetaData} and the
* {@link IOException} to a {@link BackOffPolicy}.
Expand Down Expand Up @@ -871,7 +936,8 @@ public Builder retryExpectationFailed(boolean retryExpectationFailed) {
* To disable retries and proceed evaluating other retry functions you can return,
* {@link BackOffPolicy#ofNoRetries()} from the passed {@code mapper}.
* <p>
* <strong>It's important that this {@link Function} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link BiFunction} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link BiFunction} doesn't throw exceptions.</strong>
*
* @param mapper The mapper to map the {@link HttpRequestMetaData} and the
* {@link DelayedRetryException delayed-exception} to a {@link BackOffPolicy}.
Expand All @@ -893,7 +959,8 @@ public Builder retryDelayedRetryExceptions(
* <p>
* To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}.
* <p>
* <strong>It's important that this {@link Function} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link BiFunction} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link BiFunction} doesn't throw exceptions.</strong>
*
* @param mapper The mapper to map the {@link HttpRequestMetaData} and the
* {@link DelayedRetry delayed-exception} to a {@link BackOffPolicy}.
Expand All @@ -914,15 +981,39 @@ public Builder retryDelayedRetries(// FIXME: 0.43 - remove deprecated method
* <p>
* To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}.
* <p>
* <strong>It's important that this {@link Function} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link BiFunction} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link BiFunction} doesn't throw exceptions.</strong>
*
* @param mapper The mapper to map the {@link HttpRequestMetaData} and the
* {@link DelayedRetry delayed-exception} to a {@link BackOffPolicy}.
* @return {@code this}.
*/
public Builder retryResponses(
final BiFunction<HttpRequestMetaData, HttpResponseException, BackOffPolicy> mapper) {
return retryResponses(mapper, false);
}

/**
* The retrying-filter will evaluate {@link HttpResponseException} that resulted from the
* {@link #responseMapper(Function)}, and support different retry behaviour according to the
* {@link HttpRequestMetaData request} and the {@link HttpResponseMetaData response}.
* <p>
* To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}.
* <p>
* <strong>It's important that this {@link BiFunction} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link BiFunction} doesn't throw exceptions.</strong>
*
* @param mapper The mapper to map the {@link HttpRequestMetaData} and the
* {@link DelayedRetry delayed-exception} to a {@link BackOffPolicy}.
* @param returnOriginalResponses whether to unwrap the response defined by the {@link HttpResponseException}
* meta-data in the case that the request is not retried.
* @return {@code this}.
*/
public Builder retryResponses(
final BiFunction<HttpRequestMetaData, HttpResponseException, BackOffPolicy> mapper,
final boolean returnOriginalResponses) {
this.retryResponses = requireNonNull(mapper);
this.returnOriginalResponses = returnOriginalResponses;
return this;
}

Expand All @@ -932,7 +1023,8 @@ public Builder retryResponses(
* <p>
* To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}.
* <p>
* <strong>It's important that this {@link Function} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link BiFunction} doesn't block to avoid performance impacts.</strong>
* <strong>It's important that this {@link BiFunction} doesn't throw exceptions.</strong>
*
* @param mapper {@link BiFunction} that checks whether a given combination of
* {@link HttpRequestMetaData meta-data} and {@link Throwable cause} should be retried, producing a
Expand Down Expand Up @@ -1041,7 +1133,7 @@ public RetryingHttpRequesterFilter build() {

if (retryResponses != null && throwable instanceof HttpResponseException) {
final BackOffPolicy backOffPolicy =
retryResponses.apply(requestMetaData, (HttpResponseException) throwable);
retryResponses.apply(requestMetaData, (HttpResponseException) throwable);
if (backOffPolicy != NO_RETRIES) {
return backOffPolicy;
}
Expand All @@ -1054,7 +1146,26 @@ public RetryingHttpRequesterFilter build() {
return NO_RETRIES;
};
return new RetryingHttpRequesterFilter(waitForLb, ignoreSdErrors, mayReplayRequestPayload,
maxTotalRetries, responseMapper, allPredicate, onRequestRetry);
returnOriginalResponses, maxTotalRetries, responseMapper, allPredicate, onRequestRetry);
}
}

private static Completable drain(StreamingHttpResponse response) {
return response.payloadBody().ignoreElements().onErrorComplete();
}

@Nullable
private static StreamingHttpResponse extractStreamingResponse(Throwable t) {
if (t instanceof HttpResponseException) {
HttpResponseException responseException = (HttpResponseException) t;
if (responseException.metaData() instanceof StreamingHttpResponse) {
return (StreamingHttpResponse) responseException.metaData();
} else {
LOGGER.warn("Couldn't unpack response due to unexpected dynamic types. Required " +
"meta-data of type StreamingHttpResponse, found {}",
responseException.metaData().getClass());
}
}
return null;
}
}
Loading
Loading