Skip to content

Commit

Permalink
Merge pull request #425: Add RpcRetryOption and use longer retry inte…
Browse files Browse the repository at this point in the history
…rval on RESOURCE_EXHAUSTED
  • Loading branch information
roxblnfk authored May 10, 2024
2 parents 8cdcc18 + 90a2a1c commit 6e710e3
Show file tree
Hide file tree
Showing 13 changed files with 415 additions and 40 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"spiral/attributes": "^3.1.4",
"spiral/roadrunner": "^2023.3.12 || ^2024.1",
"spiral/roadrunner-cli": "^2.5",
"spiral/roadrunner-kv": "^4.0",
"spiral/roadrunner-kv": "^4.2",
"spiral/roadrunner-worker": "^3.0",
"symfony/filesystem": "^5.4 || ^6.0 || ^7.0",
"symfony/http-client": "^5.4 || ^6.0 || ^7.0",
Expand Down
11 changes: 1 addition & 10 deletions psalm-baseline.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<files psalm-version="5.23.1@8471a896ccea3526b26d082f4461eeea467f10a4">
<files psalm-version="5.24.0@462c80e31c34e58cc4f750c656be3927e80e550e">
<file src="src/Activity.php">
<ImplicitToStringCast>
<code><![CDATA[$type]]></code>
Expand Down Expand Up @@ -43,15 +43,6 @@
<code><![CDATA[recordHeartbeatByToken]]></code>
</MissingReturnType>
</file>
<file src="src/Client/GRPC/BaseClient.php">
<ArgumentTypeCoercion>
<code><![CDATA[(int)$waitRetry->totalMicroseconds]]></code>
</ArgumentTypeCoercion>
<InvalidArgument>
<code><![CDATA[$retryOption->maximumInterval]]></code>
<code><![CDATA[$waitRetry]]></code>
</InvalidArgument>
</file>
<file src="src/Client/GRPC/Context.php">
<ArgumentTypeCoercion>
<code><![CDATA[$format]]></code>
Expand Down
88 changes: 88 additions & 0 deletions src/Client/Common/BackoffThrottler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

declare(strict_types=1);

namespace Temporal\Client\Common;

/**
* Used to throttle code execution in presence of failures using exponential backoff logic.
*
* The formula used to calculate the next sleep interval is:
*
* ```
* jitter = rand(-maxJitterCoefficient, +maxJitterCoefficient)
* wait = min(pow(backoffCoefficient, failureCount - 1) * initialInterval * (1 + jitter), maxInterval)
* ```
*
* Note
* `initialInterval` may be changed in runtime depending on the failure type.
* That it means that attempt X can possibly get a shorter throttle than attempt X-1.
*
* Example:
*
* ```php
* $throttler = new BackoffThrottler(maxInterval: 60_000, 0.1, 2.0);
*
* // First retry
* // There 1000 is initial interval for the RESOURCE_EXHAUSTED exception
* $throttler->calculateSleepTime(failureCount: 1, baseInterval: 1000);
*
* // Second retry
* // There 500 is a common initial interval for all other exceptions
* $throttler->calculateSleepTime(failureCount: 2, baseInterval: 500);
* ```
*
* @internal
*/
final class BackoffThrottler
{
/**
* @param int $maxInterval Maximum sleep interval in milliseconds. Must be greater than 0.
* @param float $maxJitterCoefficient Maximum jitter to apply. Must be in the range [0.0, 1.0).
* 0.1 means that actual retry time can be +/- 10% of the calculated time.
* @param float $backoffCoefficient Coefficient used to calculate the next retry backoff interval.
* The next retry interval is previous interval multiplied by this coefficient.
* Must be greater than 1.0.
*/
public function __construct(
private readonly int $maxInterval,
private readonly float $maxJitterCoefficient,
private readonly float $backoffCoefficient,
) {
$maxJitterCoefficient >= 0 && $maxJitterCoefficient < 1 or throw new \InvalidArgumentException(
'$jitterCoefficient must be in the range [0.0, 1.0).',
);
$this->maxInterval > 0 or throw new \InvalidArgumentException('$maxInterval must be greater than 0.');
$this->backoffCoefficient >= 1.0 or throw new \InvalidArgumentException(
'$backoffCoefficient must be greater than 1.',
);
}

/**
* Calculates the next sleep interval in milliseconds.
*
* @param int $failureCount number of failures
* @param int $initialInterval in milliseconds
*
* @return int<0, max>
*
* @psalm-assert int<1, max> $failureCount
* @psalm-assert int<1, max> $initialInterval
*
* @psalm-suppress InvalidOperand
*/
public function calculateSleepTime(int $failureCount, int $initialInterval): int
{
$failureCount > 0 or throw new \InvalidArgumentException('$failureCount must be greater than 0.');
$initialInterval > 0 or throw new \InvalidArgumentException('$initialInterval must be greater than 0.');

// Choose a random number in the range -maxJitterCoefficient ... +maxJitterCoefficient
$jitter = \random_int(-1000, 1000) * $this->maxJitterCoefficient / 1000;
$sleepTime = \min(
\pow($this->backoffCoefficient, $failureCount - 1) * $initialInterval * (1.0 + $jitter),
$this->maxInterval,
);

return \abs((int)$sleepTime);
}
}
2 changes: 1 addition & 1 deletion src/Client/Common/ClientContextInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public function withTimeout(float $timeout): static;
*/
public function withDeadline(\DateTimeInterface $deadline): static;

public function withRetryOptions(RetryOptions $options): static;
public function withRetryOptions(RpcRetryOptions $options): static;

/**
* A metadata map to send to the server
Expand Down
2 changes: 1 addition & 1 deletion src/Client/Common/ClientContextTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public function withDeadline(\DateTimeInterface $deadline): static
return $new;
}

public function withRetryOptions(RetryOptions $options): static
public function withRetryOptions(RpcRetryOptions $options): static
{
$new = clone $this;
$context = $new->client->getContext();
Expand Down
92 changes: 92 additions & 0 deletions src/Client/Common/RpcRetryOptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
<?php

declare(strict_types=1);

namespace Temporal\Client\Common;

use JetBrains\PhpStorm\Pure;
use Temporal\Common\RetryOptions;
use Temporal\Internal\Support\DateInterval;

/**
* @psalm-import-type DateIntervalValue from DateInterval
* @psalm-immutable
*/
final class RpcRetryOptions extends RetryOptions
{
/**
* Interval of the first retry, on congestion related failures (i.e. RESOURCE_EXHAUSTED errors).
*
* If coefficient is 1.0 then it is used for all retries. Defaults to 1000ms.
*/
public ?\DateInterval $congestionInitialInterval = null;

/**
* Maximum amount of jitter to apply.
* Must be lower than 1.
*
* 0.1 means that actual retry time can be +/- 10% of the calculated time.
*/
public float $maximumJitterCoefficient = 0.1;

/**
* Converts {@see RetryOptions} to {@see RpcRetryOptions}.
*
* @internal
*/
public static function fromRetryOptions(RetryOptions $options): self
{
return $options instanceof self ? $options : (new self())
->withInitialInterval($options->initialInterval)
->withBackoffCoefficient($options->backoffCoefficient)
->withMaximumInterval($options->maximumInterval)
->withMaximumAttempts($options->maximumAttempts)
->withNonRetryableExceptions($options->nonRetryableExceptions);
}

/**
* Interval of the first retry, on congestion related failures (i.e. RESOURCE_EXHAUSTED errors).
* If coefficient is 1.0 then it is used for all retries. Defaults to 1000ms.
*
* @param DateIntervalValue|null $interval Interval to wait on first retry, on congestion failures.
* Defaults to 1000ms, which is used if set to {@see null}.
*
* @return self
*
* @psalm-suppress ImpureMethodCall
*/
#[Pure]
public function withCongestionInitialInterval($interval): self
{
$interval === null || DateInterval::assert($interval) or throw new \InvalidArgumentException(
'Invalid interval value.'
);

$self = clone $this;
$self->congestionInitialInterval = DateInterval::parseOrNull($interval, DateInterval::FORMAT_SECONDS);
return $self;
}

/**
* Maximum amount of jitter to apply.
*
* 0.2 means that actual retry time can be +/- 20% of the calculated time.
* Set to 0 to disable jitter. Must be lower than 1.
*
* @param null|float $coefficient Maximum amount of jitter.
* Default will be used if set to {@see null}.
*
* @return self
*/
#[Pure]
public function withMaximumJitterCoefficient(?float $coefficient): self
{
$coefficient === null || ($coefficient >= 0.0 && $coefficient < 1.0) or throw new \InvalidArgumentException(
'Maximum jitter coefficient must be in range [0, 1).'
);

$self = clone $this;
$self->maximumJitterCoefficient = $coefficient ?? 0.1;
return $self;
}
}
48 changes: 30 additions & 18 deletions src/Client/GRPC/BaseClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
use Grpc\UnaryCall;
use Temporal\Api\Workflowservice\V1\GetSystemInfoRequest;
use Temporal\Api\Workflowservice\V1\WorkflowServiceClient;
use Temporal\Client\Common\BackoffThrottler;
use Temporal\Client\Common\RpcRetryOptions;
use Temporal\Client\Common\ServerCapabilities;
use Temporal\Client\GRPC\Connection\Connection;
use Temporal\Client\GRPC\Connection\ConnectionInterface;
Expand Down Expand Up @@ -272,18 +274,11 @@ protected function invoke(string $method, object $arg, ?ContextInterface $ctx =
private function call(string $method, object $arg, ContextInterface $ctx): object
{
$attempt = 0;
$retryOption = $ctx->getRetryOptions();

$maxInterval = null;
if ($retryOption->maximumInterval !== null) {
$maxInterval = CarbonInterval::create($retryOption->maximumInterval);
}

$waitRetry = $retryOption->initialInterval ?? CarbonInterval::millisecond(500);
$waitRetry = CarbonInterval::create($waitRetry);
$retryOption = RpcRetryOptions::fromRetryOptions($ctx->getRetryOptions());
$initialIntervalMs = $congestionInitialIntervalMs = $throttler = null;

do {
$attempt++;
++$attempt;
try {
$options = $ctx->getOptions();
$deadline = $ctx->getDeadline();
Expand Down Expand Up @@ -312,23 +307,40 @@ private function call(string $method, object $arg, ContextInterface $ctx): objec
}

if ($retryOption->maximumAttempts !== 0 && $attempt >= $retryOption->maximumAttempts) {
// Reached maximum attempts
throw $e;
}

if ($ctx->getDeadline() !== null && $ctx->getDeadline() > new DateTimeImmutable()) {
// Deadline is reached
throw new TimeoutException('Call timeout has been reached');
}

// wait till the next call
$this->usleep((int)$waitRetry->totalMicroseconds);

$waitRetry = CarbonInterval::millisecond(
$waitRetry->totalMilliseconds * $retryOption->backoffCoefficient
// Init interval values in milliseconds
$initialIntervalMs ??= $retryOption->initialInterval === null
? (int)CarbonInterval::millisecond(50)->totalMilliseconds
: (int)(new CarbonInterval($retryOption->initialInterval))->totalMilliseconds;
$congestionInitialIntervalMs ??= $retryOption->congestionInitialInterval === null
? (int)CarbonInterval::millisecond(1000)->totalMilliseconds
: (int)(new CarbonInterval($retryOption->congestionInitialInterval))->totalMilliseconds;

$throttler ??= new BackoffThrottler(
maxInterval: $retryOption->maximumInterval !== null
? (int)(new CarbonInterval($retryOption->maximumInterval))->totalMilliseconds
: $initialIntervalMs * 200,
maxJitterCoefficient: $retryOption->maximumJitterCoefficient,
backoffCoefficient: $retryOption->backoffCoefficient
);

if ($maxInterval !== null && $maxInterval->totalMilliseconds < $waitRetry->totalMilliseconds) {
$waitRetry = $maxInterval;
}
// Initial interval always depends on the *most recent* failure.
$baseInterval = $e->getCode() === StatusCode::RESOURCE_EXHAUSTED
? $congestionInitialIntervalMs
: $initialIntervalMs;

$wait = $throttler->calculateSleepTime(failureCount: $attempt, initialInterval: $baseInterval);

// wait till the next call
$this->usleep($wait);
}
} while (true);
}
Expand Down
3 changes: 2 additions & 1 deletion src/Client/GRPC/Context.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Temporal\Client\GRPC;

use Carbon\CarbonInterval;
use Temporal\Client\Common\RpcRetryOptions;
use Temporal\Common\RetryOptions;
use Temporal\Common\SdkVersion;
use Temporal\Internal\Support\DateInterval;
Expand All @@ -25,7 +26,7 @@ final class Context implements ContextInterface

private function __construct()
{
$this->retryOptions = RetryOptions::new()
$this->retryOptions = RpcRetryOptions::new()
->withMaximumAttempts(0)
->withInitialInterval(CarbonInterval::millisecond(500));

Expand Down
10 changes: 5 additions & 5 deletions src/Common/RetryOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public function mergeWith(MethodRetry $retry = null): self
* @psalm-suppress ImpureMethodCall
*
* @param DateIntervalValue|null $interval
* @return self
* @return static
*/
#[Pure]
public function withInitialInterval($interval): self
Expand All @@ -143,7 +143,7 @@ public function withInitialInterval($interval): self
* @psalm-suppress ImpureMethodCall
*
* @param float $coefficient
* @return self
* @return static
*/
#[Pure]
public function withBackoffCoefficient(float $coefficient): self
Expand All @@ -159,7 +159,7 @@ public function withBackoffCoefficient(float $coefficient): self
* @psalm-suppress ImpureMethodCall
*
* @param DateIntervalValue|null $interval
* @return self
* @return static
*/
#[Pure]
public function withMaximumInterval($interval): self
Expand All @@ -175,7 +175,7 @@ public function withMaximumInterval($interval): self
* @psalm-suppress ImpureMethodCall
*
* @param int<0, max> $attempts
* @return self
* @return static
*/
#[Pure]
public function withMaximumAttempts(int $attempts): self
Expand All @@ -192,7 +192,7 @@ public function withMaximumAttempts(int $attempts): self
* @psalm-suppress ImpureMethodCall
*
* @param ExceptionsList $exceptions
* @return self
* @return static
*/
#[Pure]
public function withNonRetryableExceptions(array $exceptions): self
Expand Down
2 changes: 1 addition & 1 deletion src/Workflow/ChildWorkflowOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public function __construct()
* @param CronSchedule|null $cron
* @return $this
*/
public function mergeWith(MethodRetry $retry = null, CronSchedule $cron = null): self
public function mergeWith(?MethodRetry $retry = null, ?CronSchedule $cron = null): self
{
$self = clone $this;

Expand Down
Loading

0 comments on commit 6e710e3

Please sign in to comment.