Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust retry policy before checking isBelowRetryLimit #1775

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@
import io.netty.util.concurrent.Promise;
import io.perfmark.PerfMark;
import io.perfmark.TaskCloseable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.URLDecoder;
Expand All @@ -100,9 +104,6 @@
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Not thread safe! New instance of this class is created per HTTP/1.1 request proxied to the origin but NOT for each
Expand Down Expand Up @@ -672,10 +673,14 @@ private void processErrorFromOrigin(final Throwable ex, final Channel origCh) {
origin.onRequestExceptionWithServer(zuulRequest, chosenServer.get(), attemptNum, niwsEx);
}

if ((isBelowRetryLimit()) && (isRetryable(err))) {
boolean retryable = isRetryable(err);
if (retryable) {
origin.adjustRetryPolicyIfNeeded(zuulRequest);
}

if (retryable && isBelowRetryLimit()) {
// retry request with different origin
passport.add(PassportState.ORIGIN_RETRY_START);
origin.adjustRetryPolicyIfNeeded(zuulRequest);
proxyRequestToOrigin();
} else {
// Record the exception in context. An error filter should later run which can translate this into an
Expand Down Expand Up @@ -733,17 +738,17 @@ protected boolean isRetryable(final ErrorType err) {
if ((err == OutboundErrorType.RESET_CONNECTION)
|| (err == OutboundErrorType.CONNECT_ERROR)
|| (err == OutboundErrorType.READ_TIMEOUT
&& IDEMPOTENT_HTTP_METHODS.contains(
zuulRequest.getMethod().toUpperCase()))) {
&& IDEMPOTENT_HTTP_METHODS.contains(
zuulRequest.getMethod().toUpperCase()))) {
return isRequestReplayable();
}
return false;
}

/**
* Request is replayable on a different origin IFF
* A) we have not started to send response back to the client AND
* B) we have not lost any of its body chunks
* A) we have not started to send response back to the client AND
* B) we have not lost any of its body chunks
*/
protected boolean isRequestReplayable() {
if (startedSendingResponseToClient) {
Expand Down Expand Up @@ -901,7 +906,12 @@ protected void handleOriginNonSuccessResponse(final HttpResponse originResponse,
// Flag this error with the ExecutionListener.
origin.onRequestExceptionWithServer(zuulRequest, chosenServer, attemptNum, new ClientException(niwsErrorType));

if ((isBelowRetryLimit()) && (isRetryable5xxResponse(zuulRequest, originResponse))) {
boolean retryable5xxResponse = isRetryable5xxResponse(zuulRequest, originResponse);
if (retryable5xxResponse) {
origin.adjustRetryPolicyIfNeeded(zuulRequest);
}

if (retryable5xxResponse && isBelowRetryLimit()) {
logger.debug(
"Retrying: status={}, attemptNum={}, maxRetries={}, startedSendingResponseToClient={}, hasCompleteBody={}, method={}",
respStatus,
Expand All @@ -921,7 +931,6 @@ protected void handleOriginNonSuccessResponse(final HttpResponse originResponse,

// retry request with different origin
passport.add(PassportState.ORIGIN_RETRY_START);
origin.adjustRetryPolicyIfNeeded(zuulRequest);
proxyRequestToOrigin();
} else {
SessionContext zuulCtx = context;
Expand Down Expand Up @@ -1035,7 +1044,7 @@ private static HttpRequestMessage massageRequestURI(HttpRequestMessage request)

/**
* Get the implementing origin.
*
* <p>
* Note: this method gets called in the constructor so if overloading it or any methods called within, you cannot
* rely on your own constructor parameters.
*/
Expand Down Expand Up @@ -1099,7 +1108,7 @@ protected String getClientName(SessionContext context) {

/**
* Inject your own custom VIP based on your own processing
*
* <p>
* Note: this method gets called in the constructor so if overloading it or any methods called within, you cannot
* rely on your own constructor parameters.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,81 @@
package com.netflix.zuul.filters.endpoint;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.spectator.api.Spectator;
import com.netflix.zuul.context.CommonContextKeys;
import com.netflix.zuul.context.SessionContext;
import com.netflix.zuul.discovery.DiscoveryResult;
import com.netflix.zuul.exception.OutboundErrorType;
import com.netflix.zuul.message.http.HttpRequestMessage;
import com.netflix.zuul.message.http.HttpRequestMessageImpl;
import com.netflix.zuul.message.http.HttpResponseMessage;
import com.netflix.zuul.netty.NettyRequestAttemptFactory;
import com.netflix.zuul.netty.connectionpool.PooledConnection;
import com.netflix.zuul.netty.server.MethodBinding;
import com.netflix.zuul.netty.timeouts.OriginTimeoutManager;
import com.netflix.zuul.niws.RequestAttempts;
import com.netflix.zuul.origins.BasicNettyOriginManager;
import com.netflix.zuul.origins.NettyOrigin;
import com.netflix.zuul.passport.CurrentPassport;
import com.netflix.zuul.passport.PassportItem;
import com.netflix.zuul.passport.PassportState;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.local.LocalAddress;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.concurrent.Promise;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class ProxyEndpointTest {

ProxyEndpoint proxyEndpoint;
HttpRequestMessage request;
@Mock
private ChannelHandlerContext chc;

@Mock
private NettyOrigin nettyOrigin;

@Mock
private OriginTimeoutManager timeoutManager;

@Mock
private NettyRequestAttemptFactory attemptFactory;

private ProxyEndpoint proxyEndpoint;
private SessionContext context;
private HttpRequestMessage request;
private HttpResponse response;
private CurrentPassport passport;

@BeforeEach
void setup() {
ChannelHandlerContext chc = mock(ChannelHandlerContext.class);
NettyRequestAttemptFactory attemptFactory = mock(NettyRequestAttemptFactory.class);
EmbeddedChannel channel = new EmbeddedChannel();
doReturn(channel).when(chc).channel();

context = new SessionContext();
request = new HttpRequestMessageImpl(
new SessionContext(),
context,
"HTTP/1.1",
"POST",
"/some/where",
Expand All @@ -64,13 +103,34 @@ void setup() {
"localhost",
new LocalAddress("777"),
false);
request.storeInboundRequest();

request.setBody("Hello There".getBytes());
request.getContext()
.set(CommonContextKeys.ORIGIN_MANAGER, new BasicNettyOriginManager(Spectator.globalRegistry()));
request.getContext().setRouteVIP("some-vip");
request.getContext().put(CommonContextKeys.PASSPORT, CurrentPassport.create());
proxyEndpoint = new ProxyEndpoint(request, chc, null, MethodBinding.NO_OP_BINDING, attemptFactory);
BasicNettyOriginManager originManager = new BasicNettyOriginManager(Spectator.globalRegistry());

context.set(CommonContextKeys.ORIGIN_MANAGER, originManager);
context.setRouteVIP("some-vip");
passport = CurrentPassport.create();
context.put(CommonContextKeys.PASSPORT, passport);
context.put(CommonContextKeys.REQUEST_ATTEMPTS, new RequestAttempts());

Promise<PooledConnection> promise = channel.eventLoop().newPromise();
doReturn(promise).when(nettyOrigin).connectToOrigin(any(), any(), anyInt(), any(), any(), any());

proxyEndpoint = spy(new ProxyEndpoint(request, chc, null, MethodBinding.NO_OP_BINDING, attemptFactory) {
@Override
public NettyOrigin getOrigin(HttpRequestMessage request) {
return nettyOrigin;
}

@Override
protected OriginTimeoutManager getTimeoutManager(NettyOrigin origin) {
return timeoutManager;
}
});

doNothing().when(proxyEndpoint).operationComplete(any());
doNothing().when(proxyEndpoint).invokeNext((HttpResponseMessage) any());
}

@Test
Expand All @@ -82,18 +142,97 @@ void testRetryWillResetBodyReader() {
request.getBodyContents()
.forEach((b) -> b.content().readerIndex(b.content().capacity()));

HttpResponse response = mock(HttpResponse.class);
when(response.status()).thenReturn(new HttpResponseStatus(503, "Retry"));
createResponse(HttpResponseStatus.SERVICE_UNAVAILABLE);

DiscoveryResult discoveryResult = createDiscoveryResult();

// when retrying a response, the request body reader should have it's indexes reset
proxyEndpoint.handleOriginNonSuccessResponse(response, discoveryResult);
assertEquals("Hello There", new String(request.getBody()));
}

@Test
void retryWhenNoAdjustment() {
createResponse(HttpResponseStatus.SERVICE_UNAVAILABLE);

proxyEndpoint.handleOriginNonSuccessResponse(response, createDiscoveryResult());
verify(nettyOrigin).adjustRetryPolicyIfNeeded(request);
verify(nettyOrigin).connectToOrigin(any(), any(), anyInt(), any(), any(), any());
}

@Test
void testRetryAdjustsLimit() {
createResponse(HttpResponseStatus.SERVICE_UNAVAILABLE);
disableRetriesOnAdjustment();

proxyEndpoint.handleOriginNonSuccessResponse(response, createDiscoveryResult());
validateNoRetry();
}

@Test
void noRetryAdjustmentOnNonRetriableStatusCode() {
createResponse(HttpResponseStatus.BAD_REQUEST);
proxyEndpoint.handleOriginNonSuccessResponse(response, createDiscoveryResult());
verify(nettyOrigin, never()).adjustRetryPolicyIfNeeded(request);
validateNoRetry();
}

@Test
public void onErrorFromOriginNoRetryAdjustment() {
doReturn(OutboundErrorType.RESET_CONNECTION).when(attemptFactory).mapNettyToOutboundErrorType(any());
proxyEndpoint.errorFromOrigin(new RuntimeException());

verify(nettyOrigin).adjustRetryPolicyIfNeeded(request);
verify(nettyOrigin).connectToOrigin(any(), any(), anyInt(), any(), any(), any());
}

@Test
void onErrorFromOriginWithRetryAdjustment() {
doReturn(OutboundErrorType.RESET_CONNECTION).when(attemptFactory).mapNettyToOutboundErrorType(any());
disableRetriesOnAdjustment();

proxyEndpoint.errorFromOrigin(new RuntimeException());
validateNoRetry();
}

@Test
public void onErrorFromOriginNoRetryOnNonRetriableError() {
doReturn(OutboundErrorType.OTHER).when(attemptFactory).mapNettyToOutboundErrorType(any());
disableRetriesOnAdjustment();

proxyEndpoint.errorFromOrigin(new RuntimeException());
verify(nettyOrigin, never()).adjustRetryPolicyIfNeeded(request);
validateNoRetry();
}

private void validateNoRetry() {
verify(nettyOrigin, never()).connectToOrigin(any(), any(), anyInt(), any(), any(), any());
passport.getHistory().stream()
.map(PassportItem::getState)
.filter(s -> s == PassportState.ORIGIN_RETRY_START)
.findAny()
.ifPresent(s -> fail());
}

private void disableRetriesOnAdjustment() {
doAnswer(invocation -> {
doReturn(-1).when(nettyOrigin).getMaxRetriesForRequest(context);
return null;
})
.when(nettyOrigin)
.adjustRetryPolicyIfNeeded(request);
}

private static DiscoveryResult createDiscoveryResult() {
InstanceInfo instanceInfo = InstanceInfo.Builder.newBuilder()
.setAppName("app")
.setHostName("localhost")
.setPort(443)
.build();
DiscoveryResult discoveryResult = DiscoveryResult.from(instanceInfo, true);
return DiscoveryResult.from(instanceInfo, true);
}

// when retrying a response, the request body reader should have it's indexes reset
proxyEndpoint.handleOriginNonSuccessResponse(response, discoveryResult);
assertEquals("Hello There", new String(request.getBody()));
private void createResponse(HttpResponseStatus status) {
response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status);
}
}