Skip to content

Commit

Permalink
Add support for nextRetryDelay (#2081)
Browse files Browse the repository at this point in the history
Add support for nextRetryDelay
  • Loading branch information
Quinn-With-Two-Ns authored May 29, 2024
1 parent 0d847a6 commit 0d7ae22
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.EncodedValues;
import io.temporal.common.converter.Values;
import java.time.Duration;
import java.util.Objects;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -56,6 +57,7 @@ public final class ApplicationFailure extends TemporalFailure {
private final String type;
private final Values details;
private boolean nonRetryable;
private Duration nextRetryDelay;

/**
* New ApplicationFailure with {@link #isNonRetryable()} flag set to false.
Expand Down Expand Up @@ -90,7 +92,33 @@ public static ApplicationFailure newFailure(String message, String type, Object.
*/
public static ApplicationFailure newFailureWithCause(
String message, String type, @Nullable Throwable cause, Object... details) {
return new ApplicationFailure(message, type, false, new EncodedValues(details), cause);
return new ApplicationFailure(message, type, false, new EncodedValues(details), cause, null);
}

/**
* New ApplicationFailure with {@link #isNonRetryable()} flag set to false.
*
* <p>Note that this exception still may not be retried by the service if its type is included in
* the doNotRetry property of the correspondent retry policy.
*
* @param message optional error message
* @param type optional error type that is used by {@link
* io.temporal.common.RetryOptions.Builder#setDoNotRetry(String...)}.
* @param details optional details about the failure. They are serialized using the same approach
* as arguments and results.
* @param cause failure cause. Each element of the cause chain will be converted to
* ApplicationFailure for network transmission across network if it doesn't extend {@link
* TemporalFailure}
* @param nextRetryDelay delay before the next retry attempt.
*/
public static ApplicationFailure newFailureWithCauseAndDelay(
String message,
String type,
@Nullable Throwable cause,
Duration nextRetryDelay,
Object... details) {
return new ApplicationFailure(
message, type, false, new EncodedValues(details), cause, nextRetryDelay);
}

/**
Expand Down Expand Up @@ -125,20 +153,31 @@ public static ApplicationFailure newNonRetryableFailure(
*/
public static ApplicationFailure newNonRetryableFailureWithCause(
String message, String type, @Nullable Throwable cause, Object... details) {
return new ApplicationFailure(message, type, true, new EncodedValues(details), cause);
return new ApplicationFailure(message, type, true, new EncodedValues(details), cause, null);
}

static ApplicationFailure newFromValues(
String message, String type, boolean nonRetryable, Values details, Throwable cause) {
return new ApplicationFailure(message, type, nonRetryable, details, cause);
String message,
String type,
boolean nonRetryable,
Values details,
Throwable cause,
Duration nextRetryDelay) {
return new ApplicationFailure(message, type, nonRetryable, details, cause, nextRetryDelay);
}

ApplicationFailure(
String message, String type, boolean nonRetryable, Values details, Throwable cause) {
String message,
String type,
boolean nonRetryable,
Values details,
Throwable cause,
Duration nextRetryDelay) {
super(getMessage(message, Objects.requireNonNull(type), nonRetryable), message, cause);
this.type = type;
this.details = details;
this.nonRetryable = nonRetryable;
this.nextRetryDelay = nextRetryDelay;
}

public String getType() {
Expand All @@ -149,6 +188,11 @@ public Values getDetails() {
return details;
}

@Nullable
public Duration getNextRetryDelay() {
return nextRetryDelay;
}

public void setNonRetryable(boolean nonRetryable) {
this.nonRetryable = nonRetryable;
}
Expand All @@ -162,6 +206,10 @@ public void setDataConverter(DataConverter converter) {
((EncodedValues) details).setDataConverter(converter);
}

public void setNextRetryDelay(Duration nextRetryDelay) {
this.nextRetryDelay = nextRetryDelay;
}

private static String getMessage(String message, String type, boolean nonRetryable) {
return (Strings.isNullOrEmpty(message) ? "" : "message='" + message + "', ")
+ "type='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.temporal.common.converter.EncodedValues;
import io.temporal.common.converter.FailureConverter;
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.sync.POJOWorkflowImplementationFactory;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import java.io.PrintWriter;
Expand Down Expand Up @@ -106,7 +107,10 @@ private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter da
info.getType(),
info.getNonRetryable(),
new EncodedValues(details, dataConverter),
cause);
cause,
info.hasNextRetryDelay()
? ProtobufTimeUtils.toJavaDuration(info.getNextRetryDelay())
: null);
}
case TIMEOUT_FAILURE_INFO:
{
Expand Down Expand Up @@ -151,7 +155,8 @@ private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter da
"ResetWorkflow",
false,
new EncodedValues(details, dataConverter),
cause);
cause,
null);
}
case ACTIVITY_FAILURE_INFO:
{
Expand Down Expand Up @@ -231,6 +236,9 @@ private Failure exceptionToFailure(Throwable throwable) {
if (details.isPresent()) {
info.setDetails(details.get());
}
if (ae.getNextRetryDelay() != null) {
info.setNextRetryDelay(ProtobufTimeUtils.toProtoDuration(ae.getNextRetryDelay()));
}
failure.setApplicationFailureInfo(info);
} else if (throwable instanceof TimeoutFailure) {
TimeoutFailure te = (TimeoutFailure) throwable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.temporal.workflow.Functions;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.*;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -160,8 +161,9 @@ private RetryDecision shouldRetry(
return new RetryDecision(RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED, null);
}

Optional<Duration> nextRetryDelay = getNextRetryDelay(attemptThrowable);
long sleepMillis = retryOptions.calculateSleepTime(currentAttempt);
Duration sleep = Duration.ofMillis(sleepMillis);
Duration sleep = nextRetryDelay.orElse(Duration.ofMillis(sleepMillis));
if (RetryOptionsUtils.isDeadlineReached(
executionContext.getScheduleToCloseDeadline(), sleepMillis)) {
return new RetryDecision(RetryState.RETRY_STATE_TIMEOUT, null);
Expand Down Expand Up @@ -807,6 +809,13 @@ private static boolean isNonRetryableApplicationFailure(@Nullable Throwable exec
&& ((ApplicationFailure) executionThrowable).isNonRetryable();
}

private static Optional<Duration> getNextRetryDelay(@Nullable Throwable executionThrowable) {
if (executionThrowable instanceof ApplicationFailure) {
return Optional.ofNullable(((ApplicationFailure) executionThrowable).getNextRetryDelay());
}
return Optional.empty();
}

private static class RetryDecision {
private final @Nullable RetryState retryState;
private final @Nullable Duration nextAttemptBackoff;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.activity;

import static org.junit.Assert.*;
import static org.junit.Assume.assumeFalse;

import io.temporal.failure.ApplicationFailure;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.shared.TestActivities;
import java.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class ActivityNextRetryDelayTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestWorkflowImpl.class)
.setActivityImplementations(new NextRetryDelayActivityImpl())
.build();

@Test
public void activityNextRetryDelay() {
assumeFalse(
"Real Server doesn't support next retry delay yet", SDKTestWorkflowRule.useExternalService);
TestWorkflowReturnDuration workflow =
testWorkflowRule.newWorkflowStub(TestWorkflowReturnDuration.class);
Duration result = workflow.execute(false);
Assert.assertTrue(result.toMillis() > 5000 && result.toMillis() < 7000);
}

@Test
public void localActivityNextRetryDelay() {
TestWorkflowReturnDuration workflow =
testWorkflowRule.newWorkflowStub(TestWorkflowReturnDuration.class);
Duration result = workflow.execute(true);
Assert.assertTrue(result.toMillis() > 5000 && result.toMillis() < 7000);
}

@WorkflowInterface
public interface TestWorkflowReturnDuration {
@WorkflowMethod
Duration execute(boolean useLocalActivity);
}

public static class TestWorkflowImpl implements TestWorkflowReturnDuration {

private final TestActivities.NoArgsActivity activities =
Workflow.newActivityStub(
TestActivities.NoArgsActivity.class,
SDKTestOptions.newActivityOptions20sScheduleToClose());

private final TestActivities.NoArgsActivity localActivities =
Workflow.newLocalActivityStub(
TestActivities.NoArgsActivity.class,
SDKTestOptions.newLocalActivityOptions20sScheduleToClose());

@Override
public Duration execute(boolean useLocalActivity) {
long t1 = Workflow.currentTimeMillis();
if (useLocalActivity) {
localActivities.execute();
} else {
activities.execute();
}
long t2 = Workflow.currentTimeMillis();
return Duration.ofMillis(t2 - t1);
}
}

public static class NextRetryDelayActivityImpl implements TestActivities.NoArgsActivity {
@Override
public void execute() {
int attempt = Activity.getExecutionContext().getInfo().getAttempt();
if (attempt < 4) {
throw ApplicationFailure.newFailureWithCauseAndDelay(
"test retry delay failure " + attempt,
"test failure",
null,
Duration.ofSeconds(attempt));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public void triggerScheduleNoPolicy() {
handle.delete();
}

@Test
@Test(timeout = 30000)
public void backfillSchedules() {
// assumeTrue("skipping for test server", SDKTestWorkflowRule.useExternalService);
Instant now = Instant.now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1703,15 +1703,22 @@ private static RetryState attemptActivityRetry(
throw new IllegalStateException("RetryPolicy is always present");
}
Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
Optional<java.time.Duration> nextRetryDelay = Optional.empty();

if (info.isPresent()) {
if (info.get().getNonRetryable()) {
return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
}
if (info.get().hasNextRetryDelay()) {
nextRetryDelay =
Optional.ofNullable(ProtobufTimeUtils.toJavaDuration(info.get().getNextRetryDelay()));
}
}

TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
TestServiceRetryState.BackoffInterval backoffInterval =
data.retryState.getBackoffIntervalInSeconds(
info.map(ApplicationFailureInfo::getType), data.store.currentTime());
info.map(ApplicationFailureInfo::getType), data.store.currentTime(), nextRetryDelay);
if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
Expand Down
Loading

0 comments on commit 0d7ae22

Please sign in to comment.