From f915784f48d6cd3c07bca35a2ef00bb147fa538e Mon Sep 17 00:00:00 2001 From: Evgeniy Moiseenko Date: Thu, 23 Nov 2023 16:52:12 +0300 Subject: [PATCH] Setup psalm level to 2 (#504) --- .gitattributes | 1 + composer.json | 4 +- phpunit.xml.dist | 30 ++++--- psalm.xml | 6 +- src/ExecEvent.php | 14 ++-- src/InvalidJobException.php | 8 +- src/JobEvent.php | 11 ++- src/LogBehavior.php | 5 +- src/PushEvent.php | 4 +- src/Queue.php | 20 +++-- src/RetryableJobInterface.php | 5 +- src/cli/Action.php | 2 +- src/cli/Command.php | 12 +-- src/cli/Queue.php | 2 - src/cli/VerboseBehavior.php | 81 ++++++++++--------- src/cli/WorkerEvent.php | 6 ++ src/closure/Behavior.php | 2 + src/closure/Job.php | 2 +- src/debug/Panel.php | 20 ++--- src/debug/views/detail.php | 2 +- src/drivers/amqp_interop/Command.php | 2 + src/drivers/amqp_interop/Queue.php | 31 ++++--- src/drivers/beanstalk/Command.php | 3 + src/drivers/beanstalk/InfoAction.php | 20 ++++- src/drivers/beanstalk/Queue.php | 5 +- src/drivers/db/Command.php | 2 + src/drivers/db/InfoAction.php | 32 +++++--- src/drivers/db/Queue.php | 79 +++++++++++------- .../db/migrations/M161119140200Queue.php | 3 +- .../migrations/M211218163000JobQueueSize.php | 4 +- src/drivers/file/Command.php | 6 +- src/drivers/file/InfoAction.php | 18 +++-- src/drivers/file/Queue.php | 32 ++++---- src/drivers/gearman/Command.php | 2 + src/drivers/gearman/Queue.php | 15 ++-- src/drivers/redis/Command.php | 2 + src/drivers/redis/InfoAction.php | 2 + src/drivers/redis/Queue.php | 4 + src/drivers/sqs/Command.php | 2 + src/drivers/sqs/Queue.php | 8 +- src/drivers/stomp/Command.php | 2 + src/drivers/stomp/Queue.php | 54 ++++++++----- src/gii/Generator.php | 23 +++--- src/serializers/IgbinarySerializer.php | 2 +- src/serializers/JsonSerializer.php | 8 +- src/serializers/PhpSerializer.php | 2 +- src/serializers/SerializerInterface.php | 2 +- stubs/psalm/BaseYii.php | 18 +++++ tests/app/RetryJob.php | 3 +- tests/app/SimpleJob.php | 2 +- tests/app/benchmark/Controller.php | 2 +- tests/app/config/main.php | 3 + tests/docker-compose.yml | 10 ++- tests/drivers/beanstalk/QueueTest.php | 4 +- tests/drivers/db/TestCase.php | 2 +- tests/drivers/file/QueueTest.php | 2 +- tests/drivers/redis/QueueTest.php | 2 +- tests/drivers/stomp/QueueTest.php | 15 ++-- tests/serializers/TestCase.php | 6 +- 59 files changed, 417 insertions(+), 254 deletions(-) create mode 100644 stubs/psalm/BaseYii.php diff --git a/.gitattributes b/.gitattributes index 6e08b1d55..8cdf150ae 100644 --- a/.gitattributes +++ b/.gitattributes @@ -12,3 +12,4 @@ /phpunit.xml.dist export-ignore /support export-ignore /psalm.xml export-ignore +/stubs export-ignore diff --git a/composer.json b/composer.json index 92d208b46..0da6de72b 100644 --- a/composer.json +++ b/composer.json @@ -31,8 +31,8 @@ "enqueue/amqp-bunny": "^0.10.0", "enqueue/amqp-ext": "^0.10.8", "enqueue/stomp": "^0.10.0", - "pda/pheanstalk": "^v5.0.0", - "aws/aws-sdk-php": ">=2.4", + "pda/pheanstalk": "^5.0.0", + "aws/aws-sdk-php": "3.285.0", "vimeo/psalm": "^5.10.0" }, "suggest": { diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 821636bb0..3feee9238 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -10,17 +10,21 @@ displayDetailsOnSkippedTests="true" executionOrder="random" > - - - ./tests - ./tests/app - ./tests/docker - ./tests/runtime - - - - - ./src - - + + + ./tests + ./tests/app + ./tests/docker + ./tests/runtime + + + + + ./src + + + ./src/debug + ./src/gii + + diff --git a/psalm.xml b/psalm.xml index e19fc2555..c9de05bc3 100644 --- a/psalm.xml +++ b/psalm.xml @@ -1,6 +1,6 @@ + + + + diff --git a/src/ExecEvent.php b/src/ExecEvent.php index 75945cea7..b59b9dafd 100644 --- a/src/ExecEvent.php +++ b/src/ExecEvent.php @@ -10,6 +10,8 @@ namespace yii\queue; +use Throwable; + /** * Exec Event. * @@ -23,23 +25,23 @@ class ExecEvent extends JobEvent * @see Queue::EVENT_AFTER_EXEC * @see Queue::EVENT_AFTER_ERROR */ - public int $attempt; + public int $attempt = 0; /** * @var mixed result of a job execution in case job is done. * @see Queue::EVENT_AFTER_EXEC * @since 2.1.1 */ - public $result; + public mixed $result = null; /** - * @var null|\Exception|\Throwable + * @var null|Throwable * @see Queue::EVENT_AFTER_ERROR * @since 2.1.1 */ - public $error; + public ?Throwable $error = null; /** - * @var null|bool + * @var bool * @see Queue::EVENT_AFTER_ERROR * @since 2.1.1 */ - public $retry; + public bool $retry = true; } diff --git a/src/InvalidJobException.php b/src/InvalidJobException.php index a313dd590..ae345f1d2 100644 --- a/src/InvalidJobException.php +++ b/src/InvalidJobException.php @@ -30,10 +30,10 @@ class InvalidJobException extends Exception * @param Throwable|null $previous */ public function __construct( - private string $serialized, - string $message = '', - int $code = 0, - Throwable $previous = null + private readonly string $serialized, + string $message = '', + int $code = 0, + Throwable $previous = null ) { parent::__construct($message, $code, $previous); diff --git a/src/JobEvent.php b/src/JobEvent.php index 8c56a1db6..e994161ef 100644 --- a/src/JobEvent.php +++ b/src/JobEvent.php @@ -21,14 +21,19 @@ abstract class JobEvent extends Event { /** - * @var Queue * @inheritdoc */ + public $name = ''; + /** + * @var Queue|null|object + * @inheritdoc + * @psalm-suppress PropertyNotSetInConstructor + */ public $sender; /** * @var int|string|null unique id of a job */ - public string|int|null $id; + public string|int|null $id = null; /** * @var Closure|JobInterface|null|mixed */ @@ -36,5 +41,5 @@ abstract class JobEvent extends Event /** * @var int time to reserve in seconds of the job */ - public int $ttr; + public int $ttr = 0; } diff --git a/src/LogBehavior.php b/src/LogBehavior.php index 367cea414..5f44acf4a 100644 --- a/src/LogBehavior.php +++ b/src/LogBehavior.php @@ -12,6 +12,7 @@ use Yii; use yii\base\Behavior; +use yii\base\Component; /** * Log Behavior. @@ -21,7 +22,7 @@ class LogBehavior extends Behavior { /** - * @var Queue + * @var Queue|null|Component * @inheritdoc */ public $owner; @@ -138,7 +139,7 @@ protected function getExecTitle(ExecEvent $event): string { $title = $this->getJobTitle($event); $extra = "attempt: $event->attempt"; - if ($pid = $event->sender->getWorkerPid()) { + if ($pid = $event->sender?->getWorkerPid()) { $extra .= ", PID: $pid"; } return "$title ($extra)"; diff --git a/src/PushEvent.php b/src/PushEvent.php index 947e0a6bd..9064836ea 100644 --- a/src/PushEvent.php +++ b/src/PushEvent.php @@ -20,9 +20,9 @@ class PushEvent extends JobEvent /** * @var int */ - public int $delay; + public int $delay = 0; /** * @var int|string|null */ - public string|int|null $priority; + public string|int|null $priority = null; } diff --git a/src/Queue.php b/src/Queue.php index 280d9918c..93c01f744 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -10,6 +10,7 @@ namespace yii\queue; +use Throwable; use yii\base\Component; use yii\base\InvalidArgumentException; use yii\base\InvalidConfigException; @@ -88,8 +89,6 @@ public function init(): void { parent::init(); - $this->serializer = Instance::ensure($this->serializer, SerializerInterface::class); - if ($this->ttr <= 0) { throw new InvalidConfigException('Default TTR must be greater that zero.'); } @@ -141,7 +140,7 @@ public function priority(int|string|null $value): static * @param JobInterface|mixed $job * @return int|string|null id of a job message */ - public function push($job): int|string|null + public function push(mixed $job): int|string|null { $event = new PushEvent([ 'job' => $job, @@ -174,7 +173,7 @@ public function push($job): int|string|null throw new InvalidArgumentException('Job delay must be positive.'); } - $message = $this->serializer->serialize($event->job); + $message = $this->getSerializer()->serialize($event->job); $event->id = $this->pushMessage($message, $event->ttr, $event->delay, $event->priority); $this->trigger(self::EVENT_AFTER_PUSH, $event); @@ -225,8 +224,9 @@ protected function handleMessage(int|string $id, string $message, int $ttr, int return $this->handleError($event); } try { - $event->result = $event->job->execute($this); - } catch (\Exception|\Throwable $error) { + /** @psalm-suppress PossiblyUndefinedMethod */ + $event->result = $event->job?->execute($this); + } catch (Throwable $error) { $event->error = $error; return $this->handleError($event); } @@ -243,7 +243,7 @@ protected function handleMessage(int|string $id, string $message, int $ttr, int public function unserializeMessage(string $serialized): array { try { - $job = $this->serializer->unserialize($serialized); + $job = $this->getSerializer()->unserialize($serialized); } catch (\Exception $e) { return [null, new InvalidJobException($serialized, $e->getMessage(), 0, $e)]; } @@ -307,4 +307,10 @@ public function isDone(int|string $id): bool * @return int status code */ abstract public function status(int|string $id): int; + + private function getSerializer(): SerializerInterface + { + /** @psalm-var SerializerInterface */ + return Instance::ensure($this->serializer, SerializerInterface::class); + } } diff --git a/src/RetryableJobInterface.php b/src/RetryableJobInterface.php index 08791db77..f61ec269b 100644 --- a/src/RetryableJobInterface.php +++ b/src/RetryableJobInterface.php @@ -10,7 +10,6 @@ namespace yii\queue; -use Exception; use Throwable; /** @@ -27,8 +26,8 @@ public function getTtr(): int; /** * @param int $attempt number - * @param Exception|Throwable $error from last execute of the job + * @param Throwable|null $error from last execute of the job * @return bool */ - public function canRetry(int $attempt, $error): bool; + public function canRetry(int $attempt, ?Throwable $error): bool; } diff --git a/src/cli/Action.php b/src/cli/Action.php index 3ee72af88..5e90ac7b8 100644 --- a/src/cli/Action.php +++ b/src/cli/Action.php @@ -26,7 +26,7 @@ abstract class Action extends BaseAction */ public Queue $queue; /** - * @var Command|ConsoleController + * @inheritdoc */ public $controller; diff --git a/src/cli/Command.php b/src/cli/Command.php index ef993ba53..f5fd0e9da 100644 --- a/src/cli/Command.php +++ b/src/cli/Command.php @@ -124,8 +124,9 @@ public function beforeAction($action): bool if ($this->phpBinary === null) { $this->phpBinary = PHP_BINARY; } - $this->queue->messageHandler = function ($id, $message, $ttr, $attempt) { - return $this->handleMessage($id, $message, (int)$ttr, (int)$attempt); + /** @psalm-suppress MissingClosureReturnType */ + $this->queue->messageHandler = function (int|string|null $id, string $message, int $ttr, int $attempt) { + return $this->handleMessage($id, $message, $ttr, $attempt); }; } @@ -136,14 +137,14 @@ public function beforeAction($action): bool * Executes a job. * The command is internal, and used to isolate a job execution. Manual usage is not provided. * - * @param string|null $id of a message + * @param string $id of a message * @param int $ttr time to reserve * @param int $attempt number * @param int $pid of a worker * @return int exit code * @internal It is used with isolate mode. */ - public function actionExec(?string $id, int $ttr, int $attempt, int $pid): int + public function actionExec(string $id, int $ttr, int $attempt, int $pid): int { if ($this->queue->execute($id, file_get_contents('php://stdin'), $ttr, $attempt, $pid ?: null)) { return self::EXEC_DONE; @@ -164,6 +165,7 @@ public function actionExec(?string $id, int $ttr, int $attempt, int $pid): int protected function handleMessage(int|string|null $id, string $message, ?int $ttr, int $attempt): bool { // Child process command: php yii queue/exec "id" "ttr" "attempt" "pid" + /** @psalm-suppress PossiblyUndefinedArrayOffset */ $cmd = [ $this->phpBinary, $_SERVER['SCRIPT_FILENAME'], @@ -185,7 +187,7 @@ protected function handleMessage(int|string|null $id, string $message, ?int $ttr $process = new Process($cmd, null, null, $message, $ttr); try { - $result = $process->run(function ($type, $buffer) { + $result = $process->run(function (string $type, string $buffer) { if ($type === Process::ERR) { $this->stderr($buffer); } else { diff --git a/src/cli/Queue.php b/src/cli/Queue.php index 5dc67a4ec..9dac8a5bd 100644 --- a/src/cli/Queue.php +++ b/src/cli/Queue.php @@ -70,7 +70,6 @@ abstract class Queue extends BaseQueue implements BootstrapInterface */ protected function getCommandId(): string { - /** @psalm-suppress UndefinedClass */ foreach (Yii::$app->getComponents(false) as $id => $component) { if ($component === $this) { return Inflector::camel2id($id); @@ -103,7 +102,6 @@ protected function runWorker(callable $handler): ?int { $this->_workerPid = getmypid(); /** @var LoopInterface $loop */ - /** @psalm-suppress UndefinedClass */ $loop = Yii::createObject($this->loopConfig, [$this]); $event = new WorkerEvent(['loop' => $loop]); diff --git a/src/cli/VerboseBehavior.php b/src/cli/VerboseBehavior.php index e25d95b90..184930bc7 100644 --- a/src/cli/VerboseBehavior.php +++ b/src/cli/VerboseBehavior.php @@ -11,7 +11,9 @@ namespace yii\queue\cli; use yii\base\Behavior; +use yii\base\Component; use yii\console\Controller; +use yii\helpers\BaseConsole; use yii\helpers\Console; use yii\queue\ExecEvent; use yii\queue\JobInterface; @@ -24,22 +26,23 @@ class VerboseBehavior extends Behavior { /** - * @var Queue + * @var Queue|null|Component */ public $owner; /** * @var Controller + * @psalm-suppress PropertyNotSetInConstructor */ public Controller $command; /** - * @var float timestamp + * @var float|null timestamp */ - private float $jobStartedAt; + private ?float $jobStartedAt = null; /** - * @var int timestamp + * @var int|null timestamp */ - private int $workerStartedAt; + private ?int $workerStartedAt = null; /** * @inheritdoc @@ -61,10 +64,10 @@ public function events(): array public function beforeExec(ExecEvent $event): void { $this->jobStartedAt = microtime(true); - $this->command->stdout(date('Y-m-d H:i:s'), Console::FG_YELLOW); - $this->command->stdout($this->jobTitle($event), Console::FG_GREY); - $this->command->stdout(' - ', Console::FG_YELLOW); - $this->command->stdout('Started', Console::FG_GREEN); + Console::ansiFormat(date('Y-m-d H:i:s'), [BaseConsole::FG_YELLOW]); + Console::ansiFormat($this->jobTitle($event), [BaseConsole::FG_GREY]); + Console::ansiFormat(' - ', [BaseConsole::FG_YELLOW]); + Console::ansiFormat('Started', [BaseConsole::FG_GREEN]); $this->command->stdout(PHP_EOL); } @@ -73,13 +76,13 @@ public function beforeExec(ExecEvent $event): void */ public function afterExec(ExecEvent $event): void { - $this->command->stdout(date('Y-m-d H:i:s'), Console::FG_YELLOW); - $this->command->stdout($this->jobTitle($event), Console::FG_GREY); - $this->command->stdout(' - ', Console::FG_YELLOW); - $this->command->stdout('Done', Console::FG_GREEN); + $this->command->stdout(date('Y-m-d H:i:s'), BaseConsole::FG_YELLOW); + $this->command->stdout($this->jobTitle($event), BaseConsole::FG_GREY); + $this->command->stdout(' - ', BaseConsole::FG_YELLOW); + $this->command->stdout('Done', BaseConsole::FG_GREEN); $duration = number_format(round(microtime(true) - $this->jobStartedAt, 3), 3); - $memory = round(memory_get_peak_usage(false)/1024/1024, 2); - $this->command->stdout(" ($duration s, $memory MiB)", Console::FG_YELLOW); + $memory = round(memory_get_peak_usage()/1024/1024, 2); + $this->command->stdout(" ($duration s, $memory MiB)", BaseConsole::FG_YELLOW); $this->command->stdout(PHP_EOL); } @@ -88,23 +91,25 @@ public function afterExec(ExecEvent $event): void */ public function afterError(ExecEvent $event): void { - $this->command->stdout(date('Y-m-d H:i:s'), Console::FG_YELLOW); - $this->command->stdout($this->jobTitle($event), Console::FG_GREY); - $this->command->stdout(' - ', Console::FG_YELLOW); - $this->command->stdout('Error', Console::BG_RED); + $this->command->stdout(date('Y-m-d H:i:s'), BaseConsole::FG_YELLOW); + $this->command->stdout($this->jobTitle($event), BaseConsole::FG_GREY); + $this->command->stdout(' - ', BaseConsole::FG_YELLOW); + $this->command->stdout('Error', BaseConsole::BG_RED); if ($this->jobStartedAt) { $duration = number_format(round(microtime(true) - $this->jobStartedAt, 3), 3); - $this->command->stdout(" ($duration s)", Console::FG_YELLOW); + $this->command->stdout(" ($duration s)", BaseConsole::FG_YELLOW); + } + if (null !== $event->error) { + $this->command->stdout(PHP_EOL); + $this->command->stdout('> ' . get_class($event->error) . ': ', BaseConsole::FG_RED); + $message = explode("\n", ltrim($event->error->getMessage()), 2)[0]; // First line + $this->command->stdout($message, BaseConsole::FG_GREY); + $this->command->stdout(PHP_EOL); + $this->command->stdout('Stack trace:', BaseConsole::FG_GREY); + $this->command->stdout(PHP_EOL); + $this->command->stdout($event->error->getTraceAsString(), BaseConsole::FG_GREY); + $this->command->stdout(PHP_EOL); } - $this->command->stdout(PHP_EOL); - $this->command->stdout('> ' . get_class($event->error) . ': ', Console::FG_RED); - $message = explode("\n", ltrim($event->error->getMessage()), 2)[0]; // First line - $this->command->stdout($message, Console::FG_GREY); - $this->command->stdout(PHP_EOL); - $this->command->stdout('Stack trace:', Console::FG_GREY); - $this->command->stdout(PHP_EOL); - $this->command->stdout($event->error->getTraceAsString(), Console::FG_GREY); - $this->command->stdout(PHP_EOL); } /** @@ -116,7 +121,7 @@ protected function jobTitle(ExecEvent $event): string { $name = $event->job instanceof JobInterface ? get_class($event->job) : 'unknown job'; $extra = "attempt: $event->attempt"; - if ($pid = $event->sender->getWorkerPid()) { + if ($pid = $event->sender?->getWorkerPid()) { $extra .= ", pid: $pid"; } return " [$event->id] $name ($extra)"; @@ -129,10 +134,10 @@ protected function jobTitle(ExecEvent $event): string public function workerStart(WorkerEvent $event): void { $this->workerStartedAt = time(); - $this->command->stdout(date('Y-m-d H:i:s'), Console::FG_YELLOW); + $this->command->stdout(date('Y-m-d H:i:s'), BaseConsole::FG_YELLOW); $pid = $event->sender->getWorkerPid(); - $this->command->stdout(" [pid: $pid]", Console::FG_GREY); - $this->command->stdout(" - Worker is started\n", Console::FG_GREEN); + $this->command->stdout(" [pid: $pid]", BaseConsole::FG_GREY); + $this->command->stdout(" - Worker is started\n", BaseConsole::FG_GREEN); } /** @@ -141,12 +146,12 @@ public function workerStart(WorkerEvent $event): void */ public function workerStop(WorkerEvent $event): void { - $this->command->stdout(date('Y-m-d H:i:s'), Console::FG_YELLOW); + $this->command->stdout(date('Y-m-d H:i:s'), BaseConsole::FG_YELLOW); $pid = $event->sender->getWorkerPid(); - $this->command->stdout(" [pid: $pid]", Console::FG_GREY); - $this->command->stdout(' - Worker is stopped ', Console::FG_GREEN); + $this->command->stdout(" [pid: $pid]", BaseConsole::FG_GREY); + $this->command->stdout(' - Worker is stopped ', BaseConsole::FG_GREEN); $duration = $this->formatDuration(time() - $this->workerStartedAt); - $this->command->stdout("($duration)\n", Console::FG_YELLOW); + $this->command->stdout("($duration)\n", BaseConsole::FG_YELLOW); } /** @@ -154,7 +159,7 @@ public function workerStop(WorkerEvent $event): void * @return string * @since 2.0.2 */ - protected function formatDuration($value): string + protected function formatDuration(int $value): string { $seconds = $value % 60; $value = ($value - $seconds) / 60; diff --git a/src/cli/WorkerEvent.php b/src/cli/WorkerEvent.php index eb92a5af1..6dc4bf3c6 100644 --- a/src/cli/WorkerEvent.php +++ b/src/cli/WorkerEvent.php @@ -20,13 +20,19 @@ */ class WorkerEvent extends Event { + /** + * @inheritdoc + */ + public $name; /** * @var Queue * @inheritdoc + * @psalm-suppress PropertyNotSetInConstructor, NonInvariantDocblockPropertyType */ public $sender; /** * @var LoopInterface + * @psalm-suppress PropertyNotSetInConstructor */ public LoopInterface $loop; /** diff --git a/src/closure/Behavior.php b/src/closure/Behavior.php index a83f82bdf..dd0fe3183 100644 --- a/src/closure/Behavior.php +++ b/src/closure/Behavior.php @@ -33,6 +33,8 @@ class Behavior extends \yii\base\Behavior { /** * @var Queue + * @psalm-suppress NonInvariantDocblockPropertyType + * @psalm-suppress PropertyNotSetInConstructor */ public $owner; diff --git a/src/closure/Job.php b/src/closure/Job.php index ec4ed605e..5bcab6984 100644 --- a/src/closure/Job.php +++ b/src/closure/Job.php @@ -24,7 +24,7 @@ class Job implements JobInterface /** * @var string serialized closure */ - public string $serialized; + public string $serialized = ''; /** * Unserializes and executes a closure. diff --git a/src/debug/Panel.php b/src/debug/Panel.php index 11bee1130..bb35b86d3 100644 --- a/src/debug/Panel.php +++ b/src/debug/Panel.php @@ -10,6 +10,7 @@ namespace yii\queue\debug; +use Exception; use Yii; use yii\base\NotSupportedException; use yii\base\ViewContextInterface; @@ -25,12 +26,12 @@ */ class Panel extends \yii\debug\Panel implements ViewContextInterface { - private $_jobs = []; + private array $_jobs = []; /** * @inheritdoc */ - public function getName() + public function getName(): string { return 'Queue'; } @@ -38,7 +39,7 @@ public function getName() /** * @inheritdoc */ - public function init() + public function init(): void { parent::init(); PushEvent::on(Queue::class, Queue::EVENT_AFTER_PUSH, function (PushEvent $event) { @@ -50,7 +51,7 @@ public function init() * @param PushEvent $event * @return array */ - protected function getPushData(PushEvent $event) + protected function getPushData(PushEvent $event): array { $data = []; foreach (Yii::$app->getComponents(false) as $id => $component) { @@ -87,7 +88,7 @@ public function save() /** * @inheritdoc */ - public function getViewPath() + public function getViewPath(): string { return __DIR__ . '/views'; } @@ -95,7 +96,7 @@ public function getViewPath() /** * @inheritdoc */ - public function getSummary() + public function getSummary(): string { return Yii::$app->view->render('summary', [ 'url' => $this->getUrl(), @@ -106,9 +107,9 @@ public function getSummary() /** * @inheritdoc */ - public function getDetail() + public function getDetail(): string { - $jobs = isset($this->data['jobs']) ? $this->data['jobs'] : []; + $jobs = $this->data['jobs'] ?? []; foreach ($jobs as &$job) { $job['status'] = 'unknown'; /** @var Queue $queue */ @@ -121,8 +122,7 @@ public function getDetail() } elseif ($queue->isDone($job['id'])) { $job['status'] = 'done'; } - } catch (NotSupportedException $e) { - } catch (\Exception $e) { + } catch (NotSupportedException|Exception $e) { $job['status'] = $e->getMessage(); } } diff --git a/src/debug/views/detail.php b/src/debug/views/detail.php index f031d4e14..2957a4548 100644 --- a/src/debug/views/detail.php +++ b/src/debug/views/detail.php @@ -18,7 +18,7 @@

Pushed jobs

-
+

diff --git a/src/drivers/amqp_interop/Command.php b/src/drivers/amqp_interop/Command.php index e1bbe5ad9..e2007fd27 100644 --- a/src/drivers/amqp_interop/Command.php +++ b/src/drivers/amqp_interop/Command.php @@ -23,6 +23,8 @@ class Command extends CliCommand { /** * @var Queue + * @psalm-suppress NonInvariantDocblockPropertyType + * @psalm-suppress PropertyNotSetInConstructor */ public CliQueue $queue; diff --git a/src/drivers/amqp_interop/Queue.php b/src/drivers/amqp_interop/Queue.php index c7f25b0c9..d38ccaaa9 100644 --- a/src/drivers/amqp_interop/Queue.php +++ b/src/drivers/amqp_interop/Queue.php @@ -325,9 +325,10 @@ public function listen(): void $this->open(); $this->setupBroker(); - $queue = $this->context->createQueue($this->queueName); - $consumer = $this->context->createConsumer($queue); + $queue = $this->getContext()->createQueue($this->queueName); + $consumer = $this->getContext()->createConsumer($queue); + /** @psalm-suppress MissingClosureReturnType */ $callback = function (AmqpMessage $message, AmqpConsumer $consumer) { if ($message->isRedelivered()) { $consumer->acknowledge($message); @@ -339,8 +340,12 @@ public function listen(): void $ttr = $message->getProperty(self::TTR); $attempt = $message->getProperty(self::ATTEMPT, 1); + $messageId = $message->getMessageId(); - if ($this->handleMessage($message->getMessageId(), $message->getBody(), $ttr, $attempt)) { + if ( + null !== $messageId + && $this->handleMessage($messageId, $message->getBody(), $ttr, $attempt) + ) { $consumer->acknowledge($message); } else { $consumer->acknowledge($message); @@ -351,18 +356,19 @@ public function listen(): void return true; }; - $subscriptionConsumer = $this->context->createSubscriptionConsumer(); + $subscriptionConsumer = $this->getContext()->createSubscriptionConsumer(); $subscriptionConsumer->subscribe($consumer, $callback); $subscriptionConsumer->consume(); } /** - * @return AmqpContext|null + * @return AmqpContext */ - public function getContext(): ?AmqpContext + public function getContext(): AmqpContext { $this->open(); + /** @psalm-var AmqpContext */ return $this->context; } @@ -374,10 +380,10 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri $this->open(); $this->setupBroker(); - $topic = $this->context->createTopic($this->exchangeName); + $topic = $this->getContext()->createTopic($this->exchangeName); /** @var AmqpMessage $message */ - $message = $this->context->createMessage($payload); + $message = $this->getContext()->createMessage($payload); $message->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT); $message->setMessageId(uniqid('', true)); $message->setTimestamp(time()); @@ -389,7 +395,7 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri ] )); - $producer = $this->context->createProducer(); + $producer = $this->getContext()->createProducer(); if ($delay) { $message->setProperty(self::DELAY, $delay); @@ -423,6 +429,7 @@ public function status($id): int */ protected function open(): void { + /** @psalm-suppress RedundantConditionGivenDocblockType */ if ($this->context) { return; } @@ -505,6 +512,9 @@ protected function setupBroker(): void */ protected function close(): void { + /** + * @psalm-suppress DocblockTypeContradiction + */ if (!$this->context) { return; } @@ -535,6 +545,7 @@ protected function redeliver(AmqpMessage $message): void private function createQueue(): AmqpQueue { - return $this->context->createQueue($this->queueName); + /** @psalm-var AmqpQueue */ + return $this->getContext()->createQueue($this->queueName); } } diff --git a/src/drivers/beanstalk/Command.php b/src/drivers/beanstalk/Command.php index 6360032ac..757a50052 100644 --- a/src/drivers/beanstalk/Command.php +++ b/src/drivers/beanstalk/Command.php @@ -23,6 +23,9 @@ class Command extends CliCommand { /** * @var Queue + * @psalm-suppress NonInvariantPropertyType + * @psalm-suppress PropertyNotSetInConstructor + * @psalm-suppress NonInvariantDocblockPropertyType */ public CliQueue $queue; /** diff --git a/src/drivers/beanstalk/InfoAction.php b/src/drivers/beanstalk/InfoAction.php index c8951f282..2d320d934 100644 --- a/src/drivers/beanstalk/InfoAction.php +++ b/src/drivers/beanstalk/InfoAction.php @@ -10,6 +10,8 @@ namespace yii\queue\beanstalk; +use Throwable; +use yii\helpers\BaseConsole; use yii\helpers\Console; use yii\queue\cli\Action; use yii\queue\cli\Queue as CliQueue; @@ -23,6 +25,7 @@ class InfoAction extends Action { /** * @var Queue + * @psalm-suppress NonInvariantDocblockPropertyType */ public CliQueue $queue; @@ -31,11 +34,20 @@ class InfoAction extends Action */ public function run(): void { - Console::output($this->format('Statistical information about the tube:', Console::FG_GREEN)); + Console::output( + $this->format('Statistical information about the tube:', BaseConsole::FG_GREEN) + ); - foreach ($this->queue->getStatsTube() as $key => $value) { - Console::stdout($this->format("- $key: ", Console::FG_YELLOW)); - Console::output($value); + try { + /** @psalm-suppress RawObjectIteration */ + foreach ($this->queue->getStatsTube() as $key => $value) { + Console::stdout($this->format("- $key: ", BaseConsole::FG_YELLOW)); + Console::output($value); + } + } catch (Throwable) { + Console::stdout( + $this->format('Tube not found or empty', BaseConsole::FG_RED) + ); } } } diff --git a/src/drivers/beanstalk/Queue.php b/src/drivers/beanstalk/Queue.php index 384347fa3..e3d7ec4f4 100644 --- a/src/drivers/beanstalk/Queue.php +++ b/src/drivers/beanstalk/Queue.php @@ -18,6 +18,7 @@ use Pheanstalk\Values\Timeout; use Pheanstalk\Values\TubeName; use Pheanstalk\Values\TubeStats; +use Throwable; use yii\base\InvalidArgumentException; use yii\queue\cli\Queue as CliQueue; @@ -114,7 +115,7 @@ public function status($id): int } return self::STATUS_WAITING; - } catch (\Throwable) { + } catch (Throwable) { return self::STATUS_DONE; } } @@ -131,7 +132,7 @@ public function remove(int|string $id): bool try { $this->getPheanstalk()->delete(new JobId($id)); return true; - } catch (\Throwable) { + } catch (Throwable) { return false; } } diff --git a/src/drivers/db/Command.php b/src/drivers/db/Command.php index 7ce572a13..f06af1347 100644 --- a/src/drivers/db/Command.php +++ b/src/drivers/db/Command.php @@ -23,6 +23,8 @@ class Command extends CliCommand { /** * @var Queue + * @psalm-suppress NonInvariantDocblockPropertyType + * @psalm-suppress PropertyNotSetInConstructor */ public CliQueue $queue; /** diff --git a/src/drivers/db/InfoAction.php b/src/drivers/db/InfoAction.php index c0859e043..43007526f 100644 --- a/src/drivers/db/InfoAction.php +++ b/src/drivers/db/InfoAction.php @@ -10,7 +10,9 @@ namespace yii\queue\db; +use yii\db\Connection; use yii\db\Query; +use yii\helpers\BaseConsole; use yii\helpers\Console; use yii\queue\cli\Action; use yii\queue\cli\Queue as CliQueue; @@ -24,6 +26,8 @@ class InfoAction extends Action { /** * @var Queue + * @psalm-suppress NonInvariantDocblockPropertyType + * @psalm-suppress PropertyNotSetInConstructor */ public CliQueue $queue; @@ -32,25 +36,27 @@ class InfoAction extends Action */ public function run(): void { - Console::output($this->format('Jobs', Console::FG_GREEN)); + Console::output($this->format('Jobs', BaseConsole::FG_GREEN)); + /** @var Connection $db */ + $db = $this->queue->db; - Console::stdout($this->format('- waiting: ', Console::FG_YELLOW)); - Console::output($this->getWaiting()->count('*', $this->queue->db)); + Console::stdout($this->format('- waiting: ', BaseConsole::FG_YELLOW)); + Console::output((string)$this->getWaiting()->count('*', $db)); - Console::stdout($this->format('- delayed: ', Console::FG_YELLOW)); - Console::output($this->getDelayed()->count('*', $this->queue->db)); + Console::stdout($this->format('- delayed: ', BaseConsole::FG_YELLOW)); + Console::output((string)$this->getDelayed()->count('*', $db)); - Console::stdout($this->format('- reserved: ', Console::FG_YELLOW)); - Console::output($this->getReserved()->count('*', $this->queue->db)); + Console::stdout($this->format('- reserved: ', BaseConsole::FG_YELLOW)); + Console::output((string)$this->getReserved()->count('*', $db)); - Console::stdout($this->format('- done: ', Console::FG_YELLOW)); - Console::output($this->getDone()->count('*', $this->queue->db)); + Console::stdout($this->format('- done: ', BaseConsole::FG_YELLOW)); + Console::output((string)$this->getDone()->count('*', $db)); } /** * @return Query */ - protected function getWaiting() + protected function getWaiting(): Query { return (new Query()) ->from($this->queue->tableName) @@ -62,7 +68,7 @@ protected function getWaiting() /** * @return Query */ - protected function getDelayed() + protected function getDelayed(): Query { return (new Query()) ->from($this->queue->tableName) @@ -74,7 +80,7 @@ protected function getDelayed() /** * @return Query */ - protected function getReserved() + protected function getReserved(): Query { return (new Query()) ->from($this->queue->tableName) @@ -86,7 +92,7 @@ protected function getReserved() /** * @return Query */ - protected function getDone() + protected function getDone(): Query { return (new Query()) ->from($this->queue->tableName) diff --git a/src/drivers/db/Queue.php b/src/drivers/db/Queue.php index aade95bda..8b82364da 100644 --- a/src/drivers/db/Queue.php +++ b/src/drivers/db/Queue.php @@ -28,11 +28,11 @@ class Queue extends CliQueue /** * @var Connection|array|string */ - public $db = 'db'; + public Connection|string|array $db = 'db'; /** * @var Mutex|array|string */ - public $mutex = 'mutex'; + public Mutex|string|array $mutex = 'mutex'; /** * @var int timeout */ @@ -54,13 +54,17 @@ class Queue extends CliQueue */ public string $commandClass = Command::class; + protected int $reserveTime = 0; + /** * @inheritdoc */ public function init(): void { parent::init(); + /** @psalm-suppress PropertyTypeCoercion */ $this->db = Instance::ensure($this->db, Connection::class); + /** @psalm-suppress PropertyTypeCoercion */ $this->mutex = Instance::ensure($this->mutex, Mutex::class); } @@ -68,7 +72,7 @@ public function init(): void * Listens queue and runs each job. * * @param bool $repeat whether to continue listening when queue is empty. - * @param int $timeout number of seconds to sleep before next iteration. + * @param int<0, max> $timeout number of seconds to sleep before next iteration. * @return null|int exit code. * @internal for worker command only * @since 2.0.2 @@ -98,12 +102,12 @@ public function run(bool $repeat, int $timeout = 0) /** * @inheritdoc */ - public function status($id): int + public function status(int|string $id): int { $payload = (new Query()) ->from($this->tableName) ->where(['id' => $id]) - ->one($this->db); + ->one($this->getDb()); if (!$payload) { if ($this->deleteReleased) { @@ -113,11 +117,11 @@ public function status($id): int throw new InvalidArgumentException("Unknown message ID: $id."); } - if (!$payload['reserved_at']) { + if (!isset($payload['reserved_at'])) { return self::STATUS_WAITING; } - if (!$payload['done_at']) { + if (!isset($payload['done_at'])) { return self::STATUS_RESERVED; } @@ -131,7 +135,7 @@ public function status($id): int */ public function clear(): void { - $this->db->createCommand() + $this->getDb()->createCommand() ->delete($this->tableName, ['channel' => $this->channel]) ->execute(); } @@ -143,9 +147,9 @@ public function clear(): void * @return bool * @since 2.0.1 */ - public function remove($id) + public function remove(int $id): bool { - return (bool) $this->db->createCommand() + return (bool) $this->getDb()->createCommand() ->delete($this->tableName, ['channel' => $this->channel, 'id' => $id]) ->execute(); } @@ -155,7 +159,7 @@ public function remove($id) */ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $priority): int|string|null { - $this->db->createCommand()->insert($this->tableName, [ + $this->getDb()->createCommand()->insert($this->tableName, [ 'channel' => $this->channel, 'job' => $payload, 'pushed_at' => time(), @@ -163,8 +167,11 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri 'delay' => $delay, 'priority' => $priority ?: 1024, ])->execute(); - $tableSchema = $this->db->getTableSchema($this->tableName); - return $this->db->getLastInsertID($tableSchema->sequenceName); + $tableSchema = $this->getDb()->getTableSchema($this->tableName); + if (null === $tableSchema) { + return null; + } + return $this->getDb()->getLastInsertID($tableSchema->sequenceName??''); } /** @@ -173,10 +180,10 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri * @return array|false payload * @throws Exception in case it hasn't waited the lock */ - protected function reserve() + protected function reserve(): bool|array { - return $this->db->useMaster(function () { - if (!$this->mutex->acquire(__CLASS__ . $this->channel, $this->mutexTimeout)) { + return $this->getDb()->useMaster(function () { + if (!$this->getMutex()->acquire(__CLASS__ . $this->channel, $this->mutexTimeout)) { throw new Exception('Has not waited the lock.'); } @@ -190,11 +197,11 @@ protected function reserve() ->andWhere('[[pushed_at]] <= :time - [[delay]]', [':time' => time()]) ->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC]) ->limit(1) - ->one($this->db); + ->one($this->getDb()); if (is_array($payload)) { $payload['reserved_at'] = time(); $payload['attempt'] = (int) $payload['attempt'] + 1; - $this->db->createCommand()->update($this->tableName, [ + $this->getDb()->createCommand()->update($this->tableName, [ 'reserved_at' => $payload['reserved_at'], 'attempt' => $payload['attempt'], ], [ @@ -207,27 +214,25 @@ protected function reserve() } } } finally { - $this->mutex->release(__CLASS__ . $this->channel); + $this->getMutex()->release(__CLASS__ . $this->channel); } return $payload; }); } - - /** * @param array $payload */ - protected function release($payload) + protected function release(array $payload): void { if ($this->deleteReleased) { - $this->db->createCommand()->delete( + $this->getDb()->createCommand()->delete( $this->tableName, ['id' => $payload['id']] )->execute(); } else { - $this->db->createCommand()->update( + $this->getDb()->createCommand()->update( $this->tableName, ['done_at' => time()], ['id' => $payload['id']] @@ -238,11 +243,11 @@ protected function release($payload) /** * Moves expired messages into waiting list. */ - protected function moveExpired() + protected function moveExpired(): void { if ($this->reserveTime !== time()) { $this->reserveTime = time(); - $this->db->createCommand()->update( + $this->getDb()->createCommand()->update( $this->tableName, ['reserved_at' => null], // `reserved_at IS NOT NULL` forces db to use index on column, @@ -253,5 +258,25 @@ protected function moveExpired() } } - protected $reserveTime; + private function getDb(): Connection + { + /** @var Connection $dbConnection */ + $dbConnection = $this->db; + if (is_string($this->db) || is_array($this->db)) { + /** @psalm-suppress PropertyTypeCoercion */ + $this->db = Instance::ensure($this->db, Connection::class); + } + return $dbConnection; + } + + private function getMutex(): Mutex + { + /** @var Mutex $mutex */ + $mutex = $this->mutex; + if (is_string($this->mutex) || is_array($this->mutex)) { + /** @psalm-suppress PropertyTypeCoercion */ + $this->mutex = Instance::ensure($this->mutex, Mutex::class); + } + return $mutex; + } } diff --git a/src/drivers/db/migrations/M161119140200Queue.php b/src/drivers/db/migrations/M161119140200Queue.php index 9db693284..14b8adfda 100644 --- a/src/drivers/db/migrations/M161119140200Queue.php +++ b/src/drivers/db/migrations/M161119140200Queue.php @@ -20,7 +20,6 @@ class M161119140200Queue extends Migration { public string $tableName = '{{%queue}}'; - public $tableOptions; public function up(): void { @@ -31,7 +30,7 @@ public function up(): void 'created_at' => $this->integer()->notNull(), 'started_at' => $this->integer(), 'finished_at' => $this->integer(), - ], $this->tableOptions); + ]); $this->createIndex('channel', $this->tableName, 'channel'); $this->createIndex('started_at', $this->tableName, 'started_at'); diff --git a/src/drivers/db/migrations/M211218163000JobQueueSize.php b/src/drivers/db/migrations/M211218163000JobQueueSize.php index db9e259c6..04cd2b897 100644 --- a/src/drivers/db/migrations/M211218163000JobQueueSize.php +++ b/src/drivers/db/migrations/M211218163000JobQueueSize.php @@ -24,14 +24,14 @@ class M211218163000JobQueueSize extends Migration public function up(): void { if ($this->db->driverName === 'mysql') { - $this->alterColumn('{{%queue}}', 'job', 'LONGBLOB NOT NULL'); + $this->alterColumn($this->tableName, 'job', 'LONGBLOB NOT NULL'); } } public function down(): void { if ($this->db->driverName === 'mysql') { - $this->alterColumn('{{%queue}}', 'job', $this->binary()->notNull()); + $this->alterColumn($this->tableName, 'job', $this->binary()->notNull()); } } } diff --git a/src/drivers/file/Command.php b/src/drivers/file/Command.php index b6b8bcd9f..222b644b0 100644 --- a/src/drivers/file/Command.php +++ b/src/drivers/file/Command.php @@ -23,6 +23,8 @@ class Command extends CliCommand { /** * @var Queue + * @psalm-suppress NonInvariantDocblockPropertyType + * @psalm-suppress PropertyNotSetInConstructor */ public CliQueue $queue; /** @@ -95,9 +97,9 @@ public function actionClear(): void * @throws Exception when the job is not found. * @since 2.0.1 */ - public function actionRemove($id) + public function actionRemove(int $id): void { - if (!$this->queue->remove((int) $id)) { + if (!$this->queue->remove($id)) { throw new Exception('The job is not found.'); } } diff --git a/src/drivers/file/InfoAction.php b/src/drivers/file/InfoAction.php index cd4b649a4..8c786fdec 100644 --- a/src/drivers/file/InfoAction.php +++ b/src/drivers/file/InfoAction.php @@ -10,6 +10,7 @@ namespace yii\queue\file; +use yii\helpers\BaseConsole; use yii\helpers\Console; use yii\queue\cli\Action; use yii\queue\cli\Queue as CliQueue; @@ -23,6 +24,8 @@ class InfoAction extends Action { /** * @var Queue + * @psalm-suppress NonInvariantDocblockPropertyType + * @psalm-suppress PropertyNotSetInConstructor */ public CliQueue $queue; @@ -31,18 +34,18 @@ class InfoAction extends Action */ public function run(): void { - Console::output($this->format('Jobs', Console::FG_GREEN)); + Console::output($this->format('Jobs', BaseConsole::FG_GREEN)); - Console::stdout($this->format('- waiting: ', Console::FG_YELLOW)); + Console::stdout($this->format('- waiting: ', BaseConsole::FG_YELLOW)); Console::output((string)$this->getWaitingCount()); - Console::stdout($this->format('- delayed: ', Console::FG_YELLOW)); + Console::stdout($this->format('- delayed: ', BaseConsole::FG_YELLOW)); Console::output((string)$this->getDelayedCount()); - Console::stdout($this->format('- reserved: ', Console::FG_YELLOW)); + Console::stdout($this->format('- reserved: ', BaseConsole::FG_YELLOW)); Console::output((string)$this->getReservedCount()); - Console::stdout($this->format('- done: ', Console::FG_YELLOW)); + Console::stdout($this->format('- done: ', BaseConsole::FG_YELLOW)); Console::output((string)$this->getDoneCount()); } @@ -83,7 +86,10 @@ protected function getDoneCount(): int return $total - $this->getDelayedCount() - $this->getWaitingCount(); } - protected function getIndexData() + /** + * @return array|mixed + */ + protected function getIndexData(): mixed { static $data; if ($data === null) { diff --git a/src/drivers/file/Queue.php b/src/drivers/file/Queue.php index 76326b47b..b415972ad 100644 --- a/src/drivers/file/Queue.php +++ b/src/drivers/file/Queue.php @@ -55,9 +55,12 @@ class Queue extends CliQueue public function init(): void { parent::init(); - $this->path = Yii::getAlias($this->path); + $alias = Yii::getAlias($this->path); + if (false !== $alias) { + $this->path = $alias; + } if (!is_dir($this->path)) { - FileHelper::createDirectory($this->path, $this->dirMode, true); + FileHelper::createDirectory($this->path, $this->dirMode); } } @@ -65,7 +68,7 @@ public function init(): void * Listens queue and runs each job. * * @param bool $repeat whether to continue listening when queue is empty. - * @param int $timeout number of seconds to sleep before next iteration. + * @param int<0, max> $timeout number of seconds to sleep before next iteration. * @return null|int exit code. * @internal for worker command only. * @since 2.0.2 @@ -111,8 +114,7 @@ public function status($id): int */ public function clear(): void { - $this->touchIndex(function (&$data) { - $data = []; + $this->touchIndex(function () { foreach (glob("$this->path/job*.data") as $fileName) { unlink($fileName); } @@ -126,10 +128,10 @@ public function clear(): void * @return bool * @since 2.0.1 */ - public function remove($id) + public function remove(int $id): bool { $removed = false; - $this->touchIndex(function (&$data) use ($id, &$removed) { + $this->touchIndex(function (array &$data) use ($id, &$removed) { if (!empty($data['waiting'])) { foreach ($data['waiting'] as $key => $payload) { if ($payload[0] === $id) { @@ -170,16 +172,16 @@ public function remove($id) * * @return array|null payload */ - protected function reserve() + protected function reserve(): ?array { $id = null; $ttr = null; $attempt = null; - $this->touchIndex(function (&$data) use (&$id, &$ttr, &$attempt) { + $this->touchIndex(function (array &$data) use (&$id, &$ttr, &$attempt) { if (!empty($data['reserved'])) { foreach ($data['reserved'] as $key => $payload) { if ($payload[1] + $payload[3] < time()) { - list($id, $ttr, $attempt, $time) = $payload; + [$id, $ttr, $attempt, $time] = $payload; $data['reserved'][$key][2] = ++$attempt; $data['reserved'][$key][3] = time(); return; @@ -188,7 +190,7 @@ protected function reserve() } if (!empty($data['delayed']) && $data['delayed'][0][2] <= time()) { - list($id, $ttr, $time) = array_shift($data['delayed']); + [$id, $ttr, $time] = array_shift($data['delayed']); } elseif (!empty($data['waiting'])) { [$id, $ttr] = array_shift($data['waiting']); } @@ -213,7 +215,7 @@ protected function reserve() protected function delete(array $payload): void { $id = $payload[0]; - $this->touchIndex(function (&$data) use ($id) { + $this->touchIndex(function (array &$data) use ($id) { foreach ($data['reserved'] as $key => $payload) { if ($payload[0] === $id) { unset($data['reserved'][$key]); @@ -233,7 +235,7 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri throw new NotSupportedException('Job priority is not supported in the driver.'); } - $this->touchIndex(function (&$data) use ($payload, $ttr, $delay, &$id) { + $this->touchIndex(function (array &$data) use ($payload, $ttr, $delay, &$id) { if (!isset($data['lastId'])) { $data['lastId'] = 0; } @@ -247,7 +249,7 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri $data['waiting'][] = [$id, $ttr, 0]; } else { $data['delayed'][] = [$id, $ttr, time() + $delay]; - usort($data['delayed'], function ($a, $b) { + usort($data['delayed'], static function ($a, $b) { if ($a[2] < $b[2]) { return -1; } @@ -272,7 +274,7 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri * @param callable $callback * @throws InvalidConfigException */ - private function touchIndex($callback) + private function touchIndex(callable $callback): void { $fileName = "$this->path/index.data"; $isNew = !file_exists($fileName); diff --git a/src/drivers/gearman/Command.php b/src/drivers/gearman/Command.php index 745710d4b..f16c47c4e 100644 --- a/src/drivers/gearman/Command.php +++ b/src/drivers/gearman/Command.php @@ -22,6 +22,8 @@ class Command extends CliCommand { /** * @var Queue + * @psalm-suppress NonInvariantDocblockPropertyType + * @psalm-suppress PropertyNotSetInConstructor */ public CliQueue $queue; diff --git a/src/drivers/gearman/Queue.php b/src/drivers/gearman/Queue.php index 90d276faa..9c47d5162 100644 --- a/src/drivers/gearman/Queue.php +++ b/src/drivers/gearman/Queue.php @@ -31,6 +31,8 @@ class Queue extends CliQueue */ public string $commandClass = Command::class; + private ?GearmanClient $client = null; + /** * Listens queue and runs each job. * @@ -45,6 +47,7 @@ public function run(bool $repeat): ?int $worker = new GearmanWorker(); $worker->addServer($this->host, $this->port); $worker->addFunction($this->channel, function (GearmanJob $payload) { + /** @psalm-suppress PossiblyUndefinedArrayOffset */ [$ttr, $message] = explode(';', $payload->workload(), 2); $this->handleMessage($payload->handle(), $message, (int)$ttr, 1); }); @@ -79,7 +82,7 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri */ public function status($id): int { - $status = $this->getClient()->jobStatus($id); + $status = $this->getClient()->jobStatus((string)$id); if ($status[0] && !$status[1]) { return self::STATUS_WAITING; } @@ -97,12 +100,10 @@ public function status($id): int */ protected function getClient(): GearmanClient { - if (!$this->_client) { - $this->_client = new GearmanClient(); - $this->_client->addServer($this->host, $this->port); + if (!$this->client) { + $this->client = new GearmanClient(); + $this->client->addServer($this->host, $this->port); } - return $this->_client; + return $this->client; } - - private $_client; } diff --git a/src/drivers/redis/Command.php b/src/drivers/redis/Command.php index 8b6e862e1..95ed09e45 100644 --- a/src/drivers/redis/Command.php +++ b/src/drivers/redis/Command.php @@ -23,6 +23,8 @@ class Command extends CliCommand { /** * @var Queue + * @psalm-suppress NonInvariantDocblockPropertyType + * @psalm-suppress PropertyNotSetInConstructor */ public CliQueue $queue; /** diff --git a/src/drivers/redis/InfoAction.php b/src/drivers/redis/InfoAction.php index 385a097f2..213e4cc28 100644 --- a/src/drivers/redis/InfoAction.php +++ b/src/drivers/redis/InfoAction.php @@ -23,6 +23,8 @@ class InfoAction extends Action { /** * @var Queue + * @psalm-suppress NonInvariantDocblockPropertyType + * @psalm-suppress PropertyNotSetInConstructor */ public CliQueue $queue; diff --git a/src/drivers/redis/Queue.php b/src/drivers/redis/Queue.php index 5396227ae..b1ecc9f43 100644 --- a/src/drivers/redis/Queue.php +++ b/src/drivers/redis/Queue.php @@ -25,6 +25,8 @@ class Queue extends CliQueue { /** * @var Connection|array|string + * @psalm-var Connection + * @psalm-suppress InvalidPropertyAssignmentValue */ public Connection|string|array $redis = 'redis'; /** @@ -42,6 +44,7 @@ class Queue extends CliQueue public function init(): void { parent::init(); + /** @psalm-suppress PropertyTypeCoercion */ $this->redis = Instance::ensure($this->redis, Connection::class); } @@ -155,6 +158,7 @@ protected function reserve(int $timeout): ?array return null; } + /** @psalm-suppress PossiblyUndefinedArrayOffset */ [$ttr, $message] = explode(';', $payload, 2); $this->redis->zadd("$this->channel.reserved", time() + (int)$ttr, $id); $attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1); diff --git a/src/drivers/sqs/Command.php b/src/drivers/sqs/Command.php index 3597e0af5..cc9998768 100644 --- a/src/drivers/sqs/Command.php +++ b/src/drivers/sqs/Command.php @@ -24,6 +24,8 @@ class Command extends CliCommand { /** * @var Queue + * @psalm-suppress NonInvariantDocblockPropertyType + * @psalm-suppress PropertyNotSetInConstructor */ public CliQueue $queue; diff --git a/src/drivers/sqs/Queue.php b/src/drivers/sqs/Queue.php index 21aad3237..c7127db8d 100644 --- a/src/drivers/sqs/Queue.php +++ b/src/drivers/sqs/Queue.php @@ -29,17 +29,17 @@ class Queue extends CliQueue * The SQS url. * @var string */ - public string $url; + public string $url = 'localhost'; /** * aws access key. * @var string|null */ - public ?string $key; + public ?string $key = null; /** * aws secret. * @var string|null */ - public ?string $secret; + public ?string $secret = null; /** * region where queue is hosted. * @var string @@ -195,7 +195,7 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri 'MessageAttributes' => [ 'TTR' => [ 'DataType' => 'Number', - 'StringValue' => $ttr, + 'StringValue' => (string) $ttr, ], ], ]; diff --git a/src/drivers/stomp/Command.php b/src/drivers/stomp/Command.php index 87e70bd25..312eac87f 100644 --- a/src/drivers/stomp/Command.php +++ b/src/drivers/stomp/Command.php @@ -24,6 +24,8 @@ class Command extends CliCommand { /** * @var Queue + * @psalm-suppress NonInvariantDocblockPropertyType + * @psalm-suppress PropertyNotSetInConstructor */ public CliQueue $queue; diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index e04ec302c..a98eaad35 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -16,7 +16,7 @@ use Enqueue\Stomp\StompMessage; use Interop\Queue\Exception as QueueException; use Interop\Queue\Message; -use Interop\Queue\Queue as InteropQueue; +use Stomp\Network\Connection; use yii\base\Application as BaseApp; use yii\base\Event; use yii\base\NotSupportedException; @@ -37,7 +37,7 @@ class Queue extends CliQueue * * @var string|null */ - public ?string $host; + public ?string $host = null; /** * The message queue broker's port. * @@ -124,12 +124,8 @@ public function init(): void /** * Opens connection. */ - protected function open(): void + protected function open(): StompContext { - if ($this->context) { - return; - } - $config = [ 'host' => $this->host, 'port' => $this->port, @@ -147,16 +143,14 @@ protected function open(): void return null !== $value; }); - $factory = new StompConnectionFactory($config); - - $this->context = $factory->createContext(); + return (new StompConnectionFactory($config))->createContext(); } /** * Listens queue and runs each job. * * @param bool $repeat - * @param int $timeout + * @param int<0, max> $timeout * @return int|null */ public function run(bool $repeat, int $timeout = 0): ?int @@ -164,11 +158,12 @@ public function run(bool $repeat, int $timeout = 0): ?int return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) { $this->open(); $queue = $this->createQueue($this->queueName); - $consumer = $this->context->createConsumer($queue); + $consumer = $this->getContext()->createConsumer($queue); while ($canContinue()) { + /** @var StompMessage|null $message */ $message = $this->readTimeOut > 0 ? $consumer->receive($this->readTimeOut) : $consumer->receiveNoWait(); - if ($message) { + if (null !== $message) { $messageId = $message->getMessageId(); if (!$messageId) { $message = $this->setMessageId($message); @@ -184,8 +179,9 @@ public function run(bool $repeat, int $timeout = 0): ?int $ttr = $message->getProperty(self::TTR, $this->ttr); $attempt = $message->getProperty(self::ATTEMPT, 1); + $messageId = $message->getMessageId(); - if ($this->handleMessage($message->getMessageId(), $message->getBody(), $ttr, $attempt)) { + if (null !== $messageId && $this->handleMessage($messageId, $message->getBody(), $ttr, $attempt)) { $consumer->acknowledge($message); } else { $consumer->acknowledge($message); @@ -196,7 +192,7 @@ public function run(bool $repeat, int $timeout = 0): ?int break; } elseif ($timeout) { sleep($timeout); - $this->context->getStomp()->getConnection()?->sendAlive(); + $this->getConnection()->sendAlive(); } } }); @@ -222,13 +218,13 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri $this->open(); $queue = $this->createQueue($this->queueName); - $message = $this->context->createMessage($payload); + $message = $this->getContext()->createMessage($payload); $message = $this->setMessageId($message); $message->setPersistent(true); $message->setProperty(self::ATTEMPT, 1); $message->setProperty(self::TTR, $ttr); - $producer = $this->context->createProducer(); + $producer = $this->getContext()->createProducer(); if ($delay) { throw new NotSupportedException('Delayed work is not supported in the driver.'); @@ -273,14 +269,14 @@ protected function redeliver(StompMessage $message): void { $attempt = $message->getProperty(self::ATTEMPT, 1); - $newMessage = $this->context->createMessage( + $newMessage = $this->getContext()->createMessage( $message->getBody(), $message->getProperties(), $message->getHeaders() ); $newMessage->setProperty(self::ATTEMPT, ++$attempt); - $this->context->createProducer()->send( + $this->getContext()->createProducer()->send( $this->createQueue($this->queueName), $newMessage ); @@ -288,15 +284,29 @@ protected function redeliver(StompMessage $message): void /** * @param string $name - * @return InteropQueue|StompDestination + * @return StompDestination */ - private function createQueue(string $name): InteropQueue|StompDestination + private function createQueue(string $name): StompDestination { - $queue = $this->context->createQueue($name); + $queue = $this->getContext()->createQueue($name); $queue->setDurable(true); $queue->setAutoDelete(false); $queue->setExclusive(false); return $queue; } + + private function getContext(): StompContext + { + if (null === $this->context) { + $this->context = $this->open(); + } + + return $this->context; + } + + private function getConnection(): Connection + { + return $this->getContext()->getStomp()->getConnection(); + } } diff --git a/src/gii/Generator.php b/src/gii/Generator.php index a07fd4414..3e67cd579 100644 --- a/src/gii/Generator.php +++ b/src/gii/Generator.php @@ -13,6 +13,7 @@ use Yii; use yii\base\BaseObject; use yii\gii\CodeFile; +use yii\gii\Generator as BaseGenerator; use yii\queue\JobInterface; use yii\queue\RetryableJobInterface; @@ -21,7 +22,7 @@ * * @author Roman Zhuravlev */ -class Generator extends \yii\gii\Generator +class Generator extends BaseGenerator { public $jobClass; public $properties; @@ -32,7 +33,7 @@ class Generator extends \yii\gii\Generator /** * @inheritdoc */ - public function getName() + public function getName(): string { return 'Job Generator'; } @@ -40,7 +41,7 @@ public function getName() /** * @inheritdoc */ - public function getDescription() + public function getDescription(): string { return 'This generator generates a Job class for the queue.'; } @@ -48,7 +49,7 @@ public function getDescription() /** * @inheritdoc */ - public function rules() + public function rules(): array { return array_merge(parent::rules(), [ [['jobClass', 'properties', 'ns', 'baseClass'], 'trim'], @@ -65,7 +66,7 @@ public function rules() /** * @inheritdoc */ - public function attributeLabels() + public function attributeLabels(): array { return array_merge(parent::attributeLabels(), [ 'jobClass' => 'Job Class', @@ -79,7 +80,7 @@ public function attributeLabels() /** * @inheritdoc */ - public function hints() + public function hints(): array { return array_merge(parent::hints(), [ 'jobClass' => 'This is the name of the Job class to be generated, e.g., SomeJob.', @@ -93,7 +94,7 @@ public function hints() /** * @inheritdoc */ - public function stickyAttributes() + public function stickyAttributes(): array { return array_merge(parent::stickyAttributes(), ['ns', 'baseClass']); } @@ -101,7 +102,7 @@ public function stickyAttributes() /** * @inheritdoc */ - public function requiredTemplates() + public function requiredTemplates(): array { return ['job.php']; } @@ -109,7 +110,7 @@ public function requiredTemplates() /** * @inheritdoc */ - public function generate() + public function generate(): array { $params = []; $params['jobClass'] = $this->jobClass; @@ -140,7 +141,7 @@ public function generate() * * @param string $attribute job attribute name. */ - public function validateJobClass($attribute) + public function validateJobClass(string $attribute): void { if ($this->isReservedKeyword($this->$attribute)) { $this->addError($attribute, 'Class name cannot be a reserved PHP keyword.'); @@ -152,7 +153,7 @@ public function validateJobClass($attribute) * * @param string $attribute Namespace attribute name. */ - public function validateNamespace($attribute) + public function validateNamespace(string $attribute): void { $value = $this->$attribute; $value = ltrim($value, '\\'); diff --git a/src/serializers/IgbinarySerializer.php b/src/serializers/IgbinarySerializer.php index 15fb9bda4..21db11fdb 100644 --- a/src/serializers/IgbinarySerializer.php +++ b/src/serializers/IgbinarySerializer.php @@ -33,7 +33,7 @@ public function serialize($job): string /** * @inheritdoc */ - public function unserialize($serialized) + public function unserialize(string $serialized) { return igbinary_unserialize($serialized); } diff --git a/src/serializers/JsonSerializer.php b/src/serializers/JsonSerializer.php index 2500263db..0cc85a91e 100644 --- a/src/serializers/JsonSerializer.php +++ b/src/serializers/JsonSerializer.php @@ -42,7 +42,7 @@ public function serialize($job): string /** * @inheritdoc */ - public function unserialize($serialized) + public function unserialize(string $serialized) { return $this->fromArray(Json::decode($serialized)); } @@ -52,7 +52,7 @@ public function unserialize($serialized) * @return array|mixed * @throws InvalidConfigException */ - protected function toArray($data) + protected function toArray(mixed $data) { if (is_object($data)) { $result = [$this->classKey => get_class($data)]; @@ -82,10 +82,10 @@ protected function toArray($data) } /** - * @param array $data + * @param mixed $data * @return mixed */ - protected function fromArray($data) + protected function fromArray(mixed $data) { if (!is_array($data)) { return $data; diff --git a/src/serializers/PhpSerializer.php b/src/serializers/PhpSerializer.php index d4fb4bc7c..ac3e05791 100644 --- a/src/serializers/PhpSerializer.php +++ b/src/serializers/PhpSerializer.php @@ -30,7 +30,7 @@ public function serialize($job): string /** * @inheritdoc */ - public function unserialize($serialized) + public function unserialize(string $serialized) { return unserialize($serialized); } diff --git a/src/serializers/SerializerInterface.php b/src/serializers/SerializerInterface.php index d34faad5b..d10b668a5 100644 --- a/src/serializers/SerializerInterface.php +++ b/src/serializers/SerializerInterface.php @@ -29,5 +29,5 @@ public function serialize($job): string; * @param string $serialized * @return JobInterface */ - public function unserialize($serialized); + public function unserialize(string $serialized); } diff --git a/stubs/psalm/BaseYii.php b/stubs/psalm/BaseYii.php new file mode 100644 index 000000000..059e8373b --- /dev/null +++ b/stubs/psalm/BaseYii.php @@ -0,0 +1,18 @@ +|array{class: class-string}|callable(): T $type + * @param array $params + * + * @return T + */ + abstract public static function createObject($type, array $params = []); +} diff --git a/tests/app/RetryJob.php b/tests/app/RetryJob.php index 2a8c0e8bd..1fa838434 100644 --- a/tests/app/RetryJob.php +++ b/tests/app/RetryJob.php @@ -11,6 +11,7 @@ namespace tests\app; use Exception; +use Throwable; use Yii; use yii\base\BaseObject; use yii\queue\Queue; @@ -41,7 +42,7 @@ public function getTtr(): int return 2; } - public function canRetry($attempt, $error): bool + public function canRetry($attempt, ?Throwable $error): bool { return $attempt < 2; } diff --git a/tests/app/SimpleJob.php b/tests/app/SimpleJob.php index 1e7a669ae..5fac7a062 100644 --- a/tests/app/SimpleJob.php +++ b/tests/app/SimpleJob.php @@ -31,6 +31,6 @@ public function execute(Queue $queue): void public function getFileName(): bool|string { - return Yii::getAlias("@runtime/job-{$this->uid}.lock"); + return Yii::getAlias("@runtime/job-$this->uid.lock"); } } diff --git a/tests/app/benchmark/Controller.php b/tests/app/benchmark/Controller.php index 265e2ad4f..65b66ca4c 100644 --- a/tests/app/benchmark/Controller.php +++ b/tests/app/benchmark/Controller.php @@ -43,7 +43,7 @@ public function beforeAction($action): bool public function afterAction($action, $result) { $duration = time() - $this->startedAt; - $this->stdout("\nCompleted in {$duration} s.\n"); + $this->stdout("\nCompleted in $duration s.\n"); return parent::afterAction($action, $result); } } diff --git a/tests/app/config/main.php b/tests/app/config/main.php index 7b719c946..cf47d8faf 100644 --- a/tests/app/config/main.php +++ b/tests/app/config/main.php @@ -117,6 +117,9 @@ 'stompQueue' => [ 'class' => StompQueue::class, 'host' => getenv('ACTIVEMQ_HOST') ?: 'localhost', + 'port' => getenv('ACTIVEMQ_PORT'), + 'user' => getenv('ACTIVEMQ_USER'), + 'password' => getenv('ACTIVEMQ_PASSWORD'), ], 'sqsQueue' => [ 'class' => SqsQueue::class, diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index ddcb8e508..3c641de34 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -37,6 +37,9 @@ services: BEANSTALK_HOST: beanstalk GEARMAN_HOST: gearmand ACTIVEMQ_HOST: activemq + ACTIVEMQ_USER: artemis + ACTIVEMQ_PASSWORD: artemis + ACTIVEMQ_PORT: 61616 AWS_KEY: ${AWS_KEY:-admin} AWS_SECRET: ${AWS_SECRET:-admin} AWS_REGION: ${AWS_REGION:-us-east-1} @@ -115,11 +118,12 @@ services: networks: net: {} - # https://hub.docker.com/r/webcenter/activemq/ + # https://hub.docker.com/r/apache/activemq-artemis activemq: - image: webcenter/activemq + image: apache/activemq-artemis:latest-alpine ports: - - "61613:61613" + - "61616:61616" + - "8161:8161" networks: net: {} diff --git a/tests/drivers/beanstalk/QueueTest.php b/tests/drivers/beanstalk/QueueTest.php index 61491c03d..383717474 100644 --- a/tests/drivers/beanstalk/QueueTest.php +++ b/tests/drivers/beanstalk/QueueTest.php @@ -10,11 +10,11 @@ namespace tests\drivers\beanstalk; -use Exception; use Pheanstalk\Pheanstalk; use Pheanstalk\Values\JobId; use tests\app\PriorityJob; use tests\drivers\CliTestCase; +use Throwable; use Yii; use yii\queue\beanstalk\Queue; @@ -134,7 +134,7 @@ protected function jobIsExists(int|string|null $id): bool try { $connection->peek(new JobId($id)); return true; - } catch (\Throwable) { + } catch (Throwable) { return false; } } diff --git a/tests/drivers/db/TestCase.php b/tests/drivers/db/TestCase.php index 4e8e09de7..61b36544a 100644 --- a/tests/drivers/db/TestCase.php +++ b/tests/drivers/db/TestCase.php @@ -76,7 +76,7 @@ public function testLater(): void public function testRetry(): void { $this->startProcess(['php', 'yii', 'queue/listen', '1']); - $job = new RetryJob(['uid' => uniqid()]); + $job = new RetryJob(['uid' => uniqid('', true)]); $this->getQueue()->push($job); sleep(6); diff --git a/tests/drivers/file/QueueTest.php b/tests/drivers/file/QueueTest.php index fb6fe7807..5fda9c247 100644 --- a/tests/drivers/file/QueueTest.php +++ b/tests/drivers/file/QueueTest.php @@ -64,7 +64,7 @@ public function testLater(): void public function testRetry(): void { $this->startProcess(['php', 'yii', 'queue/listen', '1']); - $job = new RetryJob(['uid' => uniqid()]); + $job = new RetryJob(['uid' => uniqid('', true)]); $this->getQueue()->push($job); sleep(6); diff --git a/tests/drivers/redis/QueueTest.php b/tests/drivers/redis/QueueTest.php index 29ca2c6e3..48ac8ff47 100644 --- a/tests/drivers/redis/QueueTest.php +++ b/tests/drivers/redis/QueueTest.php @@ -64,7 +64,7 @@ public function testLater(): void public function testRetry(): void { $this->startProcess(['php', 'yii', 'queue/listen', '1']); - $job = new RetryJob(['uid' => uniqid()]); + $job = new RetryJob(['uid' => uniqid('', true)]); $this->getQueue()->push($job); sleep(6); diff --git a/tests/drivers/stomp/QueueTest.php b/tests/drivers/stomp/QueueTest.php index 105b1aa4b..f6df3369c 100644 --- a/tests/drivers/stomp/QueueTest.php +++ b/tests/drivers/stomp/QueueTest.php @@ -13,9 +13,10 @@ use tests\app\RetryJob; use tests\drivers\CliTestCase; use Yii; +use yii\base\NotSupportedException; use yii\queue\stomp\Queue; -class QueueTest extends CliTestCase +final class QueueTest extends CliTestCase { public function testListen(): void { @@ -37,12 +38,16 @@ public function testRetry(): void $this->assertEquals('aa', file_get_contents($job->getFileName())); } - /** - * @return Queue - */ + public function testStatus(): void + { + $this->expectException(NotSupportedException::class); + + $id = $this->getQueue()->push($this->createSimpleJob()); + $this->getQueue()->isWaiting($id); + } + protected function getQueue(): Queue { return Yii::$app->stompQueue; } - } diff --git a/tests/serializers/TestCase.php b/tests/serializers/TestCase.php index 57d5a1352..8e74012a4 100644 --- a/tests/serializers/TestCase.php +++ b/tests/serializers/TestCase.php @@ -31,7 +31,7 @@ abstract protected function createSerializer(): SerializerInterface; * @dataProvider providerSerialize * @param mixed $expected */ - public function testSerialize($expected): void + public function testSerialize(mixed $expected): void { $serializer = $this->createSerializer(); @@ -80,6 +80,6 @@ public static function providerSerialize(): array class TestObject extends BaseObject { - public $foo; - public $bar; + public int $foo; + public array $bar; }