diff --git a/zuul-core/src/main/java/com/netflix/zuul/filters/endpoint/ProxyEndpoint.java b/zuul-core/src/main/java/com/netflix/zuul/filters/endpoint/ProxyEndpoint.java index 790a0f6e88..a6f07a5451 100644 --- a/zuul-core/src/main/java/com/netflix/zuul/filters/endpoint/ProxyEndpoint.java +++ b/zuul-core/src/main/java/com/netflix/zuul/filters/endpoint/ProxyEndpoint.java @@ -89,6 +89,10 @@ import io.netty.util.concurrent.Promise; import io.perfmark.PerfMark; import io.perfmark.TaskCloseable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.URLDecoder; @@ -100,9 +104,6 @@ import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Not thread safe! New instance of this class is created per HTTP/1.1 request proxied to the origin but NOT for each @@ -672,10 +673,14 @@ private void processErrorFromOrigin(final Throwable ex, final Channel origCh) { origin.onRequestExceptionWithServer(zuulRequest, chosenServer.get(), attemptNum, niwsEx); } - if ((isBelowRetryLimit()) && (isRetryable(err))) { + boolean retryable = isRetryable(err); + if (retryable) { + origin.adjustRetryPolicyIfNeeded(zuulRequest); + } + + if (retryable && isBelowRetryLimit()) { // retry request with different origin passport.add(PassportState.ORIGIN_RETRY_START); - origin.adjustRetryPolicyIfNeeded(zuulRequest); proxyRequestToOrigin(); } else { // Record the exception in context. An error filter should later run which can translate this into an @@ -733,8 +738,8 @@ protected boolean isRetryable(final ErrorType err) { if ((err == OutboundErrorType.RESET_CONNECTION) || (err == OutboundErrorType.CONNECT_ERROR) || (err == OutboundErrorType.READ_TIMEOUT - && IDEMPOTENT_HTTP_METHODS.contains( - zuulRequest.getMethod().toUpperCase()))) { + && IDEMPOTENT_HTTP_METHODS.contains( + zuulRequest.getMethod().toUpperCase()))) { return isRequestReplayable(); } return false; @@ -742,8 +747,8 @@ protected boolean isRetryable(final ErrorType err) { /** * Request is replayable on a different origin IFF - * A) we have not started to send response back to the client AND - * B) we have not lost any of its body chunks + * A) we have not started to send response back to the client AND + * B) we have not lost any of its body chunks */ protected boolean isRequestReplayable() { if (startedSendingResponseToClient) { @@ -901,7 +906,12 @@ protected void handleOriginNonSuccessResponse(final HttpResponse originResponse, // Flag this error with the ExecutionListener. origin.onRequestExceptionWithServer(zuulRequest, chosenServer, attemptNum, new ClientException(niwsErrorType)); - if ((isBelowRetryLimit()) && (isRetryable5xxResponse(zuulRequest, originResponse))) { + boolean retryable5xxResponse = isRetryable5xxResponse(zuulRequest, originResponse); + if (retryable5xxResponse) { + origin.adjustRetryPolicyIfNeeded(zuulRequest); + } + + if (retryable5xxResponse && isBelowRetryLimit()) { logger.debug( "Retrying: status={}, attemptNum={}, maxRetries={}, startedSendingResponseToClient={}, hasCompleteBody={}, method={}", respStatus, @@ -921,7 +931,6 @@ protected void handleOriginNonSuccessResponse(final HttpResponse originResponse, // retry request with different origin passport.add(PassportState.ORIGIN_RETRY_START); - origin.adjustRetryPolicyIfNeeded(zuulRequest); proxyRequestToOrigin(); } else { SessionContext zuulCtx = context; @@ -1035,7 +1044,7 @@ private static HttpRequestMessage massageRequestURI(HttpRequestMessage request) /** * Get the implementing origin. - * + *

* Note: this method gets called in the constructor so if overloading it or any methods called within, you cannot * rely on your own constructor parameters. */ @@ -1099,7 +1108,7 @@ protected String getClientName(SessionContext context) { /** * Inject your own custom VIP based on your own processing - * + *

* Note: this method gets called in the constructor so if overloading it or any methods called within, you cannot * rely on your own constructor parameters. * diff --git a/zuul-core/src/test/java/com/netflix/zuul/filters/endpoint/ProxyEndpointTest.java b/zuul-core/src/test/java/com/netflix/zuul/filters/endpoint/ProxyEndpointTest.java index 3b6940564d..f5638df9e0 100644 --- a/zuul-core/src/test/java/com/netflix/zuul/filters/endpoint/ProxyEndpointTest.java +++ b/zuul-core/src/test/java/com/netflix/zuul/filters/endpoint/ProxyEndpointTest.java @@ -17,42 +17,81 @@ package com.netflix.zuul.filters.endpoint; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import com.netflix.appinfo.InstanceInfo; import com.netflix.spectator.api.Spectator; import com.netflix.zuul.context.CommonContextKeys; import com.netflix.zuul.context.SessionContext; import com.netflix.zuul.discovery.DiscoveryResult; +import com.netflix.zuul.exception.OutboundErrorType; import com.netflix.zuul.message.http.HttpRequestMessage; import com.netflix.zuul.message.http.HttpRequestMessageImpl; +import com.netflix.zuul.message.http.HttpResponseMessage; import com.netflix.zuul.netty.NettyRequestAttemptFactory; +import com.netflix.zuul.netty.connectionpool.PooledConnection; import com.netflix.zuul.netty.server.MethodBinding; +import com.netflix.zuul.netty.timeouts.OriginTimeoutManager; +import com.netflix.zuul.niws.RequestAttempts; import com.netflix.zuul.origins.BasicNettyOriginManager; +import com.netflix.zuul.origins.NettyOrigin; import com.netflix.zuul.passport.CurrentPassport; +import com.netflix.zuul.passport.PassportItem; +import com.netflix.zuul.passport.PassportState; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.local.LocalAddress; +import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.concurrent.Promise; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; @ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) class ProxyEndpointTest { - ProxyEndpoint proxyEndpoint; - HttpRequestMessage request; + @Mock + private ChannelHandlerContext chc; + + @Mock + private NettyOrigin nettyOrigin; + + @Mock + private OriginTimeoutManager timeoutManager; + + @Mock + private NettyRequestAttemptFactory attemptFactory; + + private ProxyEndpoint proxyEndpoint; + private SessionContext context; + private HttpRequestMessage request; + private HttpResponse response; + private CurrentPassport passport; @BeforeEach void setup() { - ChannelHandlerContext chc = mock(ChannelHandlerContext.class); - NettyRequestAttemptFactory attemptFactory = mock(NettyRequestAttemptFactory.class); + EmbeddedChannel channel = new EmbeddedChannel(); + doReturn(channel).when(chc).channel(); + context = new SessionContext(); request = new HttpRequestMessageImpl( - new SessionContext(), + context, "HTTP/1.1", "POST", "/some/where", @@ -64,13 +103,34 @@ void setup() { "localhost", new LocalAddress("777"), false); + request.storeInboundRequest(); request.setBody("Hello There".getBytes()); - request.getContext() - .set(CommonContextKeys.ORIGIN_MANAGER, new BasicNettyOriginManager(Spectator.globalRegistry())); - request.getContext().setRouteVIP("some-vip"); - request.getContext().put(CommonContextKeys.PASSPORT, CurrentPassport.create()); - proxyEndpoint = new ProxyEndpoint(request, chc, null, MethodBinding.NO_OP_BINDING, attemptFactory); + BasicNettyOriginManager originManager = new BasicNettyOriginManager(Spectator.globalRegistry()); + + context.set(CommonContextKeys.ORIGIN_MANAGER, originManager); + context.setRouteVIP("some-vip"); + passport = CurrentPassport.create(); + context.put(CommonContextKeys.PASSPORT, passport); + context.put(CommonContextKeys.REQUEST_ATTEMPTS, new RequestAttempts()); + + Promise promise = channel.eventLoop().newPromise(); + doReturn(promise).when(nettyOrigin).connectToOrigin(any(), any(), anyInt(), any(), any(), any()); + + proxyEndpoint = spy(new ProxyEndpoint(request, chc, null, MethodBinding.NO_OP_BINDING, attemptFactory) { + @Override + public NettyOrigin getOrigin(HttpRequestMessage request) { + return nettyOrigin; + } + + @Override + protected OriginTimeoutManager getTimeoutManager(NettyOrigin origin) { + return timeoutManager; + } + }); + + doNothing().when(proxyEndpoint).operationComplete(any()); + doNothing().when(proxyEndpoint).invokeNext((HttpResponseMessage) any()); } @Test @@ -82,18 +142,97 @@ void testRetryWillResetBodyReader() { request.getBodyContents() .forEach((b) -> b.content().readerIndex(b.content().capacity())); - HttpResponse response = mock(HttpResponse.class); - when(response.status()).thenReturn(new HttpResponseStatus(503, "Retry")); + createResponse(HttpResponseStatus.SERVICE_UNAVAILABLE); + + DiscoveryResult discoveryResult = createDiscoveryResult(); + + // when retrying a response, the request body reader should have it's indexes reset + proxyEndpoint.handleOriginNonSuccessResponse(response, discoveryResult); + assertEquals("Hello There", new String(request.getBody())); + } + + @Test + void retryWhenNoAdjustment() { + createResponse(HttpResponseStatus.SERVICE_UNAVAILABLE); + + proxyEndpoint.handleOriginNonSuccessResponse(response, createDiscoveryResult()); + verify(nettyOrigin).adjustRetryPolicyIfNeeded(request); + verify(nettyOrigin).connectToOrigin(any(), any(), anyInt(), any(), any(), any()); + } + + @Test + void testRetryAdjustsLimit() { + createResponse(HttpResponseStatus.SERVICE_UNAVAILABLE); + disableRetriesOnAdjustment(); + + proxyEndpoint.handleOriginNonSuccessResponse(response, createDiscoveryResult()); + validateNoRetry(); + } + + @Test + void noRetryAdjustmentOnNonRetriableStatusCode() { + createResponse(HttpResponseStatus.BAD_REQUEST); + proxyEndpoint.handleOriginNonSuccessResponse(response, createDiscoveryResult()); + verify(nettyOrigin, never()).adjustRetryPolicyIfNeeded(request); + validateNoRetry(); + } + + @Test + public void onErrorFromOriginNoRetryAdjustment() { + doReturn(OutboundErrorType.RESET_CONNECTION).when(attemptFactory).mapNettyToOutboundErrorType(any()); + proxyEndpoint.errorFromOrigin(new RuntimeException()); + + verify(nettyOrigin).adjustRetryPolicyIfNeeded(request); + verify(nettyOrigin).connectToOrigin(any(), any(), anyInt(), any(), any(), any()); + } + + @Test + void onErrorFromOriginWithRetryAdjustment() { + doReturn(OutboundErrorType.RESET_CONNECTION).when(attemptFactory).mapNettyToOutboundErrorType(any()); + disableRetriesOnAdjustment(); + proxyEndpoint.errorFromOrigin(new RuntimeException()); + validateNoRetry(); + } + + @Test + public void onErrorFromOriginNoRetryOnNonRetriableError() { + doReturn(OutboundErrorType.OTHER).when(attemptFactory).mapNettyToOutboundErrorType(any()); + disableRetriesOnAdjustment(); + + proxyEndpoint.errorFromOrigin(new RuntimeException()); + verify(nettyOrigin, never()).adjustRetryPolicyIfNeeded(request); + validateNoRetry(); + } + + private void validateNoRetry() { + verify(nettyOrigin, never()).connectToOrigin(any(), any(), anyInt(), any(), any(), any()); + passport.getHistory().stream() + .map(PassportItem::getState) + .filter(s -> s == PassportState.ORIGIN_RETRY_START) + .findAny() + .ifPresent(s -> fail()); + } + + private void disableRetriesOnAdjustment() { + doAnswer(invocation -> { + doReturn(-1).when(nettyOrigin).getMaxRetriesForRequest(context); + return null; + }) + .when(nettyOrigin) + .adjustRetryPolicyIfNeeded(request); + } + + private static DiscoveryResult createDiscoveryResult() { InstanceInfo instanceInfo = InstanceInfo.Builder.newBuilder() .setAppName("app") .setHostName("localhost") .setPort(443) .build(); - DiscoveryResult discoveryResult = DiscoveryResult.from(instanceInfo, true); + return DiscoveryResult.from(instanceInfo, true); + } - // when retrying a response, the request body reader should have it's indexes reset - proxyEndpoint.handleOriginNonSuccessResponse(response, discoveryResult); - assertEquals("Hello There", new String(request.getBody())); + private void createResponse(HttpResponseStatus status) { + response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status); } }