Skip to content

Commit

Permalink
Merge pull request #76 from spiral/feature/info-command
Browse files Browse the repository at this point in the history
Adds temporal:info console command
  • Loading branch information
butschster authored Dec 27, 2023
2 parents a55d7b2 + 6cc12a9 commit 38be446
Show file tree
Hide file tree
Showing 19 changed files with 496 additions and 121 deletions.
7 changes: 5 additions & 2 deletions src/Attribute/AssignWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
/**
* @psalm-suppress DeprecatedClass
*/
#[\Attribute(\Attribute::TARGET_CLASS), NamedArgumentConstructor]
#[\Attribute(\Attribute::TARGET_CLASS | \Attribute::IS_REPEATABLE), NamedArgumentConstructor]
final class AssignWorker
{
/**
* @param string $taskQueue Task queue name.
*/
public function __construct(
public readonly string $name,
public readonly string $taskQueue,
) {
}
}
6 changes: 3 additions & 3 deletions src/Bootloader/TemporalBridgeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class TemporalBridgeBootloader extends Bootloader
public function defineDependencies(): array
{
return [
ConsoleBootloader::class,
RoadRunnerBootloader::class,
ScaffolderBootloader::class,
];
Expand Down Expand Up @@ -78,11 +77,12 @@ public function __construct(
public function init(
AbstractKernel $kernel,
EnvironmentInterface $env,
Dispatcher $dispatcher,
ConsoleBootloader $console,
): void {
$this->initConfig($env);

$kernel->addDispatcher($dispatcher);
$console->addCommand(Commands\InfoCommand::class);
$kernel->addDispatcher($this->factory->make(Dispatcher::class));
}

public function addWorkerOptions(string $worker, WorkerOptions $options): void
Expand Down
103 changes: 103 additions & 0 deletions src/Commands/InfoCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Commands;

use Spiral\Boot\DirectoriesInterface;
use Spiral\Console\Attribute\AsCommand;
use Spiral\Console\Attribute\Option;
use Spiral\Console\Command;
use Spiral\TemporalBridge\DeclarationLocatorInterface;
use Spiral\TemporalBridge\DeclarationWorkerResolver;
use Symfony\Component\Console\Helper\TableSeparator;
use Symfony\Component\Console\Output\OutputInterface;
use Temporal\Internal\Declaration\Reader\ActivityReader;
use Temporal\Internal\Declaration\Reader\WorkflowReader;
use Temporal\Workflow\WorkflowInterface;

#[AsCommand(
name: 'temporal:info',
description: 'Show information about registered temporal workflows and activities.',
)]
final class InfoCommand extends Command
{
#[Option(name: 'show-activities', shortcut: 'a', description: 'Show activities.')]
private bool $showActivities = false;

public function perform(
DeclarationLocatorInterface $locator,
DeclarationWorkerResolver $workerResolver,
WorkflowReader $workflowReader,
ActivityReader $activityReader,
DirectoriesInterface $dir,
): int {
$workflows = [];
$activities = [];

foreach ($locator->getDeclarations() as $type => $declaration) {
$taskQueue = $workerResolver->resolve($declaration);

if ($type === WorkflowInterface::class) {
$prototype = $workflowReader->fromClass($declaration->getName());
$workflows[$prototype->getID()] = [
'class' => $declaration->getName(),
'file' => $declaration->getFileName(),
'name' => $prototype->getID(),
'task_queue' => \implode(', ', $taskQueue),
];
} else {
$taskQueueShown = false;

foreach ($activityReader->fromClass($declaration->getName()) as $prototype) {
$activities[$declaration->getName()][$prototype->getID()] = [
'file' => $declaration->getFileName(),
'name' => $prototype->getID(),
'handler' => $declaration->getShortName() . '::' . $prototype->getHandler()->getName(),
'task_queue' => !$taskQueueShown ? \implode(', ', $taskQueue) : '',
];

$taskQueueShown = true;
}
}
}

$rootDir = \realpath($dir->get('root')) . '/';

\assert($this->output instanceof OutputInterface);

$this->output->title('Workflows');

$table = $this->table(['Name', 'Class', 'Task Queue']);
foreach ($workflows as $workflow) {
$table->addRow([
\sprintf('<fg=green>%s</>', $workflow['name']),
$workflow['class'] . "\n" . \sprintf('<fg=blue>%s</>', \str_replace($rootDir, '', $workflow['file'])),
$workflow['task_queue'],
]);
}
$table->render();

if (!$this->showActivities) {
return self::SUCCESS;
}

$this->output->title('Activities');
$table = $this->table(['Name', 'Class', 'Task Queue']);
foreach ($activities as $class => $prototypes) {
foreach ($prototypes as $prototype) {
$table->addRow([
$prototype['name'],
$prototype['handler'],
$prototype['task_queue'],
]);
}
if (\end($activities) !== $prototypes) {
$table->addRow(new TableSeparator());
}
}
$table->render();

return self::SUCCESS;
}
}
8 changes: 8 additions & 0 deletions src/DeclarationLocatorInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@

namespace Spiral\TemporalBridge;

use Temporal\Activity\ActivityInterface;
use Temporal\Workflow\WorkflowInterface;

interface DeclarationLocatorInterface
{
/**
* List of all declarations for workflows and activities.
*
* @return iterable<class-string<WorkflowInterface>|class-string<ActivityInterface>, \ReflectionClass>
*/
public function getDeclarations(): iterable;
}
58 changes: 58 additions & 0 deletions src/DeclarationWorkerResolver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Spiral\Attributes\ReaderInterface;
use Spiral\TemporalBridge\Attribute\AssignWorker;
use Spiral\TemporalBridge\Config\TemporalConfig;

final class DeclarationWorkerResolver
{
public function __construct(
private readonly ReaderInterface $reader,
private readonly TemporalConfig $config,
) {
}

/**
* Find the worker name for the given workflow or class declaration. If no worker is assigned, the default task
* queue name is returned.
*/
public function resolve(\ReflectionClass $declaration): array
{
$queue = $this->resolveTaskQueues($declaration);

if ($queue !== []) {
return $queue;
}

return [$this->config->getDefaultWorker()];
}

private function resolveTaskQueues(\ReflectionClass $declaration): array
{
$assignWorker = $this->reader->getClassMetadata($declaration, AssignWorker::class);

$workers = [];

foreach ($assignWorker as $worker) {
$workers[] = $worker->taskQueue;
}

if ($workers !== []) {
return $workers;
}

$parents = $declaration->getInterfaceNames();
foreach ($parents as $parent) {
$queueName = $this->resolveTaskQueues(new \ReflectionClass($parent));
if ($queueName !== []) {
return $queueName;
}
}

return [];
}
}
57 changes: 19 additions & 38 deletions src/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@
namespace Spiral\TemporalBridge;

use ReflectionClass;
use Spiral\Attributes\ReaderInterface;
use Spiral\Boot\DispatcherInterface;
use Spiral\Core\Container;
use Spiral\RoadRunnerBridge\RoadRunnerMode;
use Spiral\TemporalBridge\Attribute\AssignWorker;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Temporal\Activity\ActivityInterface;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Workflow\WorkflowInterface;
Expand All @@ -19,8 +16,8 @@ final class Dispatcher implements DispatcherInterface
{
public function __construct(
private readonly RoadRunnerMode $mode,
private readonly ReaderInterface $reader,
private readonly Container $container,
private readonly DeclarationWorkerResolver $workerResolver,
) {
}

Expand All @@ -31,38 +28,41 @@ public function canServe(): bool

public function serve(): void
{
$config = $this->container->get(TemporalConfig::class);

// finds all available workflows, activity types and commands in a given directory
/**
* @var array<class-string<WorkflowInterface>|class-string<ActivityInterface>, ReflectionClass> $declarations
*/
$declarations = $this->container->get(DeclarationLocatorInterface::class)->getDeclarations();

// factory initiates and runs task queue specific activity and workflow workers
/** @var WorkerFactoryInterface $factory */
$factory = $this->container->get(WorkerFactoryInterface::class);
/** @var WorkersRegistryInterface $registry */
$registry = $this->container->get(WorkersRegistryInterface::class);

$hasDeclarations = false;
foreach ($declarations as $type => $declaration) {
// Worker that listens on a task queue and hosts both workflow and activity implementations.
$queueName = $this->resolveQueueName($declaration) ?? $config->getDefaultWorker();
$taskQueues = $this->workerResolver->resolve($declaration);

$worker = $registry->get($queueName);
foreach ($taskQueues as $taskQueue) {
$worker = $registry->get($taskQueue);

if ($type === WorkflowInterface::class) {
// Workflows are stateful. So you need a type to create instances.
$worker->registerWorkflowTypes($declaration->getName());
}
if ($type === WorkflowInterface::class) {
// Workflows are stateful. So you need a type to create instances.
$worker->registerWorkflowTypes($declaration->getName());
}

if ($type === ActivityInterface::class) {
// Workflows are stateful. So you need a type to create instances.
$worker->registerActivity(
$declaration->getName(),
fn(ReflectionClass $class): object => $this->container->make($class->getName()),
);
}

if ($type === ActivityInterface::class) {
// Workflows are stateful. So you need a type to create instances.
$worker->registerActivity(
$declaration->getName(),
fn(ReflectionClass $class): object => $this->container->make($class->getName()),
);
$hasDeclarations = true;
}
$hasDeclarations = true;
}

if (!$hasDeclarations) {
Expand All @@ -72,23 +72,4 @@ public function serve(): void
// start primary loop
$factory->run();
}

private function resolveQueueName(\ReflectionClass $declaration): ?string
{
$assignWorker = $this->reader->firstClassMetadata($declaration, AssignWorker::class);

if ($assignWorker !== null) {
return $assignWorker->name;
}

$parents = $declaration->getInterfaceNames();
foreach ($parents as $parent) {
$queueName = $this->resolveQueueName(new \ReflectionClass($parent));
if ($queueName !== null) {
return $queueName;
}
}

return null;
}
}
2 changes: 1 addition & 1 deletion src/Scaffolder/Declaration/ActivityDeclaration.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function declare(): void
public function assignWorker(string $worker): void
{
$this->namespace->addUse(AssignWorker::class);
$this->class->addAttribute(AssignWorker::class, ['name' => $worker]);
$this->class->addAttribute(AssignWorker::class, ['taskQueue' => $worker]);
}

public function addMethod(string $name, string $returnType): void
Expand Down
2 changes: 1 addition & 1 deletion src/Scaffolder/Declaration/WorkflowDeclaration.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public function declare(): void
public function assignWorker(string $worker): void
{
$this->namespace->addUse(AssignWorker::class);
$this->class->addAttribute(AssignWorker::class, ['name' => $worker]);
$this->class->addAttribute(AssignWorker::class, ['taskQueue' => $worker]);
}

public function addQueryMethod(string $name, string $returnType): void
Expand Down
2 changes: 1 addition & 1 deletion src/WorkersRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ final class WorkersRegistry implements WorkersRegistryInterface
public function __construct(
private readonly WorkerFactoryInterface|TemporalWorkerFactory $workerFactory,
private readonly FinalizerInterface $finalizer,
private readonly TemporalConfig $config
private readonly TemporalConfig $config,
) {
}

Expand Down
7 changes: 4 additions & 3 deletions src/WorkersRegistryInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,19 @@
interface WorkersRegistryInterface
{
/**
* Register a new temporal worker with given task queue and options
* Register a new temporal worker with given task queue and options.
*
* @throws WorkersRegistryException
*/
public function register(string $name, ?WorkerOptions $options): void;

/**
* Get or create worker by task queue name
* Get or create worker by task queue name.
*/
public function get(string $name): WorkerInterface;

/**
* Check if a worker with given task queue name registered
* Check if a worker with given task queue name registered.
*/
public function has(string $name): bool;
}
2 changes: 1 addition & 1 deletion tests/app/src/SomeActivity.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Spiral\TemporalBridge\Attribute\AssignWorker;

#[AssignWorker(name: 'worker1')]
#[AssignWorker(taskQueue: 'worker1')]
class SomeActivity
{
}
9 changes: 9 additions & 0 deletions tests/app/src/SomeActivityWithDefaultWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Tests\App;

class SomeActivityWithDefaultWorker
{
}
Loading

0 comments on commit 38be446

Please sign in to comment.