Skip to content

Commit

Permalink
Fixes interceptors support for Workflow client.
Browse files Browse the repository at this point in the history
  • Loading branch information
butschster committed Apr 25, 2023
1 parent c555447 commit a71b80d
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 59 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ docs
vendor
node_modules
.php-cs-fixer.cache
tests/runtime
2 changes: 1 addition & 1 deletion src/Bootloader/PrototypeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use Spiral\TemporalBridge\WorkflowManagerInterface;
use Temporal\Client\WorkflowClientInterface;

class PrototypeBootloader extends Bootloader
final class PrototypeBootloader extends Bootloader
{
protected const DEPENDENCIES = [
BasePrototypeBootloader::class,
Expand Down
44 changes: 32 additions & 12 deletions src/Bootloader/TemporalBridgeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
use Spiral\Config\ConfiguratorInterface;
use Spiral\Config\Patch\Append;
use Spiral\Console\Bootloader\ConsoleBootloader;
use Spiral\Core\Container\Autowire;
use Spiral\Core\FactoryInterface;
use Spiral\RoadRunnerBridge\Bootloader\RoadRunnerBootloader;
use Spiral\TemporalBridge\Commands;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Spiral\TemporalBridge\DeclarationLocator;
use Spiral\TemporalBridge\DeclarationLocatorInterface;
use Spiral\TemporalBridge\Dispatcher;
use Spiral\TemporalBridge\Preset\PresetRegistry;
Expand All @@ -34,8 +36,9 @@
use Temporal\Client\WorkflowClientInterface;
use Temporal\DataConverter\DataConverter;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Interceptor\SimplePipelineProvider;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Interceptor\SimplePipelineProvider;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\Transport\Goridge;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;
Expand All @@ -53,7 +56,7 @@ class TemporalBridgeBootloader extends Bootloader
PresetRegistryInterface::class => PresetRegistry::class,
DataConverterInterface::class => [self::class, 'initDataConverter'],
WorkerFactoryInterface::class => WorkerFactory::class,
PipelineProvider::class => SimplePipelineProvider::class,
PipelineProvider::class => [self::class, 'initPipelineProvider'],
];

protected const DEPENDENCIES = [
Expand All @@ -62,15 +65,15 @@ class TemporalBridgeBootloader extends Bootloader
];

public function __construct(
private readonly ConfiguratorInterface $config
private readonly ConfiguratorInterface $config,
) {
}

public function init(
AbstractKernel $kernel,
EnvironmentInterface $env,
ConsoleBootloader $console,
FactoryInterface $factory
FactoryInterface $factory,
): void {
$this->initConfig($env);

Expand All @@ -88,7 +91,7 @@ public function addWorkerOptions(string $worker, WorkerOptions $options): void

protected function initWorkflowPresetLocator(
FactoryInterface $factory,
ClassesInterface $classes
ClassesInterface $classes,
): WorkflowPresetLocatorInterface {
return new WorkflowPresetLocator(
factory: $factory,
Expand All @@ -106,18 +109,20 @@ protected function initConfig(EnvironmentInterface $env): void
'namespace' => 'App\\Workflow',
'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', TemporalWorkerFactory::DEFAULT_TASK_QUEUE),
'workers' => [],
]
],
);
}

protected function initWorkflowClient(
TemporalConfig $config,
DataConverterInterface $dataConverter
DataConverterInterface $dataConverter,
PipelineProvider $pipelineProvider,
): WorkflowClientInterface {
return WorkflowClient::create(
return new WorkflowClient(
serviceClient: ServiceClient::create($config->getAddress()),
options: (new ClientOptions())->withNamespace($config->getTemporalNamespace()),
converter: $dataConverter
converter: $dataConverter,
interceptorProvider: $pipelineProvider,
);
}

Expand All @@ -127,7 +132,7 @@ protected function initDataConverter(): DataConverterInterface
}

protected function initWorkerFactory(
DataConverterInterface $dataConverter
DataConverterInterface $dataConverter,
): TemporalWorkerFactoryInterface {
return new TemporalWorkerFactory(
dataConverter: $dataConverter,
Expand All @@ -136,11 +141,26 @@ protected function initWorkerFactory(
}

protected function initDeclarationLocator(
ClassesInterface $classes
ClassesInterface $classes,
): DeclarationLocatorInterface {
return new \Spiral\TemporalBridge\DeclarationLocator(
return new DeclarationLocator(
classes: $classes,
reader: new AttributeReader()
);
}

protected function initPipelineProvider(TemporalConfig $config, FactoryInterface $factory): PipelineProvider
{
/** @var Interceptor[] $interceptors */
$interceptors = \array_map(
static fn (mixed $interceptor) => match (true) {
\is_string($interceptor) => $factory->make($interceptor),
$interceptor instanceof Autowire => $interceptor->resolve($factory),
default => $interceptor
},
$config->getInterceptors(),
);

return new SimplePipelineProvider($interceptors);
}
}
13 changes: 11 additions & 2 deletions src/Config/TemporalConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* @psalm-type TExceptionInterceptor = ExceptionInterceptorInterface|class-string<ExceptionInterceptorInterface>|Autowire<ExceptionInterceptorInterface>
* @psalm-type TWorker = array{
* options?: WorkerOptions,
* interceptors?: TInterceptor[],
* exception_interceptor?: TExceptionInterceptor
* }
*
Expand All @@ -25,7 +24,8 @@
* namespace: non-empty-string,
* temporalNamespace: non-empty-string,
* defaultWorker: non-empty-string,
* workers: array<non-empty-string, WorkerOptions|TWorker>
* workers: array<non-empty-string, WorkerOptions|TWorker>,
* interceptors?: TInterceptor[]
* } $config
*/
final class TemporalConfig extends InjectableConfig
Expand All @@ -38,6 +38,7 @@ final class TemporalConfig extends InjectableConfig
'temporalNamespace' => 'default',
'defaultWorker' => WorkerFactoryInterface::DEFAULT_TASK_QUEUE,
'workers' => [],
'interceptors' => [],
];

/**
Expand Down Expand Up @@ -79,4 +80,12 @@ public function getWorkers(): array
{
return $this->config['workers'] ?? [];
}

/**
* @return TInterceptor[]
*/
public function getInterceptors(): array
{
return $this->config['interceptors'] ?? [];
}
}
31 changes: 5 additions & 26 deletions src/WorkerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
use Spiral\Core\FactoryInterface;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Temporal\Exception\ExceptionInterceptorInterface;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Worker\WorkerInterface;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactory;
use Temporal\Worker\WorkerInterface;
use Temporal\Worker\WorkerOptions;

/**
Expand All @@ -27,7 +26,8 @@ public function __construct(
private readonly TemporalWorkerFactory $workerFactory,
private readonly FinalizerInterface $finalizer,
private readonly FactoryInterface $factory,
private readonly TemporalConfig $config
private readonly PipelineProvider $pipelineProvider,
private readonly TemporalConfig $config,
) {
$this->workers = $this->config->getWorkers();
}
Expand All @@ -42,9 +42,9 @@ public function create(string $name): WorkerInterface
$name,
$this->getWorkerOptions($name),
$this->getExceptionInterceptor($name),
$this->getPipelineProvider($name)
$this->pipelineProvider,
);
$worker->registerActivityFinalizer(fn() => $this->finalizer->finalize());
$worker->registerActivityFinalizer(fn () => $this->finalizer->finalize());

return $worker;
}
Expand Down Expand Up @@ -79,27 +79,6 @@ private function getExceptionInterceptor(string $name): ?ExceptionInterceptorInt
return $exceptionInterceptor;
}

/**
* @param non-empty-string $name
*/
private function getPipelineProvider(string $name): ?PipelineProvider
{
$worker = $this->workers[$name] ?? null;
if (!\is_array($worker) || !isset($worker['interceptors'])) {
return null;
}

$interceptors = [];
foreach ($worker['interceptors'] as $interceptor) {
$interceptor = $this->wire($interceptor);
\assert($interceptor instanceof Interceptor);

$interceptors[] = $interceptor;
}

return $this->factory->make(PipelineProvider::class, ['interceptors' => $interceptors]);
}

private function wire(mixed $alias): object
{
return match (true) {
Expand Down
1 change: 0 additions & 1 deletion src/Workflow/Workflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Temporal\Client\WorkflowClientInterface;
use Temporal\Client\WorkflowOptions;
use Temporal\Client\WorkflowStubInterface;
use Temporal\Common\RetryOptions;
use Temporal\Internal\Client\WorkflowProxy;
use Temporal\Internal\Support\DateInterval;
Expand Down
38 changes: 21 additions & 17 deletions tests/src/WorkerFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
use Spiral\TemporalBridge\Tests\App\SomeInterceptor;
use Spiral\TemporalBridge\WorkerFactory;
use Temporal\Exception\ExceptionInterceptor;
use Temporal\Interceptor\SimplePipelineProvider;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Interceptor\SimplePipelineProvider;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactory;
use Temporal\Worker\WorkerInterface;
use Temporal\Worker\WorkerOptions;
Expand Down Expand Up @@ -87,7 +87,7 @@ public function testCreateWithInterceptors(): void
$expectedInterceptors = new SimplePipelineProvider([
new SomeInterceptor(),
new SomeInterceptor(),
new SomeInterceptor()
new SomeInterceptor(),
]);

$this->temporalWorkerFactory
Expand All @@ -96,7 +96,7 @@ public function testCreateWithInterceptors(): void
->with('with-interceptors', null, null, $this->equalTo($expectedInterceptors))
->willReturn($worker = $this->createMock(WorkerInterface::class));

$factory = $this->createWorkerFactory($this->temporalWorkerFactory);
$factory = $this->createWorkerFactory($this->temporalWorkerFactory, $expectedInterceptors);

$this->assertSame($worker, $factory->create('with-interceptors'));
}
Expand All @@ -106,7 +106,7 @@ public function testCreateWithAllOptions(): void
$expectedInterceptors = new SimplePipelineProvider([
new SomeInterceptor(),
new SomeInterceptor(),
new SomeInterceptor()
new SomeInterceptor(),
]);

$this->temporalWorkerFactory
Expand All @@ -116,11 +116,11 @@ public function testCreateWithAllOptions(): void
'all',
$this->equalTo(WorkerOptions::new()->withEnableSessionWorker()),
$this->equalTo(new ExceptionInterceptor([])),
$this->equalTo($expectedInterceptors)
$this->equalTo($expectedInterceptors),
)
->willReturn($worker = $this->createMock(WorkerInterface::class));

$factory = $this->createWorkerFactory($this->temporalWorkerFactory);
$factory = $this->createWorkerFactory($this->temporalWorkerFactory, $expectedInterceptors);

$this->assertSame($worker, $factory->create('all'));
}
Expand All @@ -132,46 +132,50 @@ public function exceptionInterceptorsDataProvider(): \Traversable
yield ['with-exception-interceptor-as-instance'];
}

private function createWorkerFactory(TemporalWorkerFactory $workerFactory): WorkerFactory
{
private function createWorkerFactory(
TemporalWorkerFactory $workerFactory,
PipelineProvider $pipelineProvider = new SimplePipelineProvider(),
):
WorkerFactory {
$container = new Container();
$container->bind(PipelineProvider::class, SimplePipelineProvider::class);
$container->bind(ExceptionInterceptor::class, new ExceptionInterceptor([]));

$interceptors = [
SomeInterceptor::class,
new SomeInterceptor(),
new Autowire(SomeInterceptor::class)
new Autowire(SomeInterceptor::class),
];

return new WorkerFactory(
$workerFactory,
$this->createMock(FinalizerInterface::class),
$container,
$pipelineProvider,
new TemporalConfig([
'workers' => [
'with-options-as-value' => WorkerOptions::new()->withEnableSessionWorker(),
'with-options-in-array' => [
'options' => WorkerOptions::new()->withEnableSessionWorker()
'options' => WorkerOptions::new()->withEnableSessionWorker(),
],
'with-interceptors' => [
'interceptors' => $interceptors
'interceptors' => $interceptors,
],
'with-exception-interceptor-as-string' => [
'exception_interceptor' => ExceptionInterceptor::class
'exception_interceptor' => ExceptionInterceptor::class,
],
'with-exception-interceptor-as-autowire' => [
'exception_interceptor' => new Autowire(ExceptionInterceptor::class, [])
'exception_interceptor' => new Autowire(ExceptionInterceptor::class, []),
],
'with-exception-interceptor-as-instance' => [
'exception_interceptor' => new ExceptionInterceptor([])
'exception_interceptor' => new ExceptionInterceptor([]),
],
'all' => [
'options' => WorkerOptions::new()->withEnableSessionWorker(),
'interceptors' => $interceptors,
'exception_interceptor' => ExceptionInterceptor::class
]
]
'exception_interceptor' => ExceptionInterceptor::class,
],
],
])
);
}
Expand Down

0 comments on commit a71b80d

Please sign in to comment.