diff --git a/.github/workflows/psalm.yml b/.github/workflows/psalm.yml index e6940e4..4c1f9dd 100644 --- a/.github/workflows/psalm.yml +++ b/.github/workflows/psalm.yml @@ -8,8 +8,8 @@ on: name: static analysis jobs: - psalm: - uses: spiral/gh-actions/.github/workflows/psalm.yml@master - with: - os: >- - ['ubuntu-latest'] + psalm: + uses: spiral/gh-actions/.github/workflows/psalm.yml@master + with: + os: >- + ['ubuntu-latest'] diff --git a/.gitignore b/.gitignore index 1a6ed30..29387cb 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ docs vendor node_modules .php-cs-fixer.cache +tests/runtime diff --git a/composer.json b/composer.json index 393d0d9..9532048 100644 --- a/composer.json +++ b/composer.json @@ -24,7 +24,7 @@ "spiral/tokenizer": "^3.0", "spiral/roadrunner-bridge": "^2.0 || ^3.0", "nette/php-generator": "^4.0", - "temporal/sdk": "^1.3 || ^2.0" + "temporal/sdk": "^2.7" }, "require-dev": { "spiral/framework": "^3.0", diff --git a/src/Bootloader/PrototypeBootloader.php b/src/Bootloader/PrototypeBootloader.php index 734ad93..7e93db2 100644 --- a/src/Bootloader/PrototypeBootloader.php +++ b/src/Bootloader/PrototypeBootloader.php @@ -9,7 +9,7 @@ use Spiral\TemporalBridge\WorkflowManagerInterface; use Temporal\Client\WorkflowClientInterface; -class PrototypeBootloader extends Bootloader +final class PrototypeBootloader extends Bootloader { protected const DEPENDENCIES = [ BasePrototypeBootloader::class, diff --git a/src/Bootloader/TemporalBridgeBootloader.php b/src/Bootloader/TemporalBridgeBootloader.php index 7c2fb86..5e6a79e 100644 --- a/src/Bootloader/TemporalBridgeBootloader.php +++ b/src/Bootloader/TemporalBridgeBootloader.php @@ -8,18 +8,21 @@ use Spiral\Boot\AbstractKernel; use Spiral\Boot\Bootloader\Bootloader; use Spiral\Boot\EnvironmentInterface; -use Spiral\Boot\FinalizerInterface; use Spiral\Config\ConfiguratorInterface; use Spiral\Config\Patch\Append; use Spiral\Console\Bootloader\ConsoleBootloader; +use Spiral\Core\Container\Autowire; use Spiral\Core\FactoryInterface; use Spiral\RoadRunnerBridge\Bootloader\RoadRunnerBootloader; use Spiral\TemporalBridge\Commands; use Spiral\TemporalBridge\Config\TemporalConfig; +use Spiral\TemporalBridge\DeclarationLocator; use Spiral\TemporalBridge\DeclarationLocatorInterface; use Spiral\TemporalBridge\Dispatcher; use Spiral\TemporalBridge\Preset\PresetRegistry; use Spiral\TemporalBridge\Preset\PresetRegistryInterface; +use Spiral\TemporalBridge\WorkerFactory; +use Spiral\TemporalBridge\WorkerFactoryInterface; use Spiral\TemporalBridge\WorkersRegistry; use Spiral\TemporalBridge\WorkersRegistryInterface; use Spiral\TemporalBridge\Workflow\WorkflowManager; @@ -33,22 +36,27 @@ use Temporal\Client\WorkflowClientInterface; use Temporal\DataConverter\DataConverter; use Temporal\DataConverter\DataConverterInterface; +use Temporal\Interceptor\PipelineProvider; +use Temporal\Interceptor\SimplePipelineProvider; +use Temporal\Internal\Interceptor\Interceptor; use Temporal\Worker\Transport\Goridge; -use Temporal\Worker\WorkerFactoryInterface; +use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactoryInterface; use Temporal\Worker\WorkerOptions; -use Temporal\WorkerFactory; +use Temporal\WorkerFactory as TemporalWorkerFactory; class TemporalBridgeBootloader extends Bootloader { protected const SINGLETONS = [ WorkflowPresetLocatorInterface::class => [self::class, 'initWorkflowPresetLocator'], WorkflowManagerInterface::class => WorkflowManager::class, - WorkerFactoryInterface::class => [self::class, 'initWorkerFactory'], + TemporalWorkerFactoryInterface::class => [self::class, 'initWorkerFactory'], DeclarationLocatorInterface::class => [self::class, 'initDeclarationLocator'], WorkflowClientInterface::class => [self::class, 'initWorkflowClient'], - WorkersRegistryInterface::class => [self::class, 'initWorkersRegistry'], + WorkersRegistryInterface::class => WorkersRegistry::class, PresetRegistryInterface::class => PresetRegistry::class, DataConverterInterface::class => [self::class, 'initDataConverter'], + WorkerFactoryInterface::class => WorkerFactory::class, + PipelineProvider::class => [self::class, 'initPipelineProvider'], ]; protected const DEPENDENCIES = [ @@ -57,7 +65,7 @@ class TemporalBridgeBootloader extends Bootloader ]; public function __construct( - private readonly ConfiguratorInterface $config + private readonly ConfiguratorInterface $config, ) { } @@ -65,7 +73,7 @@ public function init( AbstractKernel $kernel, EnvironmentInterface $env, ConsoleBootloader $console, - FactoryInterface $factory + FactoryInterface $factory, ): void { $this->initConfig($env); @@ -83,7 +91,7 @@ public function addWorkerOptions(string $worker, WorkerOptions $options): void protected function initWorkflowPresetLocator( FactoryInterface $factory, - ClassesInterface $classes + ClassesInterface $classes, ): WorkflowPresetLocatorInterface { return new WorkflowPresetLocator( factory: $factory, @@ -99,20 +107,22 @@ protected function initConfig(EnvironmentInterface $env): void [ 'address' => $env->get('TEMPORAL_ADDRESS', '127.0.0.1:7233'), 'namespace' => 'App\\Workflow', - 'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', WorkerFactoryInterface::DEFAULT_TASK_QUEUE), + 'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', TemporalWorkerFactory::DEFAULT_TASK_QUEUE), 'workers' => [], - ] + ], ); } protected function initWorkflowClient( TemporalConfig $config, - DataConverterInterface $dataConverter + DataConverterInterface $dataConverter, + PipelineProvider $pipelineProvider, ): WorkflowClientInterface { - return WorkflowClient::create( + return new WorkflowClient( serviceClient: ServiceClient::create($config->getAddress()), options: (new ClientOptions())->withNamespace($config->getTemporalNamespace()), - converter: $dataConverter + converter: $dataConverter, + interceptorProvider: $pipelineProvider, ); } @@ -122,28 +132,35 @@ protected function initDataConverter(): DataConverterInterface } protected function initWorkerFactory( - DataConverterInterface $dataConverter - ): WorkerFactoryInterface { - return new WorkerFactory( + DataConverterInterface $dataConverter, + ): TemporalWorkerFactoryInterface { + return new TemporalWorkerFactory( dataConverter: $dataConverter, rpc: Goridge::create() ); } protected function initDeclarationLocator( - ClassesInterface $classes + ClassesInterface $classes, ): DeclarationLocatorInterface { - return new \Spiral\TemporalBridge\DeclarationLocator( + return new DeclarationLocator( classes: $classes, reader: new AttributeReader() ); } - protected function initWorkersRegistry( - WorkerFactoryInterface $workerFactory, - FinalizerInterface $finalizer, - TemporalConfig $config - ): WorkersRegistryInterface { - return new WorkersRegistry($workerFactory, $finalizer, $config); + protected function initPipelineProvider(TemporalConfig $config, FactoryInterface $factory): PipelineProvider + { + /** @var Interceptor[] $interceptors */ + $interceptors = \array_map( + static fn (mixed $interceptor) => match (true) { + \is_string($interceptor) => $factory->make($interceptor), + $interceptor instanceof Autowire => $interceptor->resolve($factory), + default => $interceptor + }, + $config->getInterceptors(), + ); + + return new SimplePipelineProvider($interceptors); } } diff --git a/src/Config/TemporalConfig.php b/src/Config/TemporalConfig.php index 3a84343..683a941 100644 --- a/src/Config/TemporalConfig.php +++ b/src/Config/TemporalConfig.php @@ -4,10 +4,30 @@ namespace Spiral\TemporalBridge\Config; +use Spiral\Core\Container\Autowire; use Spiral\Core\InjectableConfig; +use Temporal\Exception\ExceptionInterceptorInterface; +use Temporal\Internal\Interceptor\Interceptor; use Temporal\Worker\WorkerFactoryInterface; use Temporal\Worker\WorkerOptions; +/** + * @psalm-type TInterceptor = Interceptor|class-string|Autowire + * @psalm-type TExceptionInterceptor = ExceptionInterceptorInterface|class-string|Autowire + * @psalm-type TWorker = array{ + * options?: WorkerOptions, + * exception_interceptor?: TExceptionInterceptor + * } + * + * @property array{ + * address: non-empty-string, + * namespace: non-empty-string, + * temporalNamespace: non-empty-string, + * defaultWorker: non-empty-string, + * workers: array, + * interceptors?: TInterceptor[] + * } $config + */ final class TemporalConfig extends InjectableConfig { public const CONFIG = 'temporal'; @@ -18,31 +38,54 @@ final class TemporalConfig extends InjectableConfig 'temporalNamespace' => 'default', 'defaultWorker' => WorkerFactoryInterface::DEFAULT_TASK_QUEUE, 'workers' => [], + 'interceptors' => [], ]; + /** + * @return non-empty-string + */ public function getDefaultNamespace(): string { return $this->config['namespace']; } + /** + * @return non-empty-string + */ public function getTemporalNamespace(): string { return $this->config['temporalNamespace']; } + /** + * @return non-empty-string + */ public function getAddress(): string { return $this->config['address']; } + /** + * @return non-empty-string + */ public function getDefaultWorker(): string { return $this->config['defaultWorker']; } - /** @psalm-return array */ + /** + * @return array + */ public function getWorkers(): array { - return (array) $this->config['workers']; + return $this->config['workers'] ?? []; + } + + /** + * @return TInterceptor[] + */ + public function getInterceptors(): array + { + return $this->config['interceptors'] ?? []; } } diff --git a/src/WorkerFactory.php b/src/WorkerFactory.php new file mode 100644 index 0000000..f9ec100 --- /dev/null +++ b/src/WorkerFactory.php @@ -0,0 +1,90 @@ + */ + private array $workers = []; + + public function __construct( + private readonly TemporalWorkerFactory $workerFactory, + private readonly FinalizerInterface $finalizer, + private readonly FactoryInterface $factory, + private readonly PipelineProvider $pipelineProvider, + private readonly TemporalConfig $config, + ) { + $this->workers = $this->config->getWorkers(); + } + + /** + * @param non-empty-string $name + */ + public function create(string $name): WorkerInterface + { + /** @psalm-suppress TooManyArguments */ + $worker = $this->workerFactory->newWorker( + $name, + $this->getWorkerOptions($name), + $this->getExceptionInterceptor($name), + $this->pipelineProvider, + ); + $worker->registerActivityFinalizer(fn () => $this->finalizer->finalize()); + + return $worker; + } + + /** + * @param non-empty-string $name + */ + private function getWorkerOptions(string $name): ?WorkerOptions + { + $worker = $this->workers[$name] ?? null; + + return match (true) { + $worker instanceof WorkerOptions => $worker, + isset($worker['options']) && $worker['options'] instanceof WorkerOptions => $worker['options'], + default => null + }; + } + + /** + * @param non-empty-string $name + */ + private function getExceptionInterceptor(string $name): ?ExceptionInterceptorInterface + { + $worker = $this->workers[$name] ?? null; + if (!\is_array($worker) || !isset($worker['exception_interceptor'])) { + return null; + } + + $exceptionInterceptor = $this->wire($worker['exception_interceptor']); + \assert($exceptionInterceptor instanceof ExceptionInterceptorInterface); + + return $exceptionInterceptor; + } + + private function wire(mixed $alias): object + { + return match (true) { + \is_string($alias) => $this->factory->make($alias), + $alias instanceof Autowire => $alias->resolve($this->factory), + default => $alias + }; + } +} diff --git a/src/WorkerFactoryInterface.php b/src/WorkerFactoryInterface.php new file mode 100644 index 0000000..34aa9c8 --- /dev/null +++ b/src/WorkerFactoryInterface.php @@ -0,0 +1,17 @@ + $options */ public function __construct( - private readonly WorkerFactoryInterface $workerFactory, + private readonly WorkerFactoryInterface|TemporalWorkerFactory $workerFactory, private readonly FinalizerInterface $finalizer, private readonly TemporalConfig $config ) { @@ -34,18 +34,22 @@ public function register(string $name, ?WorkerOptions $options): void ); } - $this->workers[$name] = $this->workerFactory->newWorker($name, $options); - $this->workers[$name]->registerActivityFinalizer(fn() => $this->finalizer->finalize()); + if ($this->workerFactory instanceof WorkerFactoryInterface) { + $this->workers[$name] = $this->workerFactory->create($name); + } else { + $this->workers[$name] = $this->workerFactory->newWorker($name, $options); + $this->workers[$name]->registerActivityFinalizer(fn() => $this->finalizer->finalize()); + } } public function get(string $name): WorkerInterface { \assert($name !== ''); - $options = $this->config->getWorkers(); + $options = $this->config->getWorkers()[$name] ?? null; if (! $this->has($name)) { - $this->register($name, $options[$name] ?? null); + $this->register($name, $options instanceof WorkerOptions ? $options : null); } return $this->workers[$name]; diff --git a/src/Workflow/Workflow.php b/src/Workflow/Workflow.php index 48ef8f7..4899167 100644 --- a/src/Workflow/Workflow.php +++ b/src/Workflow/Workflow.php @@ -6,7 +6,6 @@ use Temporal\Client\WorkflowClientInterface; use Temporal\Client\WorkflowOptions; -use Temporal\Client\WorkflowStubInterface; use Temporal\Common\RetryOptions; use Temporal\Internal\Client\WorkflowProxy; use Temporal\Internal\Support\DateInterval; @@ -44,6 +43,9 @@ public function backoffRetryCoefficient(float $coefficient): self return $this; } + /** + * @param positive-int $attempts + */ public function maxRetryAttempts(int $attempts): self { $this->retryOptions = $this->getRetryOptions() diff --git a/tests/app/src/SomeInterceptor.php b/tests/app/src/SomeInterceptor.php new file mode 100644 index 0000000..ba447ca --- /dev/null +++ b/tests/app/src/SomeInterceptor.php @@ -0,0 +1,15 @@ +assertContainerBoundAsSingleton( WorkflowPresetLocatorInterface::class, @@ -38,7 +42,7 @@ public function testWorkflowPresetLocator() ); } - public function testWorkflowManager() + public function testWorkflowManager(): void { $this->mockContainer(ReaderInterface::class); @@ -48,7 +52,15 @@ public function testWorkflowManager() ); } - public function testWorkerFactory() + public function testTemporalWorkerFactory(): void + { + $this->assertContainerBoundAsSingleton( + TemporalWorkerFactoryInterface::class, + TemporalWorkerFactory::class + ); + } + + public function testWorkerFactory(): void { $this->assertContainerBoundAsSingleton( WorkerFactoryInterface::class, @@ -56,7 +68,7 @@ public function testWorkerFactory() ); } - public function testDataConverter() + public function testDataConverter(): void { $this->assertContainerBoundAsSingleton( DataConverterInterface::class, @@ -64,7 +76,7 @@ public function testDataConverter() ); } - public function testDeclarationLocator() + public function testDeclarationLocator(): void { $this->assertContainerBoundAsSingleton( DeclarationLocatorInterface::class, @@ -72,7 +84,7 @@ public function testDeclarationLocator() ); } - public function testWorkflowClient() + public function testWorkflowClient(): void { $this->assertContainerBoundAsSingleton( WorkflowClientInterface::class, @@ -80,7 +92,7 @@ public function testWorkflowClient() ); } - public function testPresetRegistry() + public function testPresetRegistry(): void { $this->assertContainerBoundAsSingleton( PresetRegistryInterface::class, @@ -96,6 +108,14 @@ public function testWorkersRegistry(): void ); } + public function testPipelineProvider(): void + { + $this->assertContainerBound( + PipelineProvider::class, + SimplePipelineProvider::class + ); + } + public function testAddWorkerOptions(): void { $configs = new ConfigManager($this->createMock(LoaderInterface::class)); diff --git a/tests/src/Config/TemporalConfigTest.php b/tests/src/Config/TemporalConfigTest.php index 6f56142..b9427a1 100644 --- a/tests/src/Config/TemporalConfigTest.php +++ b/tests/src/Config/TemporalConfigTest.php @@ -79,7 +79,25 @@ public function testGetsWorkers(): void { $workers = [ 'first' => WorkerOptions::new(), - 'second' => WorkerOptions::new() + 'second' => WorkerOptions::new(), + 'withOptions' => [ + 'options' => WorkerOptions::new(), + ], + 'withInterceptors' => [ + 'interceptors' => [ + 'foo' + ], + ], + 'withExceptionInterceptor' => [ + 'exception_interceptor' => 'bar' + ], + 'all' => [ + 'options' => WorkerOptions::new(), + 'interceptors' => [ + 'foo' + ], + 'exception_interceptor' => 'bar' + ], ]; $config = new TemporalConfig([ diff --git a/tests/src/WorkerFactoryTest.php b/tests/src/WorkerFactoryTest.php new file mode 100644 index 0000000..813a6e8 --- /dev/null +++ b/tests/src/WorkerFactoryTest.php @@ -0,0 +1,182 @@ +temporalWorkerFactory = $this->createMock(TemporalWorkerFactory::class); + } + + public function testCreateWithoutAnyOptions(): void + { + $this->temporalWorkerFactory + ->expects($this->once()) + ->method('newWorker') + ->with('without-any-options') + ->willReturn($worker = $this->createMock(WorkerInterface::class)); + + $factory = $this->createWorkerFactory($this->temporalWorkerFactory); + + $this->assertSame($worker, $factory->create('without-any-options')); + } + + public function testCreateWithOptionsAsValue(): void + { + $this->temporalWorkerFactory + ->expects($this->once()) + ->method('newWorker') + ->with('with-options-as-value', $this->equalTo(WorkerOptions::new()->withEnableSessionWorker())) + ->willReturn($worker = $this->createMock(WorkerInterface::class)); + + $factory = $this->createWorkerFactory($this->temporalWorkerFactory); + + $this->assertSame($worker, $factory->create('with-options-as-value')); + } + + public function testCreateWithOptionsInArray(): void + { + $this->temporalWorkerFactory + ->expects($this->once()) + ->method('newWorker') + ->with('with-options-in-array', $this->equalTo(WorkerOptions::new()->withEnableSessionWorker())) + ->willReturn($worker = $this->createMock(WorkerInterface::class)); + + $factory = $this->createWorkerFactory($this->temporalWorkerFactory); + + $this->assertSame($worker, $factory->create('with-options-in-array')); + } + + /** + * @dataProvider exceptionInterceptorsDataProvider + */ + public function testCreateWithExceptionInterceptor(string $name): void + { + $this->temporalWorkerFactory + ->expects($this->once()) + ->method('newWorker') + ->with($name, null, $this->equalTo(new ExceptionInterceptor([]))) + ->willReturn($worker = $this->createMock(WorkerInterface::class)); + + $factory = $this->createWorkerFactory($this->temporalWorkerFactory); + + $this->assertSame($worker, $factory->create($name)); + } + + public function testCreateWithInterceptors(): void + { + $expectedInterceptors = new SimplePipelineProvider([ + new SomeInterceptor(), + new SomeInterceptor(), + new SomeInterceptor(), + ]); + + $this->temporalWorkerFactory + ->expects($this->once()) + ->method('newWorker') + ->with('with-interceptors', null, null, $this->equalTo($expectedInterceptors)) + ->willReturn($worker = $this->createMock(WorkerInterface::class)); + + $factory = $this->createWorkerFactory($this->temporalWorkerFactory, $expectedInterceptors); + + $this->assertSame($worker, $factory->create('with-interceptors')); + } + + public function testCreateWithAllOptions(): void + { + $expectedInterceptors = new SimplePipelineProvider([ + new SomeInterceptor(), + new SomeInterceptor(), + new SomeInterceptor(), + ]); + + $this->temporalWorkerFactory + ->expects($this->once()) + ->method('newWorker') + ->with( + 'all', + $this->equalTo(WorkerOptions::new()->withEnableSessionWorker()), + $this->equalTo(new ExceptionInterceptor([])), + $this->equalTo($expectedInterceptors), + ) + ->willReturn($worker = $this->createMock(WorkerInterface::class)); + + $factory = $this->createWorkerFactory($this->temporalWorkerFactory, $expectedInterceptors); + + $this->assertSame($worker, $factory->create('all')); + } + + public function exceptionInterceptorsDataProvider(): \Traversable + { + yield ['with-exception-interceptor-as-string']; + yield ['with-exception-interceptor-as-autowire']; + yield ['with-exception-interceptor-as-instance']; + } + + private function createWorkerFactory( + TemporalWorkerFactory $workerFactory, + PipelineProvider $pipelineProvider = new SimplePipelineProvider(), + ): + WorkerFactory { + $container = new Container(); + $container->bind(PipelineProvider::class, SimplePipelineProvider::class); + $container->bind(ExceptionInterceptor::class, new ExceptionInterceptor([])); + + $interceptors = [ + SomeInterceptor::class, + new SomeInterceptor(), + new Autowire(SomeInterceptor::class), + ]; + + return new WorkerFactory( + $workerFactory, + $this->createMock(FinalizerInterface::class), + $container, + $pipelineProvider, + new TemporalConfig([ + 'workers' => [ + 'with-options-as-value' => WorkerOptions::new()->withEnableSessionWorker(), + 'with-options-in-array' => [ + 'options' => WorkerOptions::new()->withEnableSessionWorker(), + ], + 'with-interceptors' => [ + 'interceptors' => $interceptors, + ], + 'with-exception-interceptor-as-string' => [ + 'exception_interceptor' => ExceptionInterceptor::class, + ], + 'with-exception-interceptor-as-autowire' => [ + 'exception_interceptor' => new Autowire(ExceptionInterceptor::class, []), + ], + 'with-exception-interceptor-as-instance' => [ + 'exception_interceptor' => new ExceptionInterceptor([]), + ], + 'all' => [ + 'options' => WorkerOptions::new()->withEnableSessionWorker(), + 'interceptors' => $interceptors, + 'exception_interceptor' => ExceptionInterceptor::class, + ], + ], + ]) + ); + } +}