Support the timeout retry policy #70

Merged · 1 commit · Aug 9, 2024
@@ -208,6 +208,9 @@ public enum StepInstanceAction {
STEP_INSTANCE_STATUS_TO_ACTION_MAP.put(
StepInstance.Status.PLATFORM_FAILED,
Arrays.asList(StepInstanceAction.STOP, StepInstanceAction.KILL, StepInstanceAction.SKIP));
STEP_INSTANCE_STATUS_TO_ACTION_MAP.put(
StepInstance.Status.TIMEOUT_FAILED,
Arrays.asList(StepInstanceAction.STOP, StepInstanceAction.KILL, StepInstanceAction.SKIP));

STEP_INSTANCE_STATUS_TO_ACTION_MAP.put(
StepInstance.Status.FATALLY_FAILED,
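A timed-out-but-retryable step now accepts the same manual actions as a platform failure. An illustrative lookup (the map is internal to this enum, so this is a sketch rather than public API):

```java
// Manual actions available once a step lands in TIMEOUT_FAILED:
List<StepInstanceAction> actions =
    STEP_INSTANCE_STATUS_TO_ACTION_MAP.get(StepInstance.Status.TIMEOUT_FAILED);
// actions -> [STOP, KILL, SKIP], matching PLATFORM_FAILED
```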
@@ -147,6 +147,9 @@ private Constants() {}
/** maximum retry wait limit for platform errors. */
public static final int MAX_PLATFORM_RETRY_LIMIT_SECS = 24 * 3600; // 1 day

/** maximum retry wait limit for timeout errors. */
public static final int MAX_TIMEOUT_RETRY_LIMIT_SECS = 24 * 3600; // 1 day

/** Max timeout limit in milliseconds. */
public static final long MAX_TIME_OUT_LIMIT_IN_MILLIS = TimeUnit.DAYS.toMillis(120); // 120 days

@@ -39,6 +39,9 @@ private Defaults() {}
/** Defaults for fixed retry delay for user errors. */
private static final long DEFAULT_FIXED_USER_RETRY_BACKOFF_SECS = 60L;

/** Defaults for fixed retry delay for timeout errors. */
private static final long DEFAULT_FIXED_TIMEOUT_RETRY_BACKOFF_SECS = 60L;

/** Defaults for exponential retry exponent for user errors. */
private static final int DEFAULT_ERROR_RETRY_EXPONENT = 2;

@@ -57,32 +60,47 @@ private Defaults() {}
/** Defaults for exponential max retry limit for platform errors. */
private static final long DEFAULT_PLATFORM_RETRY_LIMIT_SECS = 3600L;

/** Defaults for exponential retry exponent for timeout errors. */
private static final int DEFAULT_TIMEOUT_RETRY_EXPONENT = 2;

/** Defaults for exponential retry base backoff for timeout errors. */
private static final long DEFAULT_BASE_TIMEOUT_RETRY_BACKOFF_SECS = 60L;

/** Defaults for exponential max retry limit for timeout errors. */
private static final long DEFAULT_TIMEOUT_RETRY_LIMIT_SECS = 3600L;

/** Default Exponential backoff. */
public static final RetryPolicy.ExponentialBackoff DEFAULT_EXPONENTIAL_BACK_OFF =
RetryPolicy.ExponentialBackoff.builder()
.errorRetryExponent(DEFAULT_ERROR_RETRY_EXPONENT)
.errorRetryBackoffInSecs(DEFAULT_BASE_ERROR_RETRY_BACKOFF_SECS)
.errorRetryLimitInSecs(DEFAULT_ERROR_RETRY_LIMIT_SECS)
- .platformRetryBackoffInSecs(DEFAULT_BASE_PLATFORM_RETRY_BACKOFF_SECS)
.platformRetryExponent(DEFAULT_PLATFORM_RETRY_EXPONENT)
+ .platformRetryBackoffInSecs(DEFAULT_BASE_PLATFORM_RETRY_BACKOFF_SECS)
.platformRetryLimitInSecs(DEFAULT_PLATFORM_RETRY_LIMIT_SECS)
.timeoutRetryExponent(DEFAULT_TIMEOUT_RETRY_EXPONENT)
.timeoutRetryBackoffInSecs(DEFAULT_BASE_TIMEOUT_RETRY_BACKOFF_SECS)
.timeoutRetryLimitInSecs(DEFAULT_TIMEOUT_RETRY_LIMIT_SECS)
.build();

/** Default Fixed backoff. */
public static final RetryPolicy.FixedBackoff DEFAULT_FIXED_BACK_OFF =
RetryPolicy.FixedBackoff.builder()
- .platformRetryBackoffInSecs(DEFAULT_FIXED_PLATFORM_RETRY_BACKOFF_SECS)
.errorRetryBackoffInSecs(DEFAULT_FIXED_USER_RETRY_BACKOFF_SECS)
+ .platformRetryBackoffInSecs(DEFAULT_FIXED_PLATFORM_RETRY_BACKOFF_SECS)
+ .timeoutRetryBackoffInSecs(DEFAULT_FIXED_TIMEOUT_RETRY_BACKOFF_SECS)
.build();

private static final long DEFAULT_USER_RETRY_LIMIT = 2L;
private static final long DEFAULT_PLATFORM_RETRY_LIMIT = 10L;
private static final long DEFAULT_TIMEOUT_RETRY_LIMIT = 0L;

/** Default retry policy if unset. */
public static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicy.builder()
.errorRetryLimit(DEFAULT_USER_RETRY_LIMIT)
.platformRetryLimit(DEFAULT_PLATFORM_RETRY_LIMIT)
.timeoutRetryLimit(DEFAULT_TIMEOUT_RETRY_LIMIT)
.backoff(DEFAULT_EXPONENTIAL_BACK_OFF)
.build();
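Note that DEFAULT_TIMEOUT_RETRY_LIMIT is 0, so timeout retries stay disabled unless a step opts in, while error and platform retries remain on by default. A minimal sketch of opting in, assuming the Lombok builders and merge helper shown in this diff:

```java
// Step-level policy that only sets the new knob; everything else is inherited.
RetryPolicy stepPolicy =
    RetryPolicy.builder()
        .timeoutRetryLimit(2L) // retry up to twice after an execution timeout
        .build();

RetryPolicy merged = RetryPolicy.tryMergeWithDefault(stepPolicy);
// merged: errorRetryLimit=2 and platformRetryLimit=10 (defaults),
// timeoutRetryLimit=2 (from the step), backoff=DEFAULT_EXPONENTIAL_BACK_OFF.
```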

@@ -34,7 +34,7 @@
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder(
value = {"error_retry_limit", "platform_retry_limit", "backoff"},
value = {"error_retry_limit", "platform_retry_limit", "timeout_retry_limit", "backoff"},
alphabetic = true)
@JsonDeserialize(builder = RetryPolicy.RetryPolicyBuilder.class)
@Getter
@@ -46,6 +46,9 @@ public class RetryPolicy {
@Max(Constants.MAX_RETRY_LIMIT)
private final Long platformRetryLimit;

@Max(Constants.MAX_RETRY_LIMIT)
private final Long timeoutRetryLimit;

/** Backoff strategy. */
private final Backoff backoff;
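Because the class keeps its snake_case naming and builder-based deserialization, the new field rides along in existing JSON payloads. A hedged sketch of parsing a partial policy (the surrounding workflow-definition shape is assumed):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

// Only the fields a user sets need to appear; unset ones are filled in later
// by RetryPolicy.tryMergeWithDefault(policy).
String json = "{\"error_retry_limit\": 2, \"timeout_retry_limit\": 1}";
RetryPolicy policy = new ObjectMapper().readValue(json, RetryPolicy.class);
// policy.getTimeoutRetryLimit() -> 1
```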

@@ -67,7 +70,7 @@ public static BackoffPolicyType create(String type) {
* Merge a given step retry policy with DEFAULT RETRY POLICY.
*
* @param policy retry policy
- * @return retry policy
+ * @return final retry policy
*/
public static RetryPolicy tryMergeWithDefault(RetryPolicy policy) {
RetryPolicy defaultRetryPolicy = Defaults.DEFAULT_RETRY_POLICY;
@@ -82,6 +85,10 @@ public static RetryPolicy tryMergeWithDefault(RetryPolicy policy) {
if (retryPolicyBuilder.platformRetryLimit == null) {
retryPolicyBuilder.platformRetryLimit = defaultRetryPolicy.platformRetryLimit;
}
// Merge from default.
if (retryPolicyBuilder.timeoutRetryLimit == null) {
retryPolicyBuilder.timeoutRetryLimit = defaultRetryPolicy.timeoutRetryLimit;
}
if (retryPolicyBuilder.backoff == null) {
retryPolicyBuilder.backoff = defaultRetryPolicy.backoff;
} else {
@@ -112,6 +119,9 @@ public interface Backoff {
/** Get next retry delay for platform errors. */
int getNextRetryDelayForPlatformError(long platformRetries);

/** Get next retry delay for timeout errors. */
int getNextRetryDelayForTimeoutError(long timeoutRetries);

/** Merge with default and get new backoff. */
Backoff mergeWithDefault();
}
@@ -127,7 +137,10 @@ public interface Backoff {
"error_retry_limit_in_secs",
"platform_retry_backoff_in_secs",
"platform_retry_exponent",
"platform_retry_limit_in_secs"
"platform_retry_limit_in_secs",
"timeout_retry_backoff_in_secs",
"timeout_retry_exponent",
"timeout_retry_limit_in_secs"
},
alphabetic = true)
@JsonDeserialize(builder = ExponentialBackoff.ExponentialBackoffBuilder.class)
@@ -156,6 +169,17 @@ public static class ExponentialBackoff implements Backoff {
@Max(Constants.MAX_PLATFORM_RETRY_LIMIT_SECS)
private final Long platformRetryLimitInSecs;

/** Base time in seconds to wait between retries for timeout errors. */
@Max(Constants.MAX_TIMEOUT_RETRY_LIMIT_SECS)
private final Long timeoutRetryBackoffInSecs;

/** Base exponent for timeout errors. */
private final Integer timeoutRetryExponent;

/** Max time in seconds to wait between retries for timeout errors. */
@Max(Constants.MAX_TIMEOUT_RETRY_LIMIT_SECS)
private final Long timeoutRetryLimitInSecs;

@Override
public BackoffPolicyType getType() {
return BackoffPolicyType.EXPONENTIAL_BACKOFF;
@@ -174,6 +198,13 @@ public int getNextRetryDelayForPlatformError(long platformRetries) {
return (int) Math.min(waitVal, platformRetryLimitInSecs);
}

@Override
public int getNextRetryDelayForTimeoutError(long timeoutRetries) {
long waitVal =
(long) (timeoutRetryBackoffInSecs * Math.pow(timeoutRetryExponent, timeoutRetries));
return (int) Math.min(waitVal, timeoutRetryLimitInSecs);
}
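Under the default timeout backoff (base 60s, exponent 2, cap 3600s), the delay doubles per retry until it hits the cap. A quick sanity check, assuming access to Defaults.DEFAULT_EXPONENTIAL_BACK_OFF:

```java
// Delay is timeoutRetryBackoffInSecs * timeoutRetryExponent^n, clamped to the limit.
RetryPolicy.Backoff backoff = Defaults.DEFAULT_EXPONENTIAL_BACK_OFF;
for (long n = 0; n < 8; n++) {
  System.out.print(backoff.getNextRetryDelayForTimeoutError(n) + " ");
}
// Prints: 60 120 240 480 960 1920 3600 3600
```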

@Override
public Backoff mergeWithDefault() {
RetryPolicy.ExponentialBackoff defaultExponentialBackoff =
@@ -203,6 +234,18 @@ public Backoff mergeWithDefault() {
exponentialBackoffBuilder.platformRetryExponent =
defaultExponentialBackoff.platformRetryExponent;
}
if (exponentialBackoffBuilder.timeoutRetryBackoffInSecs == null) {
exponentialBackoffBuilder.timeoutRetryBackoffInSecs =
defaultExponentialBackoff.timeoutRetryBackoffInSecs;
}
if (exponentialBackoffBuilder.timeoutRetryLimitInSecs == null) {
exponentialBackoffBuilder.timeoutRetryLimitInSecs =
defaultExponentialBackoff.timeoutRetryLimitInSecs;
}
if (exponentialBackoffBuilder.timeoutRetryExponent == null) {
exponentialBackoffBuilder.timeoutRetryExponent =
defaultExponentialBackoff.timeoutRetryExponent;
}
return exponentialBackoffBuilder.build();
}

@@ -217,7 +260,11 @@ public static final class ExponentialBackoffBuilder {}
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder(
value = {"error_retry_backoff_in_secs", "platform_retry_backoff_in_secs"},
value = {
"error_retry_backoff_in_secs",
"platform_retry_backoff_in_secs",
"timeout_retry_backoff_in_secs"
},
alphabetic = true)
@JsonDeserialize(builder = FixedBackoff.FixedBackoffBuilder.class)
@Getter
@@ -229,6 +276,9 @@ public static class FixedBackoff implements Backoff {
/** Constant wait between platform retries. */
private final Long platformRetryBackoffInSecs;

/** Constant wait between timeout retries. */
private final Long timeoutRetryBackoffInSecs;

@Override
public BackoffPolicyType getType() {
return BackoffPolicyType.FIXED_BACKOFF;
@@ -244,6 +294,11 @@ public int getNextRetryDelayForPlatformError(long platformRetries) {
return platformRetryBackoffInSecs.intValue();
}

@Override
public int getNextRetryDelayForTimeoutError(long timeoutRetries) {
return timeoutRetryBackoffInSecs.intValue();
}

@Override
public Backoff mergeWithDefault() {
RetryPolicy.FixedBackoff defaultFixedBackoff = Defaults.DEFAULT_FIXED_BACK_OFF;
@@ -255,6 +310,10 @@ public Backoff mergeWithDefault() {
fixedBackoffBuilder.platformRetryBackoffInSecs =
defaultFixedBackoff.platformRetryBackoffInSecs;
}
if (fixedBackoffBuilder.timeoutRetryBackoffInSecs == null) {
fixedBackoffBuilder.timeoutRetryBackoffInSecs =
defaultFixedBackoff.timeoutRetryBackoffInSecs;
}
return fixedBackoffBuilder.build();
}

@@ -151,6 +151,8 @@ public StepDependencies getSignalDependencies() {
"error_retry_limit",
"platform_retries",
"platform_retry_limit",
"timeout_retries",
"timeout_retry_limit",
"manual_retries",
"retryable",
"backoff"
@@ -162,6 +164,8 @@ public static class StepRetry {
private long errorRetryLimit;
private long platformRetries; // retry count due to platform failure
private long platformRetryLimit;
private long timeoutRetries; // retry count due to timeout failure
private long timeoutRetryLimit;
private long manualRetries; // retry count due to manual api call to restart
private boolean retryable = true; // mark if the step is retryable by the system
private RetryPolicy.Backoff backoff;
@@ -176,12 +180,19 @@ public boolean hasReachedPlatformRetryLimit() {
return platformRetries >= platformRetryLimit || !retryable;
}

/** check if reaching timeout retry limit. */
public boolean hasReachedTimeoutRetryLimit() {
return timeoutRetries >= timeoutRetryLimit || !retryable;
}

/** increment corresponding retry count based on status. */
public void incrementByStatus(Status status) {
if (status == Status.USER_FAILED) {
errorRetries++;
} else if (status == Status.PLATFORM_FAILED) {
platformRetries++;
} else if (status == Status.TIMEOUT_FAILED) {
timeoutRetries++;
} else if (status.isRestartable()) {
manualRetries++;
} else {
@@ -193,14 +204,16 @@ public void incrementByStatus(Status status) {
/**
* Get next retry delay based on error and configured retry policy.
*
- * @param status status
- * @return delay for the next attempt
+ * @param status the step instance status
+ * @return the next retry delay in secs.
*/
public int getNextRetryDelay(Status status) {
if (status == Status.USER_FAILED) {
return backoff.getNextRetryDelayForUserError(errorRetries);
} else if (status == Status.PLATFORM_FAILED) {
return backoff.getNextRetryDelayForPlatformError(platformRetries);
} else if (status == Status.TIMEOUT_FAILED) {
return backoff.getNextRetryDelayForTimeoutError(timeoutRetries);
} else {
// Not expected to get retry delay for any other errors.
throw new MaestroInvalidStatusException(
@@ -214,6 +227,7 @@ public static StepRetry from(@Nullable RetryPolicy policy) {
StepRetry stepRetry = new StepRetry();
stepRetry.errorRetryLimit = retryPolicy.getErrorRetryLimit();
stepRetry.platformRetryLimit = retryPolicy.getPlatformRetryLimit();
stepRetry.timeoutRetryLimit = retryPolicy.getTimeoutRetryLimit();
stepRetry.retryable = true;
stepRetry.backoff = retryPolicy.getBackoff();
return stepRetry;
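Putting the StepRetry pieces together, the lifecycle of a timed-out attempt looks roughly like this (hypothetical driver code reusing the stepPolicy from the earlier sketch; the real call sites are in the step runtime below):

```java
StepRetry retry = StepRetry.from(RetryPolicy.tryMergeWithDefault(stepPolicy));

retry.incrementByStatus(StepInstance.Status.TIMEOUT_FAILED); // timeoutRetries -> 1
if (retry.hasReachedTimeoutRetryLimit()) {
  // Budget exhausted: the step is finalized in the fatal TIMED_OUT state.
} else {
  int delaySecs = retry.getNextRetryDelay(StepInstance.Status.TIMEOUT_FAILED);
  // Schedule the next attempt delaySecs from now.
}
```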
@@ -281,7 +295,9 @@ public enum Status {

/** Step is stopped by a user or the workflow, terminal state. */
STOPPED(true, false, false, false),
- /** Step is timed out by the system, terminal state. */
+ /** Step is failed due to execution timeout error, terminal state. */
+ TIMEOUT_FAILED(true, false, true, true),
+ /** Step is fatally timed out by the system, terminal state. */
TIMED_OUT(true, false, false, false);

@JsonIgnore private final boolean terminal; // if it is terminal
@@ -542,11 +542,12 @@ private boolean isStepSatisfied(
return satisfied;
}

- /** Check if timed out, which is based on step start time. */
+ /** Check if the execution is timed out, which is based on step start time. */
private boolean isTimeout(StepRuntimeSummary runtimeSummary) {
if (runtimeSummary.getRuntimeState() != null
&& runtimeSummary.getRuntimeState().getStartTime() != null) {
- if (runtimeSummary.getRuntimeState().getStatus() == StepInstance.Status.TIMED_OUT) {
+ if (runtimeSummary.getRuntimeState().getStatus() == StepInstance.Status.TIMED_OUT
+ || runtimeSummary.getRuntimeState().getStatus() == StepInstance.Status.TIMEOUT_FAILED) {
return true;
}
long timeoutInMillis =
@@ -650,12 +651,7 @@ public boolean execute(Workflow workflow, Task task, WorkflowExecutor executor)
runtimeSummary.getRuntimeState().getStatus().name());

if (isTimeout(runtimeSummary)) {
- LOG.info(
- "Workflow instance {}'s step {} is timed out.",
- workflowSummary.getIdentity(),
- runtimeSummary.getIdentity());
- terminate(workflowSummary, runtimeSummary, StepInstance.Status.TIMED_OUT);
- runtimeSummary.addTimeline(TimelineLogEvent.info("Step instance is timed out."));
+ handleTimeoutError(workflowSummary, runtimeSummary);
} else {
tryUpdateByAction(workflowSummary, stepDefinition, runtimeSummary);
}
@@ -685,6 +681,20 @@ public boolean execute(Workflow workflow, Task task, WorkflowExecutor executor)
}
}

private void handleTimeoutError(
WorkflowSummary workflowSummary, StepRuntimeSummary runtimeSummary) {
LOG.info(
"Workflow instance {}'s step {} is timed out.",
workflowSummary.getIdentity(),
runtimeSummary.getIdentity());
if (runtimeSummary.getStepRetry().hasReachedTimeoutRetryLimit()) {
terminate(workflowSummary, runtimeSummary, StepInstance.Status.TIMED_OUT);
runtimeSummary.addTimeline(TimelineLogEvent.info("Step instance is timed out."));
} else {
runtimeSummary.markTerminated(StepInstance.Status.TIMEOUT_FAILED, tracingManager);
}
}
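This new branch is the heart of the feature: a timeout only becomes the fatal TIMED_OUT state once the retry budget is spent; otherwise the step is marked TIMEOUT_FAILED so the normal retry path picks it up. The same decision as a pure-function sketch (the helper name is hypothetical):

```java
static StepInstance.Status nextStatusOnTimeout(StepRuntimeSummary runtimeSummary) {
  return runtimeSummary.getStepRetry().hasReachedTimeoutRetryLimit()
      ? StepInstance.Status.TIMED_OUT // fatal: no timeout retries left
      : StepInstance.Status.TIMEOUT_FAILED; // retryable: back off, then retry
}
```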

// Figure out if an exception is retryable
private boolean isRetryableError(Exception e) {
if (e.getCause() instanceof SQLException) {
@@ -837,6 +847,7 @@ private boolean doExecute(
case INTERNALLY_FAILED: // Ignoring failure model as the error happens within Maestro
case USER_FAILED:
case PLATFORM_FAILED:
case TIMEOUT_FAILED:
case STOPPED:
case TIMED_OUT:
doneWithExecute = true;
@@ -904,7 +915,8 @@ private void terminateAllSteps(Workflow workflow, WorkflowSummary summary, String
void updateRetryDelayTimeToTimeline(StepRuntimeSummary runtimeSummary) {
StepInstance.Status status = runtimeSummary.getRuntimeState().getStatus();
if (status == StepInstance.Status.USER_FAILED
- || status == StepInstance.Status.PLATFORM_FAILED) {
+ || status == StepInstance.Status.PLATFORM_FAILED
+ || status == StepInstance.Status.TIMEOUT_FAILED) {
int nextRetryDelayInSecs =
runtimeSummary
.getStepRetry()
@@ -1069,6 +1081,7 @@ private void deriveTaskStatus(Task task, StepRuntimeSummary runtimeSummary) {
break;
case USER_FAILED:
case PLATFORM_FAILED:
case TIMEOUT_FAILED:
task.setStatus(Task.Status.FAILED);
task.setStartDelayInSeconds(
runtimeSummary
@@ -81,6 +81,8 @@ public final class AggregatedViewHelper {
StepInstance.Status.USER_FAILED, WorkflowInstance.Status.FAILED);
STEP_INSTANCE_STATUS_TO_WORKFLOW_INSTANCE_STATUS.put(
StepInstance.Status.PLATFORM_FAILED, WorkflowInstance.Status.FAILED);
STEP_INSTANCE_STATUS_TO_WORKFLOW_INSTANCE_STATUS.put(
StepInstance.Status.TIMEOUT_FAILED, WorkflowInstance.Status.FAILED);

// STOPPED statuses
STEP_INSTANCE_STATUS_TO_WORKFLOW_INSTANCE_STATUS.put(