Skip to content

Commit

Permalink
Merge pull request #367: SDK 2.7.0-beta - interceptors
Browse files Browse the repository at this point in the history
  • Loading branch information
roxblnfk authored Nov 2, 2023
2 parents b22625e + 647b6fa commit 775cc98
Show file tree
Hide file tree
Showing 182 changed files with 5,900 additions and 1,078 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
"autoload-dev": {
"psr-4": {
"Temporal\\Tests\\": "tests",
"Temporal\\Tests\\Interceptor\\": "tests/Fixtures/src/Interceptor",
"Temporal\\Tests\\Workflow\\": "tests/Fixtures/src/Workflow",
"Temporal\\Tests\\Activity\\": "tests/Fixtures/src/Activity",
"Temporal\\Tests\\DTO\\": "tests/Fixtures/src/DTO",
Expand All @@ -92,7 +93,7 @@
},
"extra": {
"branch-alias": {
"dev-master": "2.6.x-dev"
"dev-master": "2.7.x-dev"
}
},
"config": {
Expand Down
88 changes: 74 additions & 14 deletions src/Client/GRPC/BaseClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
namespace Temporal\Client\GRPC;

use Carbon\CarbonInterval;
use Closure;
use Exception;
use Grpc\UnaryCall;
use Temporal\Api\Workflowservice\V1\WorkflowServiceClient;
use Temporal\Exception\Client\ServiceClientException;
use Temporal\Exception\Client\TimeoutException;
use Temporal\Interceptor\GrpcClientInterceptor;
use Temporal\Internal\Interceptor\Pipeline;

abstract class BaseClient implements ServiceClientInterface
{
Expand All @@ -26,36 +30,35 @@ abstract class BaseClient implements ServiceClientInterface
];
private WorkflowServiceClient $workflowService;

/** @var null|Closure(string $method, object $arg, ContextInterface $ctx): object */
private ?Closure $invokePipeline = null;

/** @var callable */
private $workflowServiceCloser;

/**
* @param WorkflowServiceClient $workflowService
*/
public function __construct(WorkflowServiceClient $workflowService)
{
$this->workflowService = $workflowService;
}

/**
* Close connection and destruct client.
*/
public function __destruct()
{
$this->close();
$this->workflowServiceCloser = $this->makeClientCloser($workflowService);
}

/**
* Close the communication channel associated with this stub.
*/
public function close(): void
{
$this->workflowService->close();
($this->workflowServiceCloser)();
}

/**
* @param string $address
* @return ServiceClientInterface
* @return static
* @psalm-suppress UndefinedClass
*/
public static function create(string $address): ServiceClientInterface
public static function create(string $address): static
{
$client = new WorkflowServiceClient(
$address,
Expand All @@ -71,7 +74,7 @@ public static function create(string $address): ServiceClientInterface
* @param string|null $clientKey
* @param string|null $clientPem
* @param string|null $overrideServerName
* @return ServiceClientInterface
* @return static
*
* @psalm-suppress UndefinedClass
* @psalm-suppress UnusedVariable
Expand All @@ -82,8 +85,7 @@ public static function createSSL(
string $clientKey = null,
string $clientPem = null,
string $overrideServerName = null
): ServiceClientInterface
{
): static {
$options = [
'credentials' => \Grpc\ChannelCredentials::createSsl(
\is_file($crt) ? \file_get_contents($crt) : null,
Expand All @@ -102,10 +104,25 @@ public static function createSSL(
return new static($client);
}

/**
* @param null|Pipeline<GrpcClientInterceptor, object> $pipeline
*
* @return static
*/
final public function withInterceptorsPipeline(?Pipeline $pipeline): static
{
$clone = clone $this;
/** @see GrpcClientInterceptor::interceptCall() */
$callable = $pipeline?->with(Closure::fromCallable([$clone, 'call']), 'interceptCall');
$clone->invokePipeline = $callable === null ? null : Closure::fromCallable($callable);
return $clone;
}

/**
* @param non-empty-string $method RPC method name
* @param object $arg
* @param ContextInterface|null $ctx
*
* @return mixed
*
* @throw ClientException
Expand All @@ -114,6 +131,25 @@ protected function invoke(string $method, object $arg, ContextInterface $ctx = n
{
$ctx = $ctx ?? Context::default();

return $this->invokePipeline !== null
? ($this->invokePipeline)($method, $arg, $ctx)
: $this->call($method, $arg, $ctx);
}

/**
* Call a gRPC method.
* Used in {@see withInterceptorsPipeline()}
*
* @param non-empty-string $method
* @param object $arg
* @param ContextInterface $ctx
*
* @return object
*
* @throws Exception
*/
private function call(string $method, object $arg, ContextInterface $ctx): object
{
$attempt = 0;
$retryOption = $ctx->getRetryOptions();

Expand Down Expand Up @@ -174,4 +210,28 @@ protected function invoke(string $method, object $arg, ContextInterface $ctx = n
}
} while (true);
}

/**
* Makes an object that will close workflow service client connection on parent class destruct.
*
* @param WorkflowServiceClient $workflowServiceClient
*
* @return callable
*/
private function makeClientCloser(WorkflowServiceClient $workflowServiceClient): callable
{
return new class ($workflowServiceClient) {
public function __construct(public WorkflowServiceClient $client) { }

public function __invoke(): void
{
$this->client->close();
}

public function __destruct()
{
$this->client->close();
}
};
}
}
16 changes: 15 additions & 1 deletion src/Client/GRPC/ServiceClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
* file that was distributed with this source code.
*/


namespace Temporal\Client\GRPC;

use Temporal\Api\Workflowservice\V1;
use Temporal\Client\ServerCapabilities;
use Temporal\Exception\Client\ServiceClientException;

class ServiceClient extends BaseClient
{
private ?ServerCapabilities $capabilities = null;

/**
* RegisterNamespace creates a new namespace which can be used as a container for
* all resources.
Expand Down Expand Up @@ -1037,5 +1039,17 @@ public function ListBatchOperations(V1\ListBatchOperationsRequest $arg, ContextI
{
return $this->invoke("ListBatchOperations", $arg, $ctx);
}

// todo Remove from autogenerated file
public function getServerCapabilities(): ?ServerCapabilities
{
return $this->capabilities;
}

// todo Remove from autogenerated file
public function setServerCapabilities(ServerCapabilities $capabilities): void
{
$this->capabilities = $capabilities;
}
}

64 changes: 64 additions & 0 deletions src/Client/Interceptor/SystemInfoInterceptor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Client\Interceptor;

use Temporal\Api\Workflowservice\V1\GetSystemInfoRequest;
use Temporal\Client\GRPC\ContextInterface;
use Temporal\Client\GRPC\ServiceClient;
use Temporal\Client\GRPC\StatusCode;
use Temporal\Client\ServerCapabilities;
use Temporal\Exception\Client\ServiceClientException;
use Temporal\Interceptor\GrpcClientInterceptor;

/**
* @psalm-immutable
*/
final class SystemInfoInterceptor implements GrpcClientInterceptor
{
private bool $systemInfoRequested = false;

public function __construct(
private ServiceClient $serviceClient
) {
}

/**
* @param non-empty-string $method
* @param callable(non-empty-string, object, ContextInterface): object $next
*/
public function interceptCall(string $method, object $arg, ContextInterface $ctx, callable $next): object
{
if ($this->systemInfoRequested) {
return $next($method, $arg, $ctx);

Check failure on line 41 in src/Client/Interceptor/SystemInfoInterceptor.php

View workflow job for this annotation

GitHub Actions / Psalm Validation (PHP 8.2, OS ubuntu-latest)

ImpureFunctionCall

src/Client/Interceptor/SystemInfoInterceptor.php:41:20: ImpureFunctionCall: Cannot call an impure function from a mutation-free context (see https://psalm.dev/202)
}

try {
$systemInfo = $this->serviceClient->getSystemInfo(new GetSystemInfoRequest());

Check failure on line 45 in src/Client/Interceptor/SystemInfoInterceptor.php

View workflow job for this annotation

GitHub Actions / Psalm Validation (PHP 8.2, OS ubuntu-latest)

ImpureMethodCall

src/Client/Interceptor/SystemInfoInterceptor.php:45:49: ImpureMethodCall: Cannot call a possibly-mutating method Temporal\Client\GRPC\ServiceClient::getSystemInfo from a mutation-free context (see https://psalm.dev/203)

$capabilities = $systemInfo->getCapabilities();

Check failure on line 47 in src/Client/Interceptor/SystemInfoInterceptor.php

View workflow job for this annotation

GitHub Actions / Psalm Validation (PHP 8.2, OS ubuntu-latest)

ImpureMethodCall

src/Client/Interceptor/SystemInfoInterceptor.php:47:42: ImpureMethodCall: Cannot call a possibly-mutating method Temporal\Api\Workflowservice\V1\GetSystemInfoResponse::getCapabilities from a mutation-free context (see https://psalm.dev/203)
if ($capabilities !== null && $this->serviceClient->getServerCapabilities() === null) {
$this->serviceClient->setServerCapabilities(new ServerCapabilities(

Check failure on line 49 in src/Client/Interceptor/SystemInfoInterceptor.php

View workflow job for this annotation

GitHub Actions / Psalm Validation (PHP 8.2, OS ubuntu-latest)

ImpureMethodCall

src/Client/Interceptor/SystemInfoInterceptor.php:49:39: ImpureMethodCall: Cannot call a possibly-mutating method Temporal\Client\GRPC\ServiceClient::setServerCapabilities from a mutation-free context (see https://psalm.dev/203)
signalAndQueryHeader: $capabilities->getSignalAndQueryHeader(),

Check failure on line 50 in src/Client/Interceptor/SystemInfoInterceptor.php

View workflow job for this annotation

GitHub Actions / Psalm Validation (PHP 8.2, OS ubuntu-latest)

ImpureMethodCall

src/Client/Interceptor/SystemInfoInterceptor.php:50:58: ImpureMethodCall: Cannot call a possibly-mutating method Temporal\Api\Workflowservice\V1\GetSystemInfoResponse\Capabilities::getSignalAndQueryHeader from a mutation-free context (see https://psalm.dev/203)
internalErrorDifferentiation: $capabilities->getInternalErrorDifferentiation()

Check failure on line 51 in src/Client/Interceptor/SystemInfoInterceptor.php

View workflow job for this annotation

GitHub Actions / Psalm Validation (PHP 8.2, OS ubuntu-latest)

ImpureMethodCall

src/Client/Interceptor/SystemInfoInterceptor.php:51:66: ImpureMethodCall: Cannot call a possibly-mutating method Temporal\Api\Workflowservice\V1\GetSystemInfoResponse\Capabilities::getInternalErrorDifferentiation from a mutation-free context (see https://psalm.dev/203)
));
}
} catch (ServiceClientException $e) {
if ($e->getCode() !== StatusCode::UNIMPLEMENTED) {
throw $e;
}
}

$this->systemInfoRequested = true;

Check failure on line 60 in src/Client/Interceptor/SystemInfoInterceptor.php

View workflow job for this annotation

GitHub Actions / Psalm Validation (PHP 8.2, OS ubuntu-latest)

InaccessibleProperty

src/Client/Interceptor/SystemInfoInterceptor.php:60:9: InaccessibleProperty: Temporal\Client\Interceptor\SystemInfoInterceptor::$systemInfoRequested is marked readonly (see https://psalm.dev/054)

return $next($method, $arg, $ctx);

Check failure on line 62 in src/Client/Interceptor/SystemInfoInterceptor.php

View workflow job for this annotation

GitHub Actions / Psalm Validation (PHP 8.2, OS ubuntu-latest)

ImpureFunctionCall

src/Client/Interceptor/SystemInfoInterceptor.php:62:16: ImpureFunctionCall: Cannot call an impure function from a mutation-free context (see https://psalm.dev/202)
}
}
40 changes: 40 additions & 0 deletions src/Client/ServerCapabilities.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Client;

final class ServerCapabilities
{
public function __construct(
private bool $signalAndQueryHeader = false,
private bool $internalErrorDifferentiation = false
) {
}

/**
* True if signal and query headers are supported.
*/
public function isSignalAndQueryHeaderSupports(): bool
{
return $this->signalAndQueryHeader;
}

/**
* True if internal errors are differentiated from other types of errors for purposes of
* retrying non-internal errors.
* When unset/false, clients retry all failures. When true, clients should only retry
* non-internal errors.
*/
public function isInternalErrorDifferentiation(): bool
{
return $this->internalErrorDifferentiation;
}
}
Loading

0 comments on commit 775cc98

Please sign in to comment.