Skip to content

Commit

Permalink
Update Java SDK retry options for poll operations to match Core SDK. (#…
Browse files Browse the repository at this point in the history
…1989)

= What was changed

* Update default initial interval for non-poll retries to 100ms (from 50ms)
* Update default maximum interval for non-poll retries to 5s (was 6s)
* Update default backoff factor for non-poll retries to 1.7 (was 2.0)
* Update default jitter factor for non-poll retries to 20% (was 10%)
* Update default initial interval for poll retries to 200ms (was 50ms)
* Update default maximum interval for poll retries to 10s (was 60s)
* Update default backoff factor for poll retries to 2.0 (was 1.2)
* Update default jitter factor for poll retries to 20% (was 10%)
* Code cleanups in argument validation code
* Default maximum interval is now computed from initial interval as documented (was hardcoded at 6s)

= Why?

These changes bring the Java SDK's RPC retry options in line with those of the Core SDK, as the latter have been more carefully reasoned through... with one exception.

The Java poller's existing backoff factor of 1.2 was wildly inappropriate, causing the first 10 retries to hammer the server within less than 1.5 seconds. Even more confusingly, Java's maximum interval was far, far too long, as a 60s delay between the server becoming available and the worker noticing that fact is unreasonable and unnecessary. The poller now uses a 2.0 backoff and 10s cap, exactly in line with Core SDK.

Regarding Java's non-poll RPCs, OTOH, 2.0 is not as aggressive as Core SDK's choice of 1.5, and yet Core is actually in the wrong here: all 10 permitted retries can complete within the 10-second window expected for namespace migrations in Temporal Cloud. 1.7 seems to be the ideal number here, as it leads to a cumulative delay window of 11.8 to 17.8 seconds. I've used 1.7 here, and Core SDK should be updated to use it as well.
  • Loading branch information
chronos-tachyon authored Feb 23, 2024
1 parent 78e37a6 commit 2b05f07
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

package io.temporal.serviceclient;

import static io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions.*;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessageV3;
import io.grpc.Status;
import io.temporal.internal.common.OptionsUtils;
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -119,20 +120,16 @@ private Builder(RpcRetryOptions options) {

/**
* Interval of the first retry, on regular failures. If coefficient is 1.0 then it is used for
* all retries. Defaults to 50ms.
* all retries. Defaults to 100ms.
*
* @param initialInterval Interval to wait on first retry. Default will be used if set to {@code
* null}.
*/
public Builder setInitialInterval(Duration initialInterval) {
if (initialInterval != null) {
if (initialInterval.isNegative() || initialInterval.isZero()) {
throw new IllegalArgumentException("Invalid interval: " + initialInterval);
}
this.initialInterval = initialInterval;
} else {
this.initialInterval = null;
if (isInvalidDuration(initialInterval)) {
throw new IllegalArgumentException("invalid interval: " + initialInterval);
}
this.initialInterval = initialInterval;
return this;
}

Expand All @@ -144,14 +141,10 @@ public Builder setInitialInterval(Duration initialInterval) {
* Defaults to 1000ms, which is used if set to {@code null}.
*/
public Builder setCongestionInitialInterval(Duration congestionInitialInterval) {
if (initialInterval != null) {
if (congestionInitialInterval.isNegative() || congestionInitialInterval.isZero()) {
throw new IllegalArgumentException("Invalid interval: " + congestionInitialInterval);
}
this.congestionInitialInterval = congestionInitialInterval;
} else {
this.congestionInitialInterval = null;
if (isInvalidDuration(congestionInitialInterval)) {
throw new IllegalArgumentException("invalid interval: " + congestionInitialInterval);
}
this.congestionInitialInterval = congestionInitialInterval;
return this;
}

Expand All @@ -165,33 +158,26 @@ public Builder setCongestionInitialInterval(Duration congestionInitialInterval)
* null}.
*/
public Builder setExpiration(Duration expiration) {
if (expiration != null) {
if (expiration.isNegative() || expiration.isZero()) {
throw new IllegalArgumentException("Invalid interval: " + expiration);
}
this.expiration = expiration;
} else {
this.expiration = null;
if (isInvalidDuration(expiration)) {
throw new IllegalArgumentException("invalid interval: " + expiration);
}
this.expiration = expiration;
return this;
}

/**
* Coefficient used to calculate the next retry interval. The next retry interval is previous
* interval multiplied by this coefficient. Must be 1 or larger. Default is 2.0.
* interval multiplied by this coefficient. Must be 1 or larger. Default is 1.5.
*
* @param backoffCoefficient Coefficient used to calculate the next retry interval. Defaults to
* 2.0, which is used if set to {@code 0}.
*/
public Builder setBackoffCoefficient(double backoffCoefficient) {
if (backoffCoefficient != 0.0) {
if (!Double.isFinite(backoffCoefficient) || (backoffCoefficient < 1.0)) {
throw new IllegalArgumentException("coefficient has to be >= 1.0: " + backoffCoefficient);
}
this.backoffCoefficient = backoffCoefficient;
} else {
this.backoffCoefficient = 0.0;
if (isInvalidBackoffCoefficient(backoffCoefficient)) {
throw new IllegalArgumentException(
"coefficient must be >= 1.0 and finite: " + backoffCoefficient);
}
this.backoffCoefficient = backoffCoefficient;
return this;
}

Expand All @@ -206,7 +192,7 @@ public Builder setBackoffCoefficient(double backoffCoefficient) {
* set to {@code 0}.
*/
public Builder setMaximumAttempts(int maximumAttempts) {
if (maximumAttempts < 0) {
if (isInvalidMaxAttempts(maximumAttempts)) {
throw new IllegalArgumentException("Invalid maximumAttempts: " + maximumAttempts);
}
this.maximumAttempts = maximumAttempts;
Expand All @@ -216,41 +202,31 @@ public Builder setMaximumAttempts(int maximumAttempts) {
/**
* Maximum interval between retries. Exponential backoff leads to interval increase. This value
* is the cap of the increase. <br>
* Default is 100x of initial interval. Can't be less than {@link #setInitialInterval(Duration)}
* Default is 50x of initial interval. Can't be less than {@link #setInitialInterval(Duration)}
*
* @param maximumInterval the maximum interval value. Defaults to 100x initial interval, which
* is used if set to {@code null}.
* @param maximumInterval the maximum interval value. Defaults to 50x initial interval, which is
* used if set to {@code null}.
*/
public Builder setMaximumInterval(Duration maximumInterval) {
if (maximumInterval != null) {
if ((maximumInterval.isNegative() || maximumInterval.isZero())) {
throw new IllegalArgumentException("Invalid interval: " + maximumInterval);
}
this.maximumInterval = maximumInterval;
} else {
this.maximumInterval = null;
if (isInvalidDuration(maximumInterval)) {
throw new IllegalArgumentException("invalid interval: " + maximumInterval);
}
this.maximumInterval = maximumInterval;
return this;
}

/**
* Maximum amount of jitter to apply. 0.2 means that actual retry time can be +/- 20% of the
* calculated time. Set to 0 to disable jitter. Must be lower than 1. Default is 0.1.
* calculated time. Set to 0 to disable jitter. Must be lower than 1. Default is 0.2.
*
* @param maximumJitterCoefficient Maximum amount of jitter. Default will be used if set to -1.
*/
public Builder setMaximumJitterCoefficient(double maximumJitterCoefficient) {
if (maximumJitterCoefficient != -1.0) {
if (!Double.isFinite(maximumJitterCoefficient)
|| maximumJitterCoefficient < 0
|| maximumJitterCoefficient >= 1.0) {
throw new IllegalArgumentException(
"maximumJitterCoefficient has to be >= 0 and < 1.0: " + maximumJitterCoefficient);
}
this.maximumJitterCoefficient = maximumJitterCoefficient;
} else {
this.maximumJitterCoefficient = -1.0;
if (isInvalidJitterCoefficient(maximumJitterCoefficient)) {
throw new IllegalArgumentException(
"coefficient must be >= 0 and < 1.0: " + maximumJitterCoefficient);
}
this.maximumJitterCoefficient = maximumJitterCoefficient;
return this;
}

Expand Down Expand Up @@ -334,7 +310,7 @@ private List<DoNotRetryItem> merge(List<DoNotRetryItem> o1, List<DoNotRetryItem>
if (o2 != null) {
return new ArrayList<>(o2);
}
if (o1.size() > 0) {
if (o1 != null && !o1.isEmpty()) {
return new ArrayList<>(o1);
}
return null;
Expand Down Expand Up @@ -364,51 +340,63 @@ public RpcRetryOptions buildWithDefaultsFrom(RpcRetryOptions rpcRetryOptions) {

/** Validates property values and builds RetryOptions with default values. */
public RpcRetryOptions validateBuildWithDefaults() {
double backoff = backoffCoefficient;
if (backoff == 0d) {
backoff = DefaultStubServiceOperationRpcRetryOptions.BACKOFF;
double backoffCoefficient = this.backoffCoefficient;
if (backoffCoefficient < 1) {
backoffCoefficient = BACKOFF;
}
if (initialInterval == null || initialInterval.isZero() || initialInterval.isNegative()) {
initialInterval = DefaultStubServiceOperationRpcRetryOptions.INITIAL_INTERVAL;
Duration initialInterval = this.initialInterval;
if (initialInterval == null) {
initialInterval = INITIAL_INTERVAL;
}
if (congestionInitialInterval == null
|| congestionInitialInterval.isZero()
|| congestionInitialInterval.isNegative()) {
congestionInitialInterval =
DefaultStubServiceOperationRpcRetryOptions.CONGESTION_INITIAL_INTERVAL;
Duration congestionInitialInterval = this.congestionInitialInterval;
if (congestionInitialInterval == null) {
congestionInitialInterval = CONGESTION_INITIAL_INTERVAL;
}
if (expiration == null || expiration.isZero() || expiration.isNegative()) {
expiration = DefaultStubServiceOperationRpcRetryOptions.EXPIRATION_INTERVAL;
Duration expiration = this.expiration;
if (expiration == null) {
expiration = EXPIRATION_INTERVAL;
}

Duration maxInterval = this.maximumInterval;

if (maxInterval == null || maxInterval.isZero() || maxInterval.isNegative()) {
if (maximumAttempts == 0) {
maxInterval = DefaultStubServiceOperationRpcRetryOptions.MAXIMUM_INTERVAL;
} else {
maxInterval = null;
}
Duration maximumInterval = this.maximumInterval;
if (maximumInterval == null && maximumAttempts == 0) {
maximumInterval = initialInterval.multipliedBy(MAXIMUM_INTERVAL_MULTIPLIER);
}

if (maximumJitterCoefficient == -1.0) {
maximumJitterCoefficient =
DefaultStubServiceOperationRpcRetryOptions.MAXIMUM_JITTER_COEFFICIENT;
double maximumJitterCoefficient = this.maximumJitterCoefficient;
if (maximumJitterCoefficient < 0) {
maximumJitterCoefficient = MAXIMUM_JITTER_COEFFICIENT;
}

RpcRetryOptions result =
new RpcRetryOptions(
initialInterval,
congestionInitialInterval,
backoff,
backoffCoefficient,
expiration,
maximumAttempts,
maxInterval,
maximumInterval,
maximumJitterCoefficient,
MoreObjects.firstNonNull(doNotRetry, Collections.emptyList()));
result.validate();
return result;
}

private static boolean isInvalidDuration(Duration d) {
if (d == null) {
return false;
}
return d.isNegative() || d.isZero();
}

private static boolean isInvalidMaxAttempts(int i) {
return i < 0;
}

private static boolean isInvalidBackoffCoefficient(double v) {
return !Double.isFinite(v) || (v != 0.0 && v < 1.0);
}

private static boolean isInvalidJitterCoefficient(double v) {
return !Double.isFinite(v) || (v != -1.0 && (v < 0.0 || v >= 1.0));
}
}

private final Duration initialInterval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@

/** Default rpc retry options for long polls like waiting for the workflow finishing and result. */
public class DefaultStubLongPollRpcRetryOptions {
public static final Duration INITIAL_INTERVAL = Duration.ofMillis(50);

public static final Duration INITIAL_INTERVAL = Duration.ofMillis(200);
public static final Duration CONGESTION_INITIAL_INTERVAL = Duration.ofMillis(1000);
public static final Duration MAXIMUM_INTERVAL = Duration.ofMinutes(1);
public static final double BACKOFF = 1.2;
public static final double MAXIMUM_JITTER_COEFFICIENT = 0.1;
public static final Duration MAXIMUM_INTERVAL = Duration.ofSeconds(10);
public static final double BACKOFF = 2.0;
public static final double MAXIMUM_JITTER_COEFFICIENT = 0.2;

// partial build because expiration is not set, long polls work with absolute deadlines instead
public static final RpcRetryOptions INSTANCE = getBuilder().build();
Expand All @@ -41,14 +42,11 @@ public class DefaultStubLongPollRpcRetryOptions {
}

private static RpcRetryOptions.Builder getBuilder() {
RpcRetryOptions.Builder roBuilder =
RpcRetryOptions.newBuilder()
.setInitialInterval(INITIAL_INTERVAL)
.setCongestionInitialInterval(CONGESTION_INITIAL_INTERVAL)
.setBackoffCoefficient(BACKOFF)
.setMaximumInterval(MAXIMUM_INTERVAL)
.setMaximumJitterCoefficient(MAXIMUM_JITTER_COEFFICIENT);

return roBuilder;
return RpcRetryOptions.newBuilder()
.setInitialInterval(INITIAL_INTERVAL)
.setCongestionInitialInterval(CONGESTION_INITIAL_INTERVAL)
.setBackoffCoefficient(BACKOFF)
.setMaximumInterval(MAXIMUM_INTERVAL)
.setMaximumJitterCoefficient(MAXIMUM_JITTER_COEFFICIENT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,19 @@
* finishing).
*/
public class DefaultStubServiceOperationRpcRetryOptions {
public static final Duration INITIAL_INTERVAL = Duration.ofMillis(50);

public static final Duration INITIAL_INTERVAL = Duration.ofMillis(100);
public static final Duration CONGESTION_INITIAL_INTERVAL = Duration.ofMillis(1000);
public static final Duration EXPIRATION_INTERVAL = Duration.ofMinutes(1);
public static final Duration MAXIMUM_INTERVAL;
public static final double BACKOFF = 2;
public static final double MAXIMUM_JITTER_COEFFICIENT = 0.1;
public static final int MAXIMUM_INTERVAL_MULTIPLIER = 50;
public static final Duration MAXIMUM_INTERVAL =
INITIAL_INTERVAL.multipliedBy(MAXIMUM_INTERVAL_MULTIPLIER);
public static final double BACKOFF = 1.7;
public static final double MAXIMUM_JITTER_COEFFICIENT = 0.2;

public static final RpcRetryOptions INSTANCE;

static {
Duration maxInterval = EXPIRATION_INTERVAL.dividedBy(10);
if (maxInterval.compareTo(INITIAL_INTERVAL) < 0) {
maxInterval = INITIAL_INTERVAL;
}
MAXIMUM_INTERVAL = maxInterval;

INSTANCE = getBuilder().validateBuildWithDefaults();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

package io.temporal.internal.retryer;

import static io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions.CONGESTION_INITIAL_INTERVAL;
import static io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions.MAXIMUM_JITTER_COEFFICIENT;
import static org.junit.Assert.*;

import io.grpc.Context;
Expand Down Expand Up @@ -101,8 +103,9 @@ public void testDoNotRetry() {
try {
DEFAULT_SYNC_RETRYER.retry(
() -> {
if (attempts.incrementAndGet() > 1)
if (attempts.incrementAndGet() > 1) {
fail("We should not retry on exception that we specified to don't retry");
}
throw new StatusRuntimeException(Status.fromCode(STATUS_CODE));
},
new GrpcRetryer.GrpcRetryerOptions(options, null),
Expand Down Expand Up @@ -130,7 +133,9 @@ public void testInterruptedException() {
try {
DEFAULT_SYNC_RETRYER.retry(
() -> {
if (attempts.incrementAndGet() > 1) fail("We should not retry on InterruptedException");
if (attempts.incrementAndGet() > 1) {
fail("We should not retry on InterruptedException");
}
throw new InterruptedException();
},
new GrpcRetryer.GrpcRetryerOptions(options, null),
Expand All @@ -156,8 +161,9 @@ public void testNotStatusRuntimeException() {
try {
DEFAULT_SYNC_RETRYER.retry(
() -> {
if (attempts.incrementAndGet() > 1)
if (attempts.incrementAndGet() > 1) {
fail("We should not retry if the exception is not StatusRuntimeException");
}
throw new IllegalArgumentException("simulated");
},
new GrpcRetryer.GrpcRetryerOptions(options, null),
Expand Down Expand Up @@ -335,7 +341,7 @@ public void testCongestionAndJitterAreNotMandatory() {
assertEquals(3, options.getMaximumAttempts());

// The following were added latter; they must silently use default values if unspecified
assertEquals(Duration.ofMillis(1000), options.getCongestionInitialInterval());
assertEquals(0.1, options.getMaximumJitterCoefficient(), 0.01);
assertEquals(CONGESTION_INITIAL_INTERVAL, options.getCongestionInitialInterval());
assertEquals(MAXIMUM_JITTER_COEFFICIENT, options.getMaximumJitterCoefficient(), 0.01);
}
}

0 comments on commit 2b05f07

Please sign in to comment.