Skip to content

Commit

Permalink
Adds an ability to register interceptors via bootloader
Browse files Browse the repository at this point in the history
  • Loading branch information
butschster committed Dec 25, 2023
1 parent 4e2f44f commit d7999e6
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 9 deletions.
35 changes: 32 additions & 3 deletions src/Bootloader/TemporalBridgeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
use Spiral\TemporalBridge\WorkersRegistry;
use Spiral\TemporalBridge\WorkersRegistryInterface;
use Spiral\Tokenizer\ClassesInterface;
use Temporal\Client\ClientOptions;
use Temporal\Client\GRPC\ServiceClient;
use Temporal\Client\WorkflowClient;
use Temporal\Client\WorkflowClientInterface;
Expand All @@ -41,6 +40,9 @@
use Temporal\Client\ScheduleClientInterface;
use Temporal\Client\GRPC\ServiceClientInterface;

/**
* @psalm-import-type TInterceptor from TemporalConfig
*/
class TemporalBridgeBootloader extends Bootloader
{
public function defineDependencies(): array
Expand Down Expand Up @@ -69,24 +71,51 @@ public function defineSingletons(): array

public function __construct(
private readonly ConfiguratorInterface $config,
private readonly FactoryInterface $factory,
) {
}

public function init(
AbstractKernel $kernel,
EnvironmentInterface $env,
FactoryInterface $factory,
Dispatcher $dispatcher,
): void {
$this->initConfig($env);

$kernel->addDispatcher($factory->make(Dispatcher::class));
$kernel->addDispatcher($dispatcher);
}

public function addWorkerOptions(string $worker, WorkerOptions $options): void
{
$this->config->modify(TemporalConfig::CONFIG, new Append('workers', $worker, $options));
}

/**
* Register a new Temporal interceptor.
*
* @param TInterceptor $interceptor
*/
public function addInterceptor(string|Interceptor|Autowire $interceptor): void
{
if (\is_string($interceptor)) {
$interceptor = $this->factory->make($interceptor);
} elseif ($interceptor instanceof Autowire) {
$interceptor = $interceptor->resolve($this->factory);
}

if (!$interceptor instanceof Interceptor) {
throw new \InvalidArgumentException(
\sprintf(
'Interceptor must be an instance of `%s`, `%s` given.',
Interceptor::class,
\get_class($interceptor),
),
);
}

$this->config->modify(TemporalConfig::CONFIG, new Append('interceptors', null, $interceptor));
}

protected function initConfig(EnvironmentInterface $env): void
{
$this->config->setDefaults(
Expand Down
5 changes: 3 additions & 2 deletions src/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ final class Dispatcher implements DispatcherInterface
public function __construct(
private readonly RoadRunnerMode $mode,
private readonly ReaderInterface $reader,
private readonly TemporalConfig $config,
private readonly Container $container,
) {
}
Expand All @@ -32,6 +31,8 @@ public function canServe(): bool

public function serve(): void
{
$config = $this->container->get(TemporalConfig::class);

// finds all available workflows, activity types and commands in a given directory
/**
* @var array<class-string<WorkflowInterface>|class-string<ActivityInterface>, ReflectionClass> $declarations
Expand All @@ -45,7 +46,7 @@ public function serve(): void
$hasDeclarations = false;
foreach ($declarations as $type => $declaration) {
// Worker that listens on a task queue and hosts both workflow and activity implementations.
$queueName = $this->resolveQueueName($declaration) ?? $this->config->getDefaultWorker();
$queueName = $this->resolveQueueName($declaration) ?? $config->getDefaultWorker();

$worker = $registry->get($queueName);

Expand Down
70 changes: 69 additions & 1 deletion tests/src/Bootloader/TemporalBridgeBootloaderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

namespace Spiral\TemporalBridge\Tests\Bootloader;

use Mockery as m;
use Spiral\Core\Container\Autowire;
use Spiral\Core\FactoryInterface;
use Spiral\TemporalBridge\Bootloader\TemporalBridgeBootloader;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Spiral\Config\ConfigManager;
Expand All @@ -25,6 +28,7 @@
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Interceptor\SimplePipelineProvider;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;
use Temporal\WorkerFactory as TemporalWorkerFactory;
Expand Down Expand Up @@ -108,7 +112,7 @@ public function testAddWorkerOptions(): void
$configs = new ConfigManager($this->createMock(LoaderInterface::class));
$configs->setDefaults(TemporalConfig::CONFIG, ['workers' => []]);

$bootloader = new TemporalBridgeBootloader($configs);
$bootloader = new TemporalBridgeBootloader($configs, $this->getContainer());
$bootloader->addWorkerOptions('first', $first = WorkerOptions::new());
$bootloader->addWorkerOptions('second', $second = WorkerOptions::new());

Expand All @@ -117,4 +121,68 @@ public function testAddWorkerOptions(): void
$configs->getConfig(TemporalConfig::CONFIG)['workers'],
);
}

public function testAddInterceptor(): void
{
$configs = new ConfigManager($this->createMock(LoaderInterface::class));
$configs->setDefaults(TemporalConfig::CONFIG, ['interceptors' => []]);

$bootloader = new TemporalBridgeBootloader($configs, $this->getContainer());

$bootloader->addInterceptor($iterceptor = m::mock(Interceptor::class));

$this->assertSame(
[$iterceptor],
$configs->getConfig(TemporalConfig::CONFIG)['interceptors'],
);
}

public function testStringableInterceptor(): void
{
$configs = new ConfigManager($this->createMock(LoaderInterface::class));
$configs->setDefaults(TemporalConfig::CONFIG, ['interceptors' => []]);

$bootloader = new TemporalBridgeBootloader($configs, $factory = m::mock(FactoryInterface::class));

$factory->shouldReceive('make')->with('foo')->andReturn($iterceptor = m::mock(Interceptor::class));

$bootloader->addInterceptor('foo');

$this->assertSame(
[$iterceptor],
$configs->getConfig(TemporalConfig::CONFIG)['interceptors'],
);
}

public function testAutowireInterceptor(): void
{
$configs = new ConfigManager($this->createMock(LoaderInterface::class));
$configs->setDefaults(TemporalConfig::CONFIG, ['interceptors' => []]);

$bootloader = new TemporalBridgeBootloader($configs, $factory = m::mock(FactoryInterface::class));

$factory->shouldReceive('make')->with('foo', ['bar' => 'baz'])->andReturn($iterceptor = m::mock(Interceptor::class));

$bootloader->addInterceptor(new Autowire('foo', ['bar' => 'baz']));

$this->assertSame(
[$iterceptor],
$configs->getConfig(TemporalConfig::CONFIG)['interceptors'],
);
}

public function testInvalidInterceptor(): void
{
$this->expectException(\InvalidArgumentException::class);
$this->expectExceptionMessage('Interceptor must be an instance of `Temporal\Internal\Interceptor\Interceptor`, `stdClass` given.');

$configs = new ConfigManager($this->createMock(LoaderInterface::class));
$configs->setDefaults(TemporalConfig::CONFIG, ['interceptors' => []]);

$bootloader = new TemporalBridgeBootloader($configs, $factory = m::mock(FactoryInterface::class));

$factory->shouldReceive('make')->with('foo')->andReturn(new \StdClass());

$bootloader->addInterceptor('foo');
}
}
3 changes: 0 additions & 3 deletions tests/src/DispatcherTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ protected function setUp(): void
$this->dispatcher = new Dispatcher(
RoadRunnerMode::Temporal,
new AttributeReader(),
new TemporalConfig(['defaultWorker' => 'foo']),
$this->getContainer(),
);

Expand Down Expand Up @@ -72,7 +71,6 @@ public function testServeWithoutDeclarations(): void
$dispatcher = new Dispatcher(
RoadRunnerMode::Temporal,
new AttributeReader(),
new TemporalConfig(),
$this->getContainer(),
);

Expand All @@ -97,7 +95,6 @@ public function testServeWithDeclarations(): void
$dispatcher = new Dispatcher(
RoadRunnerMode::Temporal,
new AttributeReader(),
new TemporalConfig(),
$this->getContainer(),
);

Expand Down

0 comments on commit d7999e6

Please sign in to comment.