diff --git a/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java b/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java index d06acd3b7..ffd63898c 100644 --- a/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java +++ b/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java @@ -24,6 +24,7 @@ import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.EncodedValues; import io.temporal.common.converter.Values; +import java.time.Duration; import java.util.Objects; import javax.annotation.Nullable; @@ -56,6 +57,7 @@ public final class ApplicationFailure extends TemporalFailure { private final String type; private final Values details; private boolean nonRetryable; + private Duration nextRetryDelay; /** * New ApplicationFailure with {@link #isNonRetryable()} flag set to false. @@ -90,7 +92,33 @@ public static ApplicationFailure newFailure(String message, String type, Object. */ public static ApplicationFailure newFailureWithCause( String message, String type, @Nullable Throwable cause, Object... details) { - return new ApplicationFailure(message, type, false, new EncodedValues(details), cause); + return new ApplicationFailure(message, type, false, new EncodedValues(details), cause, null); + } + + /** + * New ApplicationFailure with {@link #isNonRetryable()} flag set to false. + * + *

Note that this exception still may not be retried by the service if its type is included in + * the doNotRetry property of the correspondent retry policy. + * + * @param message optional error message + * @param type optional error type that is used by {@link + * io.temporal.common.RetryOptions.Builder#setDoNotRetry(String...)}. + * @param details optional details about the failure. They are serialized using the same approach + * as arguments and results. + * @param cause failure cause. Each element of the cause chain will be converted to + * ApplicationFailure for network transmission across network if it doesn't extend {@link + * TemporalFailure} + * @param nextRetryDelay delay before the next retry attempt. + */ + public static ApplicationFailure newFailureWithCauseAndDelay( + String message, + String type, + @Nullable Throwable cause, + Duration nextRetryDelay, + Object... details) { + return new ApplicationFailure( + message, type, false, new EncodedValues(details), cause, nextRetryDelay); } /** @@ -125,20 +153,31 @@ public static ApplicationFailure newNonRetryableFailure( */ public static ApplicationFailure newNonRetryableFailureWithCause( String message, String type, @Nullable Throwable cause, Object... details) { - return new ApplicationFailure(message, type, true, new EncodedValues(details), cause); + return new ApplicationFailure(message, type, true, new EncodedValues(details), cause, null); } static ApplicationFailure newFromValues( - String message, String type, boolean nonRetryable, Values details, Throwable cause) { - return new ApplicationFailure(message, type, nonRetryable, details, cause); + String message, + String type, + boolean nonRetryable, + Values details, + Throwable cause, + Duration nextRetryDelay) { + return new ApplicationFailure(message, type, nonRetryable, details, cause, nextRetryDelay); } ApplicationFailure( - String message, String type, boolean nonRetryable, Values details, Throwable cause) { + String message, + String type, + boolean nonRetryable, + Values details, + Throwable cause, + Duration nextRetryDelay) { super(getMessage(message, Objects.requireNonNull(type), nonRetryable), message, cause); this.type = type; this.details = details; this.nonRetryable = nonRetryable; + this.nextRetryDelay = nextRetryDelay; } public String getType() { @@ -149,6 +188,11 @@ public Values getDetails() { return details; } + @Nullable + public Duration getNextRetryDelay() { + return nextRetryDelay; + } + public void setNonRetryable(boolean nonRetryable) { this.nonRetryable = nonRetryable; } @@ -162,6 +206,10 @@ public void setDataConverter(DataConverter converter) { ((EncodedValues) details).setDataConverter(converter); } + public void setNextRetryDelay(Duration nextRetryDelay) { + this.nextRetryDelay = nextRetryDelay; + } + private static String getMessage(String message, String type, boolean nonRetryable) { return (Strings.isNullOrEmpty(message) ? "" : "message='" + message + "', ") + "type='" diff --git a/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java b/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java index 64e33a2c2..35b8e3654 100644 --- a/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java +++ b/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java @@ -40,6 +40,7 @@ import io.temporal.common.converter.EncodedValues; import io.temporal.common.converter.FailureConverter; import io.temporal.internal.activity.ActivityTaskHandlerImpl; +import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.sync.POJOWorkflowImplementationFactory; import io.temporal.serviceclient.CheckedExceptionWrapper; import java.io.PrintWriter; @@ -106,7 +107,10 @@ private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter da info.getType(), info.getNonRetryable(), new EncodedValues(details, dataConverter), - cause); + cause, + info.hasNextRetryDelay() + ? ProtobufTimeUtils.toJavaDuration(info.getNextRetryDelay()) + : null); } case TIMEOUT_FAILURE_INFO: { @@ -151,7 +155,8 @@ private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter da "ResetWorkflow", false, new EncodedValues(details, dataConverter), - cause); + cause, + null); } case ACTIVITY_FAILURE_INFO: { @@ -231,6 +236,9 @@ private Failure exceptionToFailure(Throwable throwable) { if (details.isPresent()) { info.setDetails(details.get()); } + if (ae.getNextRetryDelay() != null) { + info.setNextRetryDelay(ProtobufTimeUtils.toProtoDuration(ae.getNextRetryDelay())); + } failure.setApplicationFailureInfo(info); } else if (throwable instanceof TimeoutFailure) { TimeoutFailure te = (TimeoutFailure) throwable; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java index 6b7fcfdea..da55f4e03 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java @@ -50,6 +50,7 @@ import io.temporal.workflow.Functions; import java.time.Duration; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.*; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -160,8 +161,9 @@ private RetryDecision shouldRetry( return new RetryDecision(RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED, null); } + Optional nextRetryDelay = getNextRetryDelay(attemptThrowable); long sleepMillis = retryOptions.calculateSleepTime(currentAttempt); - Duration sleep = Duration.ofMillis(sleepMillis); + Duration sleep = nextRetryDelay.orElse(Duration.ofMillis(sleepMillis)); if (RetryOptionsUtils.isDeadlineReached( executionContext.getScheduleToCloseDeadline(), sleepMillis)) { return new RetryDecision(RetryState.RETRY_STATE_TIMEOUT, null); @@ -807,6 +809,13 @@ private static boolean isNonRetryableApplicationFailure(@Nullable Throwable exec && ((ApplicationFailure) executionThrowable).isNonRetryable(); } + private static Optional getNextRetryDelay(@Nullable Throwable executionThrowable) { + if (executionThrowable instanceof ApplicationFailure) { + return Optional.ofNullable(((ApplicationFailure) executionThrowable).getNextRetryDelay()); + } + return Optional.empty(); + } + private static class RetryDecision { private final @Nullable RetryState retryState; private final @Nullable Duration nextAttemptBackoff; diff --git a/temporal-sdk/src/test/java/io/temporal/activity/ActivityNextRetryDelayTest.java b/temporal-sdk/src/test/java/io/temporal/activity/ActivityNextRetryDelayTest.java new file mode 100644 index 000000000..54acb2be6 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/activity/ActivityNextRetryDelayTest.java @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.activity; + +import static org.junit.Assert.*; +import static org.junit.Assume.assumeFalse; + +import io.temporal.failure.ApplicationFailure; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import io.temporal.workflow.shared.TestActivities; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class ActivityNextRetryDelayTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestWorkflowImpl.class) + .setActivityImplementations(new NextRetryDelayActivityImpl()) + .build(); + + @Test + public void activityNextRetryDelay() { + assumeFalse( + "Real Server doesn't support next retry delay yet", SDKTestWorkflowRule.useExternalService); + TestWorkflowReturnDuration workflow = + testWorkflowRule.newWorkflowStub(TestWorkflowReturnDuration.class); + Duration result = workflow.execute(false); + Assert.assertTrue(result.toMillis() > 5000 && result.toMillis() < 7000); + } + + @Test + public void localActivityNextRetryDelay() { + TestWorkflowReturnDuration workflow = + testWorkflowRule.newWorkflowStub(TestWorkflowReturnDuration.class); + Duration result = workflow.execute(true); + Assert.assertTrue(result.toMillis() > 5000 && result.toMillis() < 7000); + } + + @WorkflowInterface + public interface TestWorkflowReturnDuration { + @WorkflowMethod + Duration execute(boolean useLocalActivity); + } + + public static class TestWorkflowImpl implements TestWorkflowReturnDuration { + + private final TestActivities.NoArgsActivity activities = + Workflow.newActivityStub( + TestActivities.NoArgsActivity.class, + SDKTestOptions.newActivityOptions20sScheduleToClose()); + + private final TestActivities.NoArgsActivity localActivities = + Workflow.newLocalActivityStub( + TestActivities.NoArgsActivity.class, + SDKTestOptions.newLocalActivityOptions20sScheduleToClose()); + + @Override + public Duration execute(boolean useLocalActivity) { + long t1 = Workflow.currentTimeMillis(); + if (useLocalActivity) { + localActivities.execute(); + } else { + activities.execute(); + } + long t2 = Workflow.currentTimeMillis(); + return Duration.ofMillis(t2 - t1); + } + } + + public static class NextRetryDelayActivityImpl implements TestActivities.NoArgsActivity { + @Override + public void execute() { + int attempt = Activity.getExecutionContext().getInfo().getAttempt(); + if (attempt < 4) { + throw ApplicationFailure.newFailureWithCauseAndDelay( + "test retry delay failure " + attempt, + "test failure", + null, + Duration.ofSeconds(attempt)); + } + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/client/schedules/ScheduleTest.java b/temporal-sdk/src/test/java/io/temporal/client/schedules/ScheduleTest.java index 8d10e9d15..eed193188 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/schedules/ScheduleTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/schedules/ScheduleTest.java @@ -251,7 +251,7 @@ public void triggerScheduleNoPolicy() { handle.delete(); } - @Test + @Test(timeout = 30000) public void backfillSchedules() { // assumeTrue("skipping for test server", SDKTestWorkflowRule.useExternalService); Instant now = Instant.now(); diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index 5e1a2565a..f58482f21 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -1703,15 +1703,22 @@ private static RetryState attemptActivityRetry( throw new IllegalStateException("RetryPolicy is always present"); } Optional info = failure.map(Failure::getApplicationFailureInfo); + Optional nextRetryDelay = Optional.empty(); + if (info.isPresent()) { if (info.get().getNonRetryable()) { return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE; } + if (info.get().hasNextRetryDelay()) { + nextRetryDelay = + Optional.ofNullable(ProtobufTimeUtils.toJavaDuration(info.get().getNextRetryDelay())); + } } + TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure); TestServiceRetryState.BackoffInterval backoffInterval = data.retryState.getBackoffIntervalInSeconds( - info.map(ApplicationFailureInfo::getType), data.store.currentTime()); + info.map(ApplicationFailureInfo::getType), data.store.currentTime(), nextRetryDelay); if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) { data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval()); PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask(); diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestServiceRetryState.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestServiceRetryState.java index f239e1825..0ed7a3f7f 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestServiceRetryState.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestServiceRetryState.java @@ -26,7 +26,9 @@ import io.grpc.Status; import io.temporal.api.common.v1.RetryPolicy; import io.temporal.api.enums.v1.RetryState; +import io.temporal.api.failure.v1.ApplicationFailureInfo; import io.temporal.api.failure.v1.Failure; +import io.temporal.internal.common.ProtobufTimeUtils; import java.time.Duration; import java.util.List; import java.util.Optional; @@ -97,7 +99,8 @@ TestServiceRetryState getNextAttempt(Optional failure) { return new TestServiceRetryState(retryPolicy, expirationTime, attempt + 1, failure); } - BackoffInterval getBackoffIntervalInSeconds(Optional errorType, Timestamp currentTime) { + BackoffInterval getBackoffIntervalInSeconds( + Optional errorType, Timestamp currentTime, Optional nextRetryDelay) { RetryPolicy retryPolicy = getRetryPolicy(); // check if error is non-retryable List nonRetryableErrorTypes = retryPolicy.getNonRetryableErrorTypesList(); @@ -119,32 +122,38 @@ BackoffInterval getBackoffIntervalInSeconds(Optional errorType, Timestam // MaximumAttempts is the total attempts, including initial (non-retry) attempt. return new BackoffInterval(RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED); } - long initInterval = Durations.toMillis(retryPolicy.getInitialInterval()); - long nextInterval = - (long) (initInterval * Math.pow(retryPolicy.getBackoffCoefficient(), getAttempt() - 1)); - long maxInterval = Durations.toMillis(retryPolicy.getMaximumInterval()); - if (nextInterval <= 0) { - // math.Pow() could overflow - if (maxInterval > 0) { + + Optional info = lastFailure.map(Failure::getApplicationFailureInfo); + Duration backoffDuration; + if (nextRetryDelay.isPresent()) { + backoffDuration = nextRetryDelay.get(); + } else { + long initInterval = Durations.toMillis(retryPolicy.getInitialInterval()); + long nextInterval = + (long) (initInterval * Math.pow(retryPolicy.getBackoffCoefficient(), getAttempt() - 1)); + long maxInterval = Durations.toMillis(retryPolicy.getMaximumInterval()); + if (nextInterval <= 0) { + // math.Pow() could overflow + if (maxInterval > 0) { + nextInterval = maxInterval; + } + } + if (maxInterval > 0 && nextInterval > maxInterval) { + // cap next interval to MaxInterval nextInterval = maxInterval; - } else { + } else if (nextInterval <= 0) { return new BackoffInterval(RetryState.RETRY_STATE_TIMEOUT); } + backoffDuration = Duration.ofMillis(nextInterval); } - if (maxInterval > 0 && nextInterval > maxInterval) { - // cap next interval to MaxInterval - nextInterval = maxInterval; - } - - long backoffInterval = nextInterval; - Timestamp nextScheduleTime = Timestamps.add(currentTime, Durations.fromMillis(backoffInterval)); + Timestamp nextScheduleTime = + Timestamps.add(currentTime, ProtobufTimeUtils.toProtoDuration(backoffDuration)); if (expirationTime.getNanos() != 0 && Timestamps.compare(nextScheduleTime, expirationTime) > 0) { return new BackoffInterval(RetryState.RETRY_STATE_TIMEOUT); } - - return new BackoffInterval(Duration.ofMillis(backoffInterval)); + return new BackoffInterval(backoffDuration); } static RetryPolicy validateAndOverrideRetryPolicy(RetryPolicy p) { diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 8bba37b0a..0595d6163 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -1297,7 +1297,8 @@ private void processFailWorkflowExecution( RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE); } else { failureType = Optional.of(failureInfo.getType()); - backoffInterval = rs.getBackoffIntervalInSeconds(failureType, store.currentTime()); + backoffInterval = + rs.getBackoffIntervalInSeconds(failureType, store.currentTime(), Optional.empty()); } } else if (failure.hasTerminatedFailureInfo() || failure.hasCanceledFailureInfo() @@ -1307,7 +1308,8 @@ private void processFailWorkflowExecution( new TestServiceRetryState.BackoffInterval(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE); } else { // The failure may be retryable. (E.g. ActivityFailure) - backoffInterval = rs.getBackoffIntervalInSeconds(Optional.empty(), store.currentTime()); + backoffInterval = + rs.getBackoffIntervalInSeconds(Optional.empty(), store.currentTime(), Optional.empty()); } if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {