diff --git a/CHANGELOG.md b/CHANGELOG.md
index 00b75e3..64dc6f9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## [Unreleased]
+## [6.1.0] - 2025-06-24
+### Changed
+- Replaced entity doctrine annotations with php attributes
+
+### Breaking change
+- Sync with https://github.com/printedcom/rabbitmq-queue-bundle.git.
+
## [5.3.2] - 2025-04-02
### Fixed
- Force uuid generation as string when dispatching a QueueTask.
@@ -26,6 +33,14 @@ Please take care when updating to this version as the index could take a while t
may break the usages only if the call-sites uses php strict types and they were casting this argument to "(string)".
The explicit cast should be removed to resolve this breaking change.
+## [6.0.0] - 2023-03-15
+### Changed
+- Updated dependencies and update code to work with Symfony ^6.0 and PHP ^8.1.
+
+### Breaking changes
+- PHP >= 8.1
+- Symfony ^6.0
+
## [5.1.0] - 2023-03-14
### Fixed
- Fix supplying dedicated entity manager to the abstract queue consumer not actually working due to the QueueTask
@@ -200,6 +215,8 @@ exceptions.
- [Breaking change] Use exchange-less way of using producers and consumers
[Unreleased]: https://github.com/printedcom/rabbitmq-queue-bundle/compare/5.3.1...HEAD
+[6.1.0]: https://github.com/MasterRO94/printed-rabbitmq-queue-bundle/compare/6.0.0...6.1.0
+[6.0.0]: https://github.com/printedcom/rabbitmq-queue-bundle/compare/5.1.0...MasterRO94:printed-rabbitmq-queue-bundle:6.0.0
[5.3.0]: https://github.com/printedcom/rabbitmq-queue-bundle/compare/5.3.0...5.3.1
[5.3.0]: https://github.com/printedcom/rabbitmq-queue-bundle/compare/5.2.0...5.3.0
[5.2.0]: https://github.com/printedcom/rabbitmq-queue-bundle/compare/5.1.0...5.2.0
diff --git a/README.md b/README.md
index 3729ebf..8e1da24 100644
--- a/README.md
+++ b/README.md
@@ -13,12 +13,12 @@ The bundle piggybacks off of the `php-amqplib/rabbitmq-bundle` bundle.
## Setup & Dependencies
-* PHP `>=7.0`
-* https://packagist.org/packages/symfony/symfony `^3.4|^4.0`
-* https://packagist.org/packages/doctrine/orm `~2.5`
-* https://packagist.org/packages/monolog/monolog `~1.11`
-* https://packagist.org/packages/ramsey/uuid `~3.4`
-* https://packagist.org/packages/php-amqplib/rabbitmq-bundle `~1.6`
+* PHP `>=8.1`
+* https://packagist.org/packages/symfony/symfony `^6.0`
+* https://packagist.org/packages/doctrine/orm `~2.14`
+* https://packagist.org/packages/monolog/monolog `~2.8`
+* https://packagist.org/packages/ramsey/uuid `~4.7`
+* https://packagist.org/packages/php-amqplib/rabbitmq-bundle `~2.11`
We assume that you are familiar with the `php-amqplib/rabbitmq-bundle` configuration and setup.
diff --git a/composer.json b/composer.json
index edfd6fd..f33fab4 100644
--- a/composer.json
+++ b/composer.json
@@ -20,29 +20,33 @@
},
"require": {
- "php": ">=7.0",
+ "php": ">=8.1",
- "symfony/config": "^3.4 || ^4.0",
- "symfony/console": "^3.4 || ^4.0",
- "symfony/dependency-injection": "^3.4 || ^4.0",
- "symfony/expression-language": "^3.4 || ^4.0",
- "symfony/filesystem": "^3.4 || ^4.0",
- "symfony/http-foundation": "^3.4 || ^4.0",
- "symfony/http-kernel": "^3.4 || ^4.0",
- "symfony/validator": "^3.4 || ^4.0",
+ "symfony/config": "^6.0",
+ "symfony/console": "^6.0",
+ "symfony/dependency-injection": "^6.0",
+ "symfony/expression-language": "^6.0",
+ "symfony/filesystem": "^6.0",
+ "symfony/http-foundation": "^6.0",
+ "symfony/http-kernel": "^6.0",
+ "symfony/validator": "^6.0",
- "doctrine/cache": "^1.6",
- "doctrine/doctrine-bundle": "~1.4",
- "doctrine/orm": "~2.5",
+ "doctrine/cache": "^2.0",
+ "doctrine/doctrine-bundle": "^2.8",
+ "doctrine/orm": "^2.14",
- "monolog/monolog": "~1.11",
+ "monolog/monolog": "^2.8",
- "php-amqplib/rabbitmq-bundle": "^1.13.0",
+ "php-amqplib/rabbitmq-bundle": "^2.11",
- "ramsey/uuid": "~3.4",
- "php-http/guzzle6-adapter": "^1.1",
- "richardfullmer/rabbitmq-management-api": "^2.0",
+ "ramsey/uuid": "^4.7",
+ "php-http/guzzle7-adapter": "^1.0",
+ "andrewmy/rabbitmq-management-api": "^2.1",
"psr/log": "^1.0"
+ },
+ "config": {
+ "allow-plugins": {
+ "php-http/discovery": true
+ }
}
-
}
diff --git a/src/Printed/Bundle/Queue/Command/EnsureVhostExistsCommand.php b/src/Printed/Bundle/Queue/Command/EnsureVhostExistsCommand.php
index bf369ba..7c0fe94 100644
--- a/src/Printed/Bundle/Queue/Command/EnsureVhostExistsCommand.php
+++ b/src/Printed/Bundle/Queue/Command/EnsureVhostExistsCommand.php
@@ -1,5 +1,7 @@
rabbitMqVhostExistenceEnsurer = $rabbitMqVhostExistenceEnsurer;
}
/**
* {@inheritdoc}
*/
- protected function configure()
+ protected function configure(): void
{
$this->setName('queue:ensure-vhost-exists');
$this->setDescription("Ensures, that a rabbitmq's vhost exists, and that rabbitmq's user can manage it");
@@ -36,13 +33,15 @@ protected function configure()
/**
* {@inheritdoc}
*/
- public function execute(InputInterface $input, OutputInterface $output)
+ public function execute(InputInterface $input, OutputInterface $output): int
{
if ($output->getVerbosity() === OutputInterface::VERBOSITY_NORMAL) {
$output->setVerbosity(OutputInterface::VERBOSITY_VERY_VERBOSE);
}
$this->rabbitMqVhostExistenceEnsurer->ensure();
+
+ return static::SUCCESS;
}
}
diff --git a/src/Printed/Bundle/Queue/Command/MaintenanceDownCommand.php b/src/Printed/Bundle/Queue/Command/MaintenanceDownCommand.php
index 905d7ee..83a59d7 100644
--- a/src/Printed/Bundle/Queue/Command/MaintenanceDownCommand.php
+++ b/src/Printed/Bundle/Queue/Command/MaintenanceDownCommand.php
@@ -1,5 +1,7 @@
queueMaintenance = $queueMaintenance;
}
-
+
/**
* {@inheritdoc}
*/
- protected function configure()
+ protected function configure(): void
{
$this->setName('queue:maintenance:down');
$this->setDescription('Disable the maintenance mode for the queue');
@@ -35,10 +32,12 @@ protected function configure()
/**
* {@inheritdoc}
*/
- public function execute(InputInterface $input, OutputInterface $output)
+ public function execute(InputInterface $input, OutputInterface $output): int
{
$output->writeln('Disabling maintenance mode');
$this->queueMaintenance->disable();
+
+ return static::SUCCESS;
}
}
diff --git a/src/Printed/Bundle/Queue/Command/MaintenanceUpCommand.php b/src/Printed/Bundle/Queue/Command/MaintenanceUpCommand.php
index 3dafb6b..4cb576a 100644
--- a/src/Printed/Bundle/Queue/Command/MaintenanceUpCommand.php
+++ b/src/Printed/Bundle/Queue/Command/MaintenanceUpCommand.php
@@ -1,5 +1,7 @@
queueMaintenance = $queueMaintenance;
}
/**
* {@inheritdoc}
*/
- protected function configure()
+ protected function configure(): void
{
$this->setName('queue:maintenance:up');
$this->setDescription('Puts the queue in to maintenance mode');
@@ -35,10 +32,12 @@ protected function configure()
/**
* {@inheritdoc}
*/
- public function execute(InputInterface $input, OutputInterface $output)
+ public function execute(InputInterface $input, OutputInterface $output): int
{
$output->writeln('Enabling maintenance mode');
$this->queueMaintenance->enable();
+
+ return static::SUCCESS;
}
}
diff --git a/src/Printed/Bundle/Queue/Command/MaintenanceWaitForRunningCommand.php b/src/Printed/Bundle/Queue/Command/MaintenanceWaitForRunningCommand.php
index f729c42..27e0c07 100644
--- a/src/Printed/Bundle/Queue/Command/MaintenanceWaitForRunningCommand.php
+++ b/src/Printed/Bundle/Queue/Command/MaintenanceWaitForRunningCommand.php
@@ -1,5 +1,7 @@
queueMaintenance = $queueMaintenance;
- $this->dbalConnection = $dbalConnection;
}
/**
* {@inheritdoc}
*/
- protected function configure()
+ protected function configure(): void
{
$this->setName('queue:maintenance:wait');
$this->setDescription('Wait for running tasks to complete and exit');
@@ -49,7 +42,7 @@ protected function configure()
/**
* {@inheritdoc}
*/
- public function execute(InputInterface $input, OutputInterface $output)
+ public function execute(InputInterface $input, OutputInterface $output): int
{
$output->writeln('Monitoring running tasks');
@@ -65,28 +58,30 @@ public function execute(InputInterface $input, OutputInterface $output)
$dbal = $this->dbalConnection;
if (!$this->doesDatabaseExist($dbal)) {
- $output->writeln("The database doesn't exist. This is expected, if the bundle is used for the first time. Otherwise it's a critical error you should investigate.");
- return;
+ $output->writeln("The database doesn't exist. This is expected, if the bundle is used for the first time. Otherwise, it's a critical error you should investigate.");
+
+ return static::SUCCESS;
}
/*
* Exit immediately if the queue tasks db table is not in the database. Assume no workers
* are running.
*/
- if (!in_array('queue_task', $dbal->getSchemaManager()->listTableNames())) {
- $output->writeln("Couldn't find the queue tasks table in the database. This is expected, if the bundle is used for the first time. Otherwise it's a critical error you should investigate.");
- return;
+ if (!in_array('queue_task', $dbal->createSchemaManager()->listTableNames())) {
+ $output->writeln("Couldn't find the queue tasks table in the database. This is expected, if the bundle is used for the first time. Otherwise, it's a critical error you should investigate.");
+
+ return static::SUCCESS;
}
// Get the refresh time, this is 3 by default.
- $refresh = (integer) $input->getOption('refresh');
+ $refresh = (int) $input->getOption('refresh');
$table = new Table($output);
$table->setHeaders(['Queue', 'Tasks']);
while (true) {
// Find all tasks with running status.
- $tasks = $dbal->fetchAll(
+ $tasks = $dbal->fetchAllAssociative(
'SELECT id, queue_name FROM queue_task WHERE status = :status_running',
[ 'status_running' => QueueTaskStatus::RUNNING ]
);
@@ -95,7 +90,8 @@ public function execute(InputInterface $input, OutputInterface $output)
// If there are none we can exit the command.
if ($count === 0) {
$output->writeln(sprintf('There are no tasks in running state!'));
- return;
+
+ return static::SUCCESS;
}
$table->setRows([]);
@@ -162,7 +158,7 @@ private function doesDatabaseExist(Connection $connection): bool
$tmpConnection = DriverManager::getConnection($params);
- $doesDatabaseExist = in_array($name, $tmpConnection->getSchemaManager()->listDatabases());
+ $doesDatabaseExist = in_array($name, $tmpConnection->createSchemaManager()->listDatabases());
$tmpConnection->close();
diff --git a/src/Printed/Bundle/Queue/Command/RequeueTaskCommand.php b/src/Printed/Bundle/Queue/Command/RequeueTaskCommand.php
index 971abd4..08ae698 100644
--- a/src/Printed/Bundle/Queue/Command/RequeueTaskCommand.php
+++ b/src/Printed/Bundle/Queue/Command/RequeueTaskCommand.php
@@ -1,5 +1,7 @@
logger = $logger;
- $this->queueTaskDispatcher = $queueTaskDispatcher;
- $this->queueTaskHelper = $queueTaskHelper;
- $this->queueTaskRepository = $queueTaskRepository;
}
/**
* {@inheritdoc}
*/
- protected function configure()
+ protected function configure(): void
{
$this->setName('queue:requeue-task');
- $this->setDescription("Requeues a task. Should be used with great care (preferably never)");
+ $this->setDescription("Re-queues a task. Should be used with great care (preferably never)");
$this->setHelp('This command allows to perform dangerous stuff and is mostly useful only for debugging.');
$this->addArgument('queue-task-id', InputArgument::REQUIRED, 'Queue task to requeue');
@@ -58,7 +43,7 @@ protected function configure()
/**
* {@inheritdoc}
*/
- public function execute(InputInterface $input, OutputInterface $output)
+ public function execute(InputInterface $input, OutputInterface $output): int
{
if ($output->getVerbosity() === OutputInterface::VERBOSITY_NORMAL) {
$output->setVerbosity(OutputInterface::VERBOSITY_VERY_VERBOSE);
@@ -77,5 +62,7 @@ public function execute(InputInterface $input, OutputInterface $output)
$newTask = $this->queueTaskDispatcher->dispatch($this->queueTaskHelper->getPayload($task));
$this->logger->info("Successfully requeued task. New task id: `{$newTask->getId()}`");
+
+ return static::SUCCESS;
}
}
diff --git a/src/Printed/Bundle/Queue/Command/StoreNewDeploymentStampCommand.php b/src/Printed/Bundle/Queue/Command/StoreNewDeploymentStampCommand.php
index 42108fc..190617c 100644
--- a/src/Printed/Bundle/Queue/Command/StoreNewDeploymentStampCommand.php
+++ b/src/Printed/Bundle/Queue/Command/StoreNewDeploymentStampCommand.php
@@ -1,5 +1,7 @@
logger = $logger;
- $this->newDeploymentsDetector = $newDeploymentsDetector;
}
/**
* {@inheritdoc}
*/
- protected function configure()
+ protected function configure(): void
{
$this->setName('queue:store-new-deployment-stamp');
$this->setDescription("Store a new deployment stamp, so old workers can be told to shut down.");
@@ -50,18 +43,20 @@ protected function configure()
/**
* {@inheritdoc}
*/
- public function execute(InputInterface $input, OutputInterface $output)
+ public function execute(InputInterface $input, OutputInterface $output): int
{
if ($output->getVerbosity() === OutputInterface::VERBOSITY_NORMAL) {
$output->setVerbosity(OutputInterface::VERBOSITY_VERY_VERBOSE);
}
-
+
$newDeploymentStamp = $input->getArgument('new-deployment-stamp');
$this->logger->info("Trying to set new deployment stamp: `{$newDeploymentStamp}`");
$this->newDeploymentsDetector->setCurrentDeploymentStamp($newDeploymentStamp);
- $this->logger->info("Successfully set new deployment stamp.");
+ $this->logger->info('Successfully set new deployment stamp.');
+
+ return static::SUCCESS;
}
}
diff --git a/src/Printed/Bundle/Queue/Common/Traits/GetDataItemFromDataOrThrowTrait.php b/src/Printed/Bundle/Queue/Common/Traits/GetDataItemFromDataOrThrowTrait.php
index 5c22e2c..48ed3d6 100644
--- a/src/Printed/Bundle/Queue/Common/Traits/GetDataItemFromDataOrThrowTrait.php
+++ b/src/Printed/Bundle/Queue/Common/Traits/GetDataItemFromDataOrThrowTrait.php
@@ -1,5 +1,7 @@
serviceNames = $serviceNames;
}
- public function process(ContainerBuilder $containerBuilder): void
+ public function process(ContainerBuilder $container): void
{
foreach ($this->serviceNames as $serviceName) {
- $containerBuilder->getDefinition($serviceName)
- ->setPublic(true);
+ $container->getDefinition($serviceName)->setPublic(true);
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Printed/Bundle/Queue/DependencyInjection/Configuration.php b/src/Printed/Bundle/Queue/DependencyInjection/Configuration.php
index 6c7bade..6d637e4 100644
--- a/src/Printed/Bundle/Queue/DependencyInjection/Configuration.php
+++ b/src/Printed/Bundle/Queue/DependencyInjection/Configuration.php
@@ -1,5 +1,7 @@
bundleAlias = $bundleAlias;
}
/**
* {@inheritDoc}
*/
- public function getConfigTreeBuilder()
+ public function getConfigTreeBuilder(): TreeBuilder
{
- $treeBuilder = new TreeBuilder();
- $rootNode = $treeBuilder->root($this->bundleAlias);
-
+ $treeBuilder = new TreeBuilder($this->bundleAlias);
+ $rootNode = $treeBuilder->getRootNode();
+
$rootNode
->children()
->arrayNode('options')
diff --git a/src/Printed/Bundle/Queue/DependencyInjection/QueueExtension.php b/src/Printed/Bundle/Queue/DependencyInjection/QueueExtension.php
index 0554f5c..f18de14 100644
--- a/src/Printed/Bundle/Queue/DependencyInjection/QueueExtension.php
+++ b/src/Printed/Bundle/Queue/DependencyInjection/QueueExtension.php
@@ -1,5 +1,7 @@
getAlias());
$config = $this->processConfiguration($configuration, $configs);
@@ -29,18 +31,18 @@ public function load(array $configs, ContainerBuilder $container)
$this->configureQueueServicesWithDynamicDependencies($config, $container);
}
- public function getAlias()
+ public function getAlias(): string
{
return 'printedcom_rabbitmq_queue_bundle';
}
- private function defineQueueBundleOptions(array $bundleConfig, ContainerBuilder $container)
+ private function defineQueueBundleOptions(array $bundleConfig, ContainerBuilder $container): void
{
$queueBundleOptionsDefinition = $container->getDefinition('printed.bundle.queue.service.queue_bundle_options');
$queueBundleOptionsDefinition->setArgument(0, $bundleConfig['options']);
}
- private function configureQueueServicesWithDynamicDependencies(array $bundleConfig, ContainerBuilder $container)
+ private function configureQueueServicesWithDynamicDependencies(array $bundleConfig, ContainerBuilder $container): void
{
/*
* QueueMaintenance.php
diff --git a/src/Printed/Bundle/Queue/Entity/QueueTask.php b/src/Printed/Bundle/Queue/Entity/QueueTask.php
index 66b14e3..7530879 100644
--- a/src/Printed/Bundle/Queue/Entity/QueueTask.php
+++ b/src/Printed/Bundle/Queue/Entity/QueueTask.php
@@ -1,168 +1,104 @@
id;
}
- /**
- * {@inheritdoc}
- */
public function getPublicId(): string
{
return $this->publicId;
}
- /**
- * {@inheritdoc}
- */
public function setPublicId(string $id)
{
$this->publicId = $id;
+
return $this;
}
- /**
- * {@inheritdoc}
- */
public function getQueueName(): string
{
return $this->queueName;
@@ -177,76 +113,52 @@ public function assertQueueName(string $queueName)
throw new \RuntimeException("Failed to assert, that queue task `{$this->id}` is for queue `{$queueName}`.");
}
- /**
- * {@inheritdoc}
- */
public function setQueueName(string $queueName)
{
$this->queueName = $queueName;
+
return $this;
}
- /**
- * {@inheritdoc}
- */
public function getStatus(): int
{
return $this->status;
}
- /**
- * {@inheritdoc}
- */
public function isStatus(int $status): bool
{
return $this->status === $status;
}
- /**
- * {@inheritdoc}
- */
public function isAnyFailedStatus(): bool
{
- return in_array($this->status, [ QueueTaskStatus::FAILED, QueueTaskStatus::FAILED_LIMIT_EXCEEDED ]);
+ return in_array($this->status, [QueueTaskStatus::FAILED, QueueTaskStatus::FAILED_LIMIT_EXCEEDED]);
}
- /**
- * {@inheritdoc}
- */
public function setStatus(int $status)
{
$this->status = $status;
+
return $this;
}
- /**
- * {@inheritdoc}
- */
public function getAttempts(): int
{
return $this->attempts;
}
- /**
- * {@inheritdoc}
- */
public function setAttempts(int $attempts)
{
$this->attempts = $attempts;
+
return $this;
}
- /**
- * {@inheritdoc}
- */
public function getCompletionPercentage(): int
{
return $this->completionPercentage;
}
- /**
- * {@inheritdoc}
- */
public function setCompletionPercentage(int $completionPercentage)
{
if ($completionPercentage < 0 || $completionPercentage > 100) {
@@ -258,134 +170,91 @@ public function setCompletionPercentage(int $completionPercentage)
return $this;
}
- /**
- * {@inheritdoc}
- */
public function isCancellationRequested(): bool
{
return $this->cancellationRequested;
}
- /**
- * {@inheritdoc}
- */
public function setCancellationRequested(bool $cancellationRequested)
{
$this->cancellationRequested = $cancellationRequested;
+
return $this;
}
- /**
- * {@inheritdoc}
- */
public function getProcessId()
{
return $this->processId;
}
- /**
- * {@inheritdoc}
- */
public function setProcessId(int $pid = null)
{
$this->processId = $pid;
+
return $this;
}
- /**
- * {@inheritdoc}
- */
public function getPayloadClass(): string
{
return $this->payloadClass;
}
- /**
- * {@inheritdoc}
- */
public function setPayloadClass(string $class)
{
$this->payloadClass = $class;
+
return $this;
}
- /**
- * {@inheritdoc}
- */
public function getPayload(): array
{
return $this->payload;
}
- /**
- * {@inheritdoc}
- */
public function getPayloadDataItem(string $key)
{
return $this->getDataItemFromData($this->payload, $key);
}
- /**
- * {@inheritdoc}
- */
public function getPayloadDataItemOrThrow(string $key)
{
return $this->getDataItemFromDataOrThrow($this->payload, $key);
}
- /**
- * {@inheritdoc}
- */
public function setPayload(array $payload)
{
$this->payload = $payload;
+
return $this;
}
- /**
- * {@inheritdoc}
- */
public function getResponse(): array
{
return $this->response;
}
- /**
- * {@inheritdoc}
- */
public function setResponse(array $response)
{
$this->response = $response;
+
return $this;
}
- /**
- * {@inheritdoc}
- */
public function getResponseError(): array
{
return $this->responseError;
}
- /**
- * {@inheritdoc}
- */
public function getResponseDataItem(string $key)
{
return $this->getDataItemFromData($this->response, $key);
}
- /**
- * {@inheritdoc}
- */
public function getResponseDataItemOrThrow(string $key)
{
return $this->getDataItemFromDataOrThrow($this->response, $key);
}
- /**
- * {@inheritdoc}
- */
public function setResponseError(string $class, string $message, array $stack)
{
/*
@@ -399,6 +268,7 @@ public function setResponseError(string $class, string $message, array $stack)
$this->responseError['class'] = $class;
$this->responseError['message'] = $message;
$this->responseError['stack'] = $stack;
+
return $this;
}
@@ -418,6 +288,7 @@ public function getCreatedDate(): \DateTimeInterface
public function setCreatedDate(\DateTimeInterface $date)
{
$this->createdDate = $date;
+
return $this;
}
@@ -437,6 +308,7 @@ public function getStartedDate()
public function setStartedDate(\DateTimeInterface $date)
{
$this->startedDate = $date;
+
return $this;
}
@@ -456,6 +328,7 @@ public function getCompletedDate()
public function setCompletedDate(\DateTimeInterface $date)
{
$this->completedDate = $date;
+
return $this;
}
diff --git a/src/Printed/Bundle/Queue/Event/DispatchDelayedQueueTasksEventListener.php b/src/Printed/Bundle/Queue/Event/DispatchDelayedQueueTasksEventListener.php
index 6b18ca6..bdee9d5 100644
--- a/src/Printed/Bundle/Queue/Event/DispatchDelayedQueueTasksEventListener.php
+++ b/src/Printed/Bundle/Queue/Event/DispatchDelayedQueueTasksEventListener.php
@@ -13,12 +13,8 @@
*/
class DispatchDelayedQueueTasksEventListener implements EventSubscriber
{
- /** @var QueueTaskDispatcher */
- private $queueTaskDispatcher;
-
- public function __construct(QueueTaskDispatcher $queueTaskDispatcher)
+ public function __construct(private readonly QueueTaskDispatcher $queueTaskDispatcher)
{
- $this->queueTaskDispatcher = $queueTaskDispatcher;
}
public function getSubscribedEvents(): array
@@ -34,7 +30,7 @@ public function postFlush(PostFlushEventArgs $event)
* Make sure the flush doesn't happen in a db transaction, because this wouldn't actually
* make the queue tasks available in the database at this point.
*/
- if ($event->getEntityManager()->getConnection()->isTransactionActive()) {
+ if ($event->getObjectManager()->getConnection()->isTransactionActive()) {
return;
}
diff --git a/src/Printed/Bundle/Queue/Exception/Consumer/QueueFatalErrorException.php b/src/Printed/Bundle/Queue/Exception/Consumer/QueueFatalErrorException.php
index d5d15f9..a07bfad 100644
--- a/src/Printed/Bundle/Queue/Exception/Consumer/QueueFatalErrorException.php
+++ b/src/Printed/Bundle/Queue/Exception/Consumer/QueueFatalErrorException.php
@@ -2,11 +2,13 @@
namespace Printed\Bundle\Queue\Exception\Consumer;
+use RuntimeException;
+
/**
* Throwing this exception within a queue consumer will prevent it from running again by maxing out the task attempts.
* The message is then logged in the log file and the task is exited.
*/
-class QueueFatalErrorException extends \RuntimeException
+class QueueFatalErrorException extends RuntimeException
{
}
diff --git a/src/Printed/Bundle/Queue/Exception/CouldNotFindAllRequestedQueueTasksException.php b/src/Printed/Bundle/Queue/Exception/CouldNotFindAllRequestedQueueTasksException.php
index 1292591..4d26d8e 100644
--- a/src/Printed/Bundle/Queue/Exception/CouldNotFindAllRequestedQueueTasksException.php
+++ b/src/Printed/Bundle/Queue/Exception/CouldNotFindAllRequestedQueueTasksException.php
@@ -1,21 +1,23 @@
missingTaskIds = $missingTaskIds;
}
/**
diff --git a/src/Printed/Bundle/Queue/Exception/MissingQueueException.php b/src/Printed/Bundle/Queue/Exception/MissingQueueException.php
index 0821597..f7df2dd 100644
--- a/src/Printed/Bundle/Queue/Exception/MissingQueueException.php
+++ b/src/Printed/Bundle/Queue/Exception/MissingQueueException.php
@@ -2,10 +2,12 @@
namespace Printed\Bundle\Queue\Exception;
+use RuntimeException;
+
/**
* {@inheritdoc}
*/
-class MissingQueueException extends \RuntimeException
+class MissingQueueException extends RuntimeException
{
}
diff --git a/src/Printed/Bundle/Queue/Exception/QueuePayloadValidationException.php b/src/Printed/Bundle/Queue/Exception/QueuePayloadValidationException.php
index dabd225..2318a68 100644
--- a/src/Printed/Bundle/Queue/Exception/QueuePayloadValidationException.php
+++ b/src/Printed/Bundle/Queue/Exception/QueuePayloadValidationException.php
@@ -2,10 +2,12 @@
namespace Printed\Bundle\Queue\Exception;
+use RuntimeException;
+
/**
* {@inheritdoc}
*/
-class QueuePayloadValidationException extends \RuntimeException
+class QueuePayloadValidationException extends RuntimeException
{
}
diff --git a/src/Printed/Bundle/Queue/Exception/QueueTaskCancellationException.php b/src/Printed/Bundle/Queue/Exception/QueueTaskCancellationException.php
index 2877d99..295bb66 100644
--- a/src/Printed/Bundle/Queue/Exception/QueueTaskCancellationException.php
+++ b/src/Printed/Bundle/Queue/Exception/QueueTaskCancellationException.php
@@ -2,11 +2,13 @@
namespace Printed\Bundle\Queue\Exception;
+use RuntimeException;
+
/**
* Exception used to cancel consumer's execution from anywhere in its code, due to the
* fact, that a queue task has been cancelled.
*/
-class QueueTaskCancellationException extends \RuntimeException
+class QueueTaskCancellationException extends RuntimeException
{
}
diff --git a/src/Printed/Bundle/Queue/Helper/QueueTaskHelper.php b/src/Printed/Bundle/Queue/Helper/QueueTaskHelper.php
index 0dfe16e..966ddad 100644
--- a/src/Printed/Bundle/Queue/Helper/QueueTaskHelper.php
+++ b/src/Printed/Bundle/Queue/Helper/QueueTaskHelper.php
@@ -8,21 +8,14 @@
class QueueTaskHelper
{
- /** @var QueueTaskRepository */
- private $queueTaskRepository;
-
- public function __construct(QueueTaskRepository $queueTaskRepository)
+ public function __construct(private readonly QueueTaskRepository $queueTaskRepository)
{
- $this->queueTaskRepository = $queueTaskRepository;
}
- /**
- * @param QueueTaskInterface $task
- * @return AbstractQueuePayload
- */
public function getPayload(QueueTaskInterface $task): AbstractQueuePayload
{
$class = $task->getPayloadClass();
+
return new $class($task->getPayload());
}
@@ -32,18 +25,16 @@ public function getPayload(QueueTaskInterface $task): AbstractQueuePayload
* Learn about $queueTaskPayloadCriteria in QueueTaskRepository.
*
* @param string[] $taskPublicIds
- * @param string|null $queueName
- * @param array $queueTaskPayloadCriteria
*/
public function requestTasksCancellationOrThrow(
array $taskPublicIds,
- string $queueName = null,
- array $queueTaskPayloadCriteria = []
- ) {
+ ?string $queueName = null,
+ array $queueTaskPayloadCriteria = [],
+ ): void {
$tasks = $this->queueTaskRepository->findByPublicIdsAndQueueNameOrThrow(
$taskPublicIds,
$queueName,
- $queueTaskPayloadCriteria
+ $queueTaskPayloadCriteria,
);
foreach ($tasks as $task) {
diff --git a/src/Printed/Bundle/Queue/Queue/AbstractQueueConsumer.php b/src/Printed/Bundle/Queue/Queue/AbstractQueueConsumer.php
index 1af01cd..5c24732 100644
--- a/src/Printed/Bundle/Queue/Queue/AbstractQueueConsumer.php
+++ b/src/Printed/Bundle/Queue/Queue/AbstractQueueConsumer.php
@@ -1,7 +1,10 @@
em = $em;
- $this->validator = $validator;
- $this->logger = $logger;
- $this->locator = $locator;
- $this->containerParameters = $containerParameters;
$this->queueBundleOptions = $locator->get('printed.bundle.queue.service.queue_bundle_options');
$this->newDeploymentsDetector = $locator->get('printed.bundle.queue.service.new_deployments_detector');
@@ -119,7 +96,7 @@ public function __construct(
*/
$this->internalQueueConsumerEntityManager = $internalQueueConsumerEntityManager ?: $em;
- $this->startUpDateTime = new \DateTime();
+ $this->startUpDateTime = new DateTime();
}
public static function getSubscribedServices(): array
@@ -128,7 +105,7 @@ public static function getSubscribedServices(): array
* Dependencies for this class are required this way instead of injecting them to the constructor, so that:
*
* 1. The subclasses overriding this method don't forget to merge the parent deps.
- * 2. More deps can be added without introducing a breaking change to the the constructor's params.
+ * 2. More deps can be added without introducing a breaking change to the constructor's params.
*/
return [
'printed.bundle.queue.service.queue_bundle_options' => QueueBundleOptions::class,
@@ -149,10 +126,6 @@ public static function getSubscribedServices(): array
* * {@link TASK_FAILED}
* * {@link TASK_COMPLETE}
*
- * @param AbstractQueuePayload $payload
- *
- * @return bool
- *
* @throws QueueFatalErrorException
*/
abstract public function run(AbstractQueuePayload $payload): bool;
@@ -160,25 +133,18 @@ abstract public function run(AbstractQueuePayload $payload): bool;
/**
* Return the number of attempts the task will be given before it is marked as failed and dropped from
* the queue. When hit the queue will be marked in the database with failed because of limit.
- *
- * @return int
*/
public function getAttemptLimit(): int
{
return 1;
}
- /**
- * @param array $data
- *
- * @return array
- */
public function getLoggerContext(array $data = []): array
{
return array_merge(
[
'time' => time(),
- 'consumer' => get_called_class()
+ 'consumer' => get_called_class(),
],
$data
);
@@ -186,10 +152,8 @@ public function getLoggerContext(array $data = []): array
/**
* A helper method to quickly dispatch queue payloads.
- *
- * @param AbstractQueuePayload $payload
*/
- public function dispatchQueuePayload(AbstractQueuePayload $payload)
+ public function dispatchQueuePayload(AbstractQueuePayload $payload): void
{
$queue = $this->locator->get('printed.bundle.queue.service.queue_task_dispatcher');
$queue->dispatch($payload);
@@ -198,7 +162,7 @@ public function dispatchQueuePayload(AbstractQueuePayload $payload)
/**
* {@inheritdoc}
*/
- public function execute(AMQPMessage $msg)
+ public function execute(AMQPMessage $msg): bool|int
{
$this->message = $msg;
@@ -222,11 +186,9 @@ public function execute(AMQPMessage $msg)
$this->clearKnownEntityManagers();
- // @codingStandardsIgnoreStart
- $id = $msg->delivery_info['delivery_tag'];
- $queueName = $msg->delivery_info['routing_key'];
- $redelivered = $msg->delivery_info['redelivered'];
- // @codingStandardsIgnoreEnd
+ $id = $msg->getDeliveryTag();
+ $queueName = $msg->getRoutingKey();
+ $redelivered = $msg->isRedelivered();
// Attempt to retrieve the task from the database.
// The task ID was given as the message body.
@@ -237,6 +199,7 @@ public function execute(AMQPMessage $msg)
// Ideally we would email at this point, but there is nothing to email about.
if (is_null($this->task)) {
$this->logger->emergency(sprintf('Invalid task "%s" given to "%s"', $this->message->body, $queueName));
+
// Instead of using failed we use complete, this prevents the job being re-queued.
return self::TASK_COMPLETE;
}
@@ -257,18 +220,17 @@ public function execute(AMQPMessage $msg)
$this->task->getQueueName(),
$redelivered ? 're-attempting' : 'attempting',
$this->task->getId(),
- json_encode($this->task->getPayload())
+ json_encode($this->task->getPayload()),
),
[
'rabbitmq_id' => $id,
'rabbitmq_redelivered' => $redelivered,
'timestamp' => $this->task->getStartedDate()->getTimestamp(),
- 'attempts' => $this->task->getAttempts()
+ 'attempts' => $this->task->getAttempts(),
]
);
try {
-
$errors = $this->validator->validate($payload);
if ($errors->count()) {
throw new QueueFatalErrorException((string) $errors);
@@ -278,7 +240,6 @@ public function execute(AMQPMessage $msg)
// Handle the job.
$queueTaskStatus = $this->run($payload) ? QueueTaskStatus::COMPLETE : QueueTaskStatus::FAILED;
-
} catch (QueueFatalErrorException $exception) {
$queueTaskStatus = QueueTaskStatus::FAILED;
@@ -292,44 +253,31 @@ public function execute(AMQPMessage $msg)
$this->task->setAttempts($this->getAttemptLimit());
$this->task->setResponseError(get_class($exception), $exception->getMessage(), $exception->getTrace());
$this->logger->error($exception->getMessage(), $this->getLoggerContext());
-
} catch (QueueTaskCancellationException $exception) {
$queueTaskStatus = QueueTaskStatus::CANCELLED;
$exception = null;
-
- } catch (\Throwable $exception) {
+ } catch (Throwable $exception) {
$queueTaskStatus = QueueTaskStatus::FAILED;
- // Its good to know why a task failed, in this case we can log the exception.
+ // It's good to know why a task failed, in this case we can log the exception.
$this->task->setResponseError(get_class($exception), $exception->getMessage(), $exception->getTrace());
$this->logger->emergency(
sprintf(
"%s\n%s\n\n%s",
get_class($exception),
$exception->getMessage(),
- $exception->getTraceAsString()
+ $exception->getTraceAsString(),
)
);
-
}
- switch ($queueTaskStatus) {
- case QueueTaskStatus::COMPLETE:
- $this->updateTaskComplete();
- break;
-
- case QueueTaskStatus::FAILED:
- $this->updateTaskFailed();
- break;
-
- case QueueTaskStatus::CANCELLED:
- $this->updateTaskCancelled();
- break;
-
- default:
- throw new \RuntimeException("Unexpected queue task status: `{$queueTaskStatus}`");
- }
+ match ($queueTaskStatus) {
+ QueueTaskStatus::COMPLETE => $this->updateTaskComplete(),
+ QueueTaskStatus::FAILED => $this->updateTaskFailed(),
+ QueueTaskStatus::CANCELLED => $this->updateTaskCancelled(),
+ default => throw new RuntimeException("Unexpected queue task status: `{$queueTaskStatus}`"),
+ };
$this->internalQueueConsumerEntityManager->persist($this->task);
$this->internalQueueConsumerEntityManager->flush($this->task);
@@ -358,7 +306,6 @@ public function execute(AMQPMessage $msg)
return $queueTaskStatus === QueueTaskStatus::FAILED
? self::TASK_FAILED
: self::TASK_COMPLETE;
-
}
/**
@@ -369,7 +316,6 @@ public function execute(AMQPMessage $msg)
* be flushed at this point. In other words, either don't use the entity manager at all, or flush only the entities
* you really intend to flush via `EntityManager::flush($entityIIntendToFlush);`
*
- * @param AbstractQueuePayload $payload
* @return void
*/
protected function onTaskCancelled(AbstractQueuePayload $payload)
@@ -381,20 +327,18 @@ protected function onTaskCancelled(AbstractQueuePayload $payload)
*
* Read about the entity manager's usage caveats in the docblock for ::onTaskCancelled().
*
- * @param AbstractQueuePayload $payload
- * @param \Throwable $exception
- * @param bool $isPermanentFailure
* @return void
*/
- protected function onTaskAbortedByException(AbstractQueuePayload $payload, \Throwable $exception, bool $isPermanentFailure)
- {
+ protected function onTaskAbortedByException(
+ AbstractQueuePayload $payload,
+ Throwable $exception,
+ bool $isPermanentFailure,
+ ) {
}
/**
* Run this from your consumer to update the task's completion percentage without flushing
* anything else into database.
- *
- * @param int $completionPercentage
*/
protected function setTaskCompletionPercentage(int $completionPercentage)
{
@@ -423,7 +367,7 @@ protected function setTaskCompletionPercentage(int $completionPercentage)
* Naturally, you're not forced to cancel your consumer if you don't want to or when
* you're past "the point of no return".
*/
- protected function throwTaskCancellationExceptionIfCancellationRequested()
+ protected function throwTaskCancellationExceptionIfCancellationRequested(): void
{
$this->internalQueueConsumerEntityManager->refresh($this->task);
@@ -434,8 +378,6 @@ protected function throwTaskCancellationExceptionIfCancellationRequested()
/**
* Check the task has attempts left, return true to remove the job.
- *
- * @return bool
*/
private function validateTaskAttempts(): bool
{
@@ -450,12 +392,12 @@ private function validateTaskAttempts(): bool
'The task "%s" exceeded the max attempt limit ("%s") for the consumer "%s"',
$this->task->getId(),
$this->getAttemptLimit(),
- $this->task->getQueueName()
+ $this->task->getQueueName(),
)
);
$this->task->setStatus(QueueTaskStatus::FAILED_LIMIT_EXCEEDED);
- $this->task->setCompletedDate(new \DateTime);
+ $this->task->setCompletedDate(new DateTime);
$this->internalQueueConsumerEntityManager->persist($this->task);
$this->internalQueueConsumerEntityManager->flush($this->task);
@@ -464,9 +406,8 @@ private function validateTaskAttempts(): bool
}
- private function updateTaskRunning()
+ private function updateTaskRunning(): void
{
-
// Increment the task attempts count.
$this->task->setAttempts($this->task->getAttempts() + 1);
@@ -478,23 +419,23 @@ private function updateTaskRunning()
// Mark the task as running but also set the running date.
$this->task->setStatus(QueueTaskStatus::RUNNING);
- $this->task->setStartedDate(new \DateTime);
+ $this->task->setStartedDate(new DateTime);
$this->internalQueueConsumerEntityManager->persist($this->task);
$this->internalQueueConsumerEntityManager->flush($this->task);
}
- private function updateTaskCancelled()
+ private function updateTaskCancelled(): void
{
$this->task->setStatus(QueueTaskStatus::CANCELLED);
- $this->task->setCompletedDate(new \DateTime);
+ $this->task->setCompletedDate(new DateTime);
$this->logger->info(
sprintf(
'Consumer "%s" for task "%s" cancelled',
$this->task->getQueueName(),
- $this->task->getId()
+ $this->task->getId(),
)
);
@@ -503,33 +444,33 @@ private function updateTaskCancelled()
}
- private function updateTaskComplete()
+ private function updateTaskComplete(): void
{
$this->task->setStatus(QueueTaskStatus::COMPLETE);
$this->task->setCompletionPercentage(100);
- $this->task->setCompletedDate(new \DateTime);
+ $this->task->setCompletedDate(new DateTime);
$this->logger->info(
sprintf(
'Consumer "%s" for task "%s" completed',
$this->task->getQueueName(),
- $this->task->getId()
+ $this->task->getId(),
)
);
}
- private function updateTaskFailed()
+ private function updateTaskFailed(): void
{
$this->task->setStatus(QueueTaskStatus::FAILED);
- $this->task->setCompletedDate(new \DateTime);
+ $this->task->setCompletedDate(new DateTime);
$this->logger->error(
sprintf(
'Consumer "%s" for task "%s" failed',
$this->task->getQueueName(),
- $this->task->getId()
- )
+ $this->task->getId(),
+ ),
);
}
@@ -537,7 +478,7 @@ private function updateTaskFailed()
/**
* Clear all known entity managers, so entities are not cached between consumers' runs.
*/
- private function clearKnownEntityManagers()
+ private function clearKnownEntityManagers(): void
{
$this->internalQueueConsumerEntityManager->clear();
@@ -552,7 +493,7 @@ private function clearKnownEntityManagers()
* Ensure the consumer has been running for at least the amount of seconds configured, so tools like supervisord
* don't assume that the consumer didn't even start, if it manages to start and fail too quickly.
*/
- private function sleepUntilMinimalRuntimeIsMet()
+ private function sleepUntilMinimalRuntimeIsMet(): void
{
$minimalRuntimeInSeconds = $this->queueBundleOptions->get('minimal_runtime_in_seconds_on_consumer_exception');
@@ -567,7 +508,7 @@ private function sleepUntilMinimalRuntimeIsMet()
// because the time difference is already more than 1s. It's better to be safe than sorry, I guess.
$minimalRuntimeInSeconds += 1;
- $secondsSinceConsumerStart = (new \DateTime())->getTimeStamp() - $this->startUpDateTime->getTimeStamp();
+ $secondsSinceConsumerStart = (new DateTime())->getTimeStamp() - $this->startUpDateTime->getTimeStamp();
$timeToSleepToMeetMinimalRuntime = $minimalRuntimeInSeconds - $secondsSinceConsumerStart;
if ($timeToSleepToMeetMinimalRuntime <= 0) {
diff --git a/src/Printed/Bundle/Queue/Queue/AbstractQueuePayload.php b/src/Printed/Bundle/Queue/Queue/AbstractQueuePayload.php
index 5e30ff6..69bb668 100644
--- a/src/Printed/Bundle/Queue/Queue/AbstractQueuePayload.php
+++ b/src/Printed/Bundle/Queue/Queue/AbstractQueuePayload.php
@@ -1,5 +1,7 @@
$value) {
- if (0 === strpos($key, '__')) {
- continue;
- }
-
- $filteredPayloadProperties[$key] = $value;
- }
-
- return $filteredPayloadProperties;
+ return array_filter(get_object_vars($this), function ($value, string $key) {
+ return !str_starts_with($key, '__');
+ }, ARRAY_FILTER_USE_BOTH);
}
}
diff --git a/src/Printed/Bundle/Queue/QueueBundle.php b/src/Printed/Bundle/Queue/QueueBundle.php
index 72ef8b6..5658294 100644
--- a/src/Printed/Bundle/Queue/QueueBundle.php
+++ b/src/Printed/Bundle/Queue/QueueBundle.php
@@ -1,5 +1,7 @@
findByQueueNameAndStatuses(
$queueName,
@@ -51,15 +54,13 @@ public function findUnsettled(
/**
* Learn more about the payload criteria by reading the ::findUnsettled() docblock.
*
- * @param string|null $queueName
* @param int[] $queueTaskStatuses Collection of QueueTaskStatus::*
- * @param array $queueTaskPayloadCriteria
- * @return array
+ * @return QueueTaskInterface[]
*/
public function findByQueueNameAndStatuses(
- string $queueName = null,
+ ?string $queueName = null,
array $queueTaskStatuses = [],
- array $queueTaskPayloadCriteria = []
+ array $queueTaskPayloadCriteria = [],
): array {
$dbalConnection = $this->getEntityManager()->getConnection();
$tableAlias = 'qt';
@@ -97,7 +98,7 @@ public function findByQueueNameAndStatuses(
", $resultSetMappingBuilder);
if ($queueTaskStatuses) {
- $nativeQuery->setParameter(1, $queueTaskStatuses, Connection::PARAM_INT_ARRAY);
+ $nativeQuery->setParameter(1, $queueTaskStatuses, ArrayParameterType::INTEGER);
}
$result = $nativeQuery->getResult();
@@ -117,14 +118,12 @@ public function findByQueueNameAndStatuses(
* Learn more about the payload criteria by reading the ::findUnsettled() docblock.
*
* @param string[] $taskPublicIds
- * @param string|null $queueName
- * @param array $queueTaskPayloadCriteria
* @return QueueTaskInterface[]
*/
public function findByPublicIdsAndQueueNameOrThrow(
array $taskPublicIds,
- string $queueName = null,
- array $queueTaskPayloadCriteria = []
+ ?string $queueName = null,
+ array $queueTaskPayloadCriteria = [],
): array {
$dbalConnection = $this->getEntityManager()->getConnection();
$tableAlias = 'qt';
@@ -171,7 +170,7 @@ public function findByPublicIdsAndQueueNameOrThrow(
'Could not find all requested queue tasks. Missing tasks: `"%s"`',
join('", "', $missingTaskIds)
),
- $missingTaskIds
+ $missingTaskIds,
);
}
@@ -182,11 +181,9 @@ public function findByPublicIdsAndQueueNameOrThrow(
* Best effort way to find out, whether there are a queue tasks, created by a given payload, that
* are already in the database (i.e. that are already dispatched).
*
- * @param AbstractQueuePayload $payload
- * @param int|null $queueTaskStatus
* @return QueueTaskInterface[]
*/
- public function findByQueuePayload(AbstractQueuePayload $payload, int $queueTaskStatus = null)
+ public function findByQueuePayload(AbstractQueuePayload $payload, ?int $queueTaskStatus = null)
{
$searchCriteria = [
'queueName' => $payload->getQueueName(),
@@ -206,13 +203,11 @@ public function findByQueuePayload(AbstractQueuePayload $payload, int $queueTask
return $results;
}
- private function assertDatabaseIsPostgres()
+ private function assertDatabaseIsPostgres(): void
{
- if ('postgresql' === $this->getEntityManager()->getConnection()->getDatabasePlatform()->getName()) {
- return;
+ if (!$this->getEntityManager()->getConnection()->getDatabasePlatform() instanceof PostgreSQLPlatform) {
+ throw new RuntimeException('Failed to assert, that database is PostgreSQL.');
}
-
- throw new \RuntimeException('Failed to assert, that database is PostgreSQL.');
}
/**
@@ -226,14 +221,10 @@ private function assertDatabaseIsPostgres()
*
* AND tableAlias.payload->>'field1' = 'value'
* AND tableAlias.payload->'nested'->'field'->>'field2' = '10'
- *
- * @param string $tableAlias
- * @param array $queueTaskPayloadCriteria
- * @return string
*/
private function translateQueueTaskPayloadCriteriaToSql(
string $tableAlias,
- array $queueTaskPayloadCriteria
+ array $queueTaskPayloadCriteria,
): string {
/** @var string[] $sqlLines */
$sqlLines = [];
@@ -256,7 +247,7 @@ private function translateQueueTaskPayloadCriteriaToSql(
"AND %s.payload%s = '%s'",
$tableAlias,
join('', $jsonLevels),
- $criterionValue
+ $criterionValue,
);
}
diff --git a/src/Printed/Bundle/Queue/Resources/config/services.yml b/src/Printed/Bundle/Queue/Resources/config/services.yml
index 64c6e95..e7dfb5f 100644
--- a/src/Printed/Bundle/Queue/Resources/config/services.yml
+++ b/src/Printed/Bundle/Queue/Resources/config/services.yml
@@ -12,6 +12,8 @@ services:
- # (argument populated dynamically in QueueExtension.php)
public: false
+ Printed\Bundle\Queue\ValueObject\QueueBundleOptions: '@printed.bundle.queue.service.queue_bundle_options'
+
# Service Container Parameters.
printed.bundle.queue.service.service_container_parameters:
class: 'Printed\Bundle\Queue\Service\ServiceContainerParameters'
@@ -19,6 +21,8 @@ services:
- '@service_container'
public: false
+ Printed\Bundle\Queue\Service\ServiceContainerParameters: '@printed.bundle.queue.service.service_container_parameters'
+
# Queue Task Dispatcher.
printed.bundle.queue.service.queue_task_dispatcher:
class: 'Printed\Bundle\Queue\Service\QueueTaskDispatcher'
@@ -30,6 +34,8 @@ services:
- '@printed.bundle.queue.service.uuid'
public: false
+ Printed\Bundle\Queue\Service\QueueTaskDispatcher: '@printed.bundle.queue.service.queue_task_dispatcher'
+
# Queue Maintenance Manager.
printed.bundle.queue.service.queue_maintenance:
class: 'Printed\Bundle\Queue\Service\QueueMaintenance'
@@ -37,12 +43,16 @@ services:
- # (argument populated dynamically in QueueExtension.php)
public: false
+ Printed\Bundle\Queue\Service\QueueMaintenance: '@printed.bundle.queue.service.queue_maintenance'
+
printed.bundle.queue.service.queue_maintenance.filesystem_queue_maintenance_strategy:
class: 'Printed\Bundle\Queue\Service\QueueMaintenance\FilesystemQueueMaintenanceStrategy'
arguments:
- '%kernel.cache_dir%'
public: false
+ Printed\Bundle\Queue\Service\QueueMaintenance\FilesystemQueueMaintenanceStrategy: '@printed.bundle.queue.service.queue_maintenance.filesystem_queue_maintenance_strategy'
+
printed.bundle.queue.service.queue_maintenance.cache_queue_maintenance_strategy:
class: 'Printed\Bundle\Queue\Service\QueueMaintenance\CacheQueueMaintenanceStrategy'
arguments:
@@ -50,6 +60,8 @@ services:
- "@logger"
public: false
+ Printed\Bundle\Queue\Service\QueueMaintenance\CacheQueueMaintenanceStrategy: '@printed.bundle.queue.service.queue_maintenance.cache_queue_maintenance_strategy'
+
# New deployments detector.
printed.bundle.queue.service.new_deployments_detector:
class: 'Printed\Bundle\Queue\Service\NewDeploymentsDetector'
@@ -57,10 +69,14 @@ services:
- # (argument populated dynamically in QueueExtension.php)
public: false
+ Printed\Bundle\Queue\Service\NewDeploymentsDetector: '@printed.bundle.queue.service.new_deployments_detector'
+
printed.bundle.queue.service.new_deployments_detector.noop_strategy:
class: 'Printed\Bundle\Queue\Service\NewDeploymentsDetector\NoopNewDeploymentsDetectorStrategy'
public: false
+ Printed\Bundle\Queue\Service\NewDeploymentsDetector\NoopNewDeploymentsDetectorStrategy: '@printed.bundle.queue.service.new_deployments_detector.noop_strategy'
+
printed.bundle.queue.service.new_deployments_detector.cache_strategy:
class: 'Printed\Bundle\Queue\Service\NewDeploymentsDetector\CacheNewDeploymentsDetectorStrategy'
arguments:
@@ -68,6 +84,8 @@ services:
- "@logger"
public: false
+ Printed\Bundle\Queue\Service\NewDeploymentsDetector\CacheNewDeploymentsDetectorStrategy: '@printed.bundle.queue.service.new_deployments_detector.cache_strategy'
+
# Vhost existence ensurer.
printed.bundle.queue.service.rabbit_mq_vhost_existence_ensurer:
class: 'Printed\Bundle\Queue\Service\RabbitMqVhostExistenceEnsurer'
@@ -76,6 +94,8 @@ services:
- '@printed.bundle.queue.service.queue_bundle_options'
public: false
+ Printed\Bundle\Queue\Service\RabbitMqVhostExistenceEnsurer: '@printed.bundle.queue.service.rabbit_mq_vhost_existence_ensurer'
+
# UUID service.
printed.bundle.queue.service.uuid:
class: 'Ramsey\Uuid\UuidFactory'
diff --git a/src/Printed/Bundle/Queue/Resources/config/services/helpers.yml b/src/Printed/Bundle/Queue/Resources/config/services/helpers.yml
index 1193b64..8b23e1d 100644
--- a/src/Printed/Bundle/Queue/Resources/config/services/helpers.yml
+++ b/src/Printed/Bundle/Queue/Resources/config/services/helpers.yml
@@ -4,4 +4,6 @@ services:
class: 'Printed\Bundle\Queue\Helper\QueueTaskHelper'
arguments:
- '@printed.bundle.queue.repository.queue_task'
- public: false
\ No newline at end of file
+ public: false
+
+ Printed\Bundle\Queue\Helper\QueueTaskHelper: '@printed.bundle.queue.helper.queue_task_helper'
diff --git a/src/Printed/Bundle/Queue/Resources/config/services/repositories.yml b/src/Printed/Bundle/Queue/Resources/config/services/repositories.yml
index 2a118bf..5d65068 100644
--- a/src/Printed/Bundle/Queue/Resources/config/services/repositories.yml
+++ b/src/Printed/Bundle/Queue/Resources/config/services/repositories.yml
@@ -7,3 +7,5 @@ services:
factory: ["@doctrine.orm.entity_manager", getRepository]
arguments:
- 'Printed\Bundle\Queue\Entity\QueueTask'
+
+ Printed\Bundle\Queue\Repository\QueueTaskRepository: '@printed.bundle.queue.repository.queue_task'
diff --git a/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector.php b/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector.php
index 1c5e7bc..30a2025 100644
--- a/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector.php
+++ b/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector.php
@@ -1,18 +1,16 @@
newDeploymentsDetectorStrategy = $newDeploymentsDetectorStrategy;
}
public function getCurrentDeploymentStamp(): string
@@ -20,7 +18,7 @@ public function getCurrentDeploymentStamp(): string
return $this->newDeploymentsDetectorStrategy->getCurrentDeploymentStamp();
}
- public function setCurrentDeploymentStamp(string $deploymentStamp)
+ public function setCurrentDeploymentStamp(string $deploymentStamp): void
{
$this->newDeploymentsDetectorStrategy->setCurrentDeploymentStamp($deploymentStamp);
}
diff --git a/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector/CacheNewDeploymentsDetectorStrategy.php b/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector/CacheNewDeploymentsDetectorStrategy.php
index 12e1773..5f107d7 100644
--- a/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector/CacheNewDeploymentsDetectorStrategy.php
+++ b/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector/CacheNewDeploymentsDetectorStrategy.php
@@ -1,5 +1,7 @@
cache = $cache;
- $this->logger = $logger;
}
public function getCurrentDeploymentStamp(): string
@@ -35,7 +29,7 @@ public function getCurrentDeploymentStamp(): string
return $this->cache->fetch(static::CACHE_KEY) ?: 'unset';
}
- public function setCurrentDeploymentStamp(string $deploymentStamp)
+ public function setCurrentDeploymentStamp(string $deploymentStamp): void
{
$result = $this->cache->save(static::CACHE_KEY, $deploymentStamp);
diff --git a/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector/NewDeploymentsDetectorStrategyInterface.php b/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector/NewDeploymentsDetectorStrategyInterface.php
index 4f20b55..b7d2cb7 100644
--- a/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector/NewDeploymentsDetectorStrategyInterface.php
+++ b/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector/NewDeploymentsDetectorStrategyInterface.php
@@ -24,5 +24,5 @@ public function getCurrentDeploymentStamp(): string;
* @param string $deploymentStamp
* @return void
*/
- public function setCurrentDeploymentStamp(string $deploymentStamp);
+ public function setCurrentDeploymentStamp(string $deploymentStamp): void;
}
diff --git a/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector/NoopNewDeploymentsDetectorStrategy.php b/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector/NoopNewDeploymentsDetectorStrategy.php
index d25ba04..955fdba 100644
--- a/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector/NoopNewDeploymentsDetectorStrategy.php
+++ b/src/Printed/Bundle/Queue/Service/NewDeploymentsDetector/NoopNewDeploymentsDetectorStrategy.php
@@ -1,5 +1,7 @@
queueMaintenanceStrategy = $queueMaintenanceStrategy;
}
public function isEnabled(): bool
@@ -20,12 +18,12 @@ public function isEnabled(): bool
return $this->queueMaintenanceStrategy->isEnabled();
}
- public function enable()
+ public function enable(): void
{
$this->queueMaintenanceStrategy->enable();
}
- public function disable()
+ public function disable(): void
{
$this->queueMaintenanceStrategy->disable();
}
diff --git a/src/Printed/Bundle/Queue/Service/QueueMaintenance/CacheQueueMaintenanceStrategy.php b/src/Printed/Bundle/Queue/Service/QueueMaintenance/CacheQueueMaintenanceStrategy.php
index 53905bc..a41e83e 100644
--- a/src/Printed/Bundle/Queue/Service/QueueMaintenance/CacheQueueMaintenanceStrategy.php
+++ b/src/Printed/Bundle/Queue/Service/QueueMaintenance/CacheQueueMaintenanceStrategy.php
@@ -1,9 +1,12 @@
cache = $cache;
- $this->logger = $logger;
}
/**
@@ -67,7 +62,7 @@ public function isEnabled(): bool
/**
* @inheritdoc
*/
- public function enable()
+ public function enable(): void
{
$result = $this->cache->save(static::CACHE_KEY, time());
@@ -75,7 +70,7 @@ public function enable()
return;
}
- throw new \RuntimeException(join(' ', [
+ throw new RuntimeException(join(' ', [
"Couldn't enable queue maintenance mode, because saving the maintenance marker in",
'cache server failed for unknown reason. Please check, whether the cache server is running',
'and whether your cache configuration is correct.',
@@ -85,7 +80,7 @@ public function enable()
/**
* @inheritdoc
*/
- public function disable()
+ public function disable(): void
{
$result = $this->cache->delete(static::CACHE_KEY);
diff --git a/src/Printed/Bundle/Queue/Service/QueueMaintenance/FilesystemQueueMaintenanceStrategy.php b/src/Printed/Bundle/Queue/Service/QueueMaintenance/FilesystemQueueMaintenanceStrategy.php
index ce80035..e09096a 100644
--- a/src/Printed/Bundle/Queue/Service/QueueMaintenance/FilesystemQueueMaintenanceStrategy.php
+++ b/src/Printed/Bundle/Queue/Service/QueueMaintenance/FilesystemQueueMaintenanceStrategy.php
@@ -1,5 +1,7 @@
fileSystem->dumpFile($this->lockFileFullPath, time());
}
@@ -53,8 +53,8 @@ public function enable()
/**
* @inheritdoc
*/
- public function disable()
+ public function disable(): void
{
$this->fileSystem->remove($this->lockFileFullPath);
}
-}
\ No newline at end of file
+}
diff --git a/src/Printed/Bundle/Queue/Service/QueueMaintenance/QueueMaintenanceStrategyInterface.php b/src/Printed/Bundle/Queue/Service/QueueMaintenance/QueueMaintenanceStrategyInterface.php
index a546b5a..69cc59f 100644
--- a/src/Printed/Bundle/Queue/Service/QueueMaintenance/QueueMaintenanceStrategyInterface.php
+++ b/src/Printed/Bundle/Queue/Service/QueueMaintenance/QueueMaintenanceStrategyInterface.php
@@ -17,10 +17,10 @@ public function isEnabled(): bool;
/**
* @return void
*/
- public function enable();
+ public function enable(): void;
/**
* @return void
*/
- public function disable();
-}
\ No newline at end of file
+ public function disable(): void;
+}
diff --git a/src/Printed/Bundle/Queue/Service/QueueTaskDispatcher.php b/src/Printed/Bundle/Queue/Service/QueueTaskDispatcher.php
index 186cc91..0c50932 100644
--- a/src/Printed/Bundle/Queue/Service/QueueTaskDispatcher.php
+++ b/src/Printed/Bundle/Queue/Service/QueueTaskDispatcher.php
@@ -1,7 +1,13 @@
em = $em;
- $this->logger = $logger;
- $this->validator = $validator;
- $this->payloadsDelayedUntilNextDoctrineFlush = [];
- $this->dispatchingOnDoctrineFlushPayloads = false;
-
- $this->defaultRabbitMqProducer = $defaultRabbitMqProducer;
- $this->uuidGenerator = $uuidGenerator;
}
/**
@@ -95,7 +76,7 @@ public function dispatch(AbstractQueuePayload $payload, array $options = []): Qu
$options['preQueueTaskDispatchFn']
&& !is_callable($options['preQueueTaskDispatchFn'])
) {
- throw new \InvalidArgumentException("`preQueueTaskDispatchFn` must either be a callable or a null");
+ throw new InvalidArgumentException("`preQueueTaskDispatchFn` must either be a callable or a null");
}
$task = new QueueTask;
@@ -108,7 +89,7 @@ public function dispatch(AbstractQueuePayload $payload, array $options = []): Qu
$task->setPayloadClass(get_class($payload));
$task->setPayload($payload->getProperties());
- $task->setCreatedDate(new \DateTime);
+ $task->setCreatedDate(new DateTime);
$this->em->persist($task);
$this->em->flush($task);
@@ -148,33 +129,13 @@ public function dispatch(AbstractQueuePayload $payload, array $options = []): Qu
* in the payload), then pass a "payload creator function", which is invoked after final Doctrine flush but before
* the queue task dispatch.
*
- * @param AbstractQueuePayload|callable $payloadOrPayloadCreatorFn This must not be a callable in the array form.
+ * @param Closure|AbstractQueuePayload $payloadOrPayloadCreatorFn This must not be a callable in the array form.
* The callable must return an instance of AbstractQueuePayload and require no arguments
+ *
* @return ScheduledQueueTask
*/
- public function dispatchAfterNextEntityManagerFlush($payloadOrPayloadCreatorFn): ScheduledQueueTask
+ public function dispatchAfterNextEntityManagerFlush(Closure|AbstractQueuePayload $payloadOrPayloadCreatorFn): ScheduledQueueTask
{
- if (
- !$payloadOrPayloadCreatorFn instanceof AbstractQueuePayload
- && !is_callable($payloadOrPayloadCreatorFn)
- ) {
- throw new \InvalidArgumentException(sprintf(
- 'Argument to `%s` function must either be an instance of `%s` or a callable',
- __METHOD__,
- AbstractQueuePayload::class
- ));
- }
-
- /*
- * Disallow "array" callables because I can't get object hashes of arrays.
- */
- if (is_array($payloadOrPayloadCreatorFn)) {
- throw new \InvalidArgumentException(sprintf(
- 'Callables in the array form are not supported in `%s`.',
- __METHOD__
- ));
- }
-
$existingScheduledQueueTask = $this->payloadsDelayedUntilNextDoctrineFlush[spl_object_hash($payloadOrPayloadCreatorFn)]
?? null;
@@ -205,7 +166,7 @@ public function dispatchAfterNextEntityManagerFlush($payloadOrPayloadCreatorFn):
/**
* @internal Don't use this method.
*/
- public function dispatchOnDoctrineFlushPayloads()
+ public function dispatchOnDoctrineFlushPayloads(): void
{
if (
!$this->payloadsDelayedUntilNextDoctrineFlush
@@ -217,8 +178,6 @@ public function dispatchOnDoctrineFlushPayloads()
$this->dispatchingOnDoctrineFlushPayloads = true;
foreach ($this->payloadsDelayedUntilNextDoctrineFlush as $scheduledQueueTask) {
- /** @var ScheduledQueueTask $scheduledQueueTask */
-
$payload = $scheduledQueueTask->getPayload();
$isPayloadConstructedLate = false;
@@ -247,7 +206,7 @@ public function dispatchOnDoctrineFlushPayloads()
$this->dispatchingOnDoctrineFlushPayloads = false;
}
- private function throwIfPayloadInvalid(AbstractQueuePayload $payload)
+ private function throwIfPayloadInvalid(AbstractQueuePayload $payload): void
{
$errors = $this->validator->validate($payload);
diff --git a/src/Printed/Bundle/Queue/Service/RabbitMqVhostExistenceEnsurer.php b/src/Printed/Bundle/Queue/Service/RabbitMqVhostExistenceEnsurer.php
index 0b1c13d..5b08006 100644
--- a/src/Printed/Bundle/Queue/Service/RabbitMqVhostExistenceEnsurer.php
+++ b/src/Printed/Bundle/Queue/Service/RabbitMqVhostExistenceEnsurer.php
@@ -1,50 +1,45 @@
logger = $logger;
$this->rabbitmqUser = $queueBundleOptions->get('rabbitmq_user');
$this->rabbitmqPassword = $queueBundleOptions->get('rabbitmq_password');
$this->rabbitmqVhost = $queueBundleOptions->get('rabbitmq_vhost');
$this->rabbitmqApiBaseUrl = $queueBundleOptions->get('rabbitmq_api_base_url');
- $this->rabbitmqManagementClient = new RabbitMq\ManagementApi\Client(
+ $this->rabbitmqManagementClient = new RabbitMqManagementClient(
null,
$this->rabbitmqApiBaseUrl,
$this->rabbitmqUser,
- $this->rabbitmqPassword
+ $this->rabbitmqPassword,
);
}
- public function ensure()
+ public function ensure(): void
{
$rabbitmqUser = $this->rabbitmqUser;
$rabbitmqVhost = $this->rabbitmqVhost;
@@ -57,7 +52,7 @@ public function ensure()
$this->logger->info('Done.');
}
- private function ensureVhostExists(string $rabbitmqVhost)
+ private function ensureVhostExists(string $rabbitmqVhost): void
{
$vhostOrError = $this->rabbitmqManagementClient->vhosts()->get($rabbitmqVhost);
@@ -73,10 +68,10 @@ private function ensureVhostExists(string $rabbitmqVhost)
*/
$responseErrorType = $vhostOrError['error'] ?? '(undefined)';
if ($responseErrorType !== 'Object Not Found') {
- throw new \RuntimeException(sprintf(
+ throw new RuntimeException(sprintf(
"Expected rabbitmq's api not found error, but got: `%s`. Full response: `%s`",
$responseErrorType,
- json_encode($vhostOrError)
+ json_encode($vhostOrError),
));
}
@@ -86,7 +81,7 @@ private function ensureVhostExists(string $rabbitmqVhost)
$response = $this->rabbitmqManagementClient->vhosts()->create($rabbitmqVhost);
if ($response['error'] ?? false) {
- throw new \RuntimeException(
+ throw new RuntimeException(
sprintf("Couldn't create rabbitmq vhost. Error: `%s`", json_encode($response))
);
}
@@ -94,7 +89,7 @@ private function ensureVhostExists(string $rabbitmqVhost)
$this->logger->info("Created rabbitmq vhost: `{$rabbitmqVhost}`");
}
- private function ensureVhostUserPermissionsSet(string $rabbitmqVhost, string $rabbitmqUser)
+ private function ensureVhostUserPermissionsSet(string $rabbitmqVhost, string $rabbitmqUser): void
{
$expectedPermissions = [
'user' => $rabbitmqUser,
@@ -123,16 +118,15 @@ private function ensureVhostUserPermissionsSet(string $rabbitmqVhost, string $ra
'configure' => $expectedPermissions['configure'],
'write' => $expectedPermissions['write'],
'read' => $expectedPermissions['read'],
- ]
+ ],
);
if ($response['error'] ?? false) {
- throw new \RuntimeException(
+ throw new RuntimeException(
sprintf("Couldn't create rabbitmq vhost permissions. Error: `%s`", json_encode($response))
);
}
$this->logger->info("Created permissions for rabbitmq vhost and user: `{$rabbitmqVhost}`, `{$rabbitmqUser}`");
}
-
}
diff --git a/src/Printed/Bundle/Queue/Service/ServiceContainerParameters.php b/src/Printed/Bundle/Queue/Service/ServiceContainerParameters.php
index 59a108e..bc7365d 100644
--- a/src/Printed/Bundle/Queue/Service/ServiceContainerParameters.php
+++ b/src/Printed/Bundle/Queue/Service/ServiceContainerParameters.php
@@ -1,5 +1,7 @@
getParameterBag()->all());
}
-}
\ No newline at end of file
+}
diff --git a/src/Printed/Bundle/Queue/ValueObject/ScheduledQueueTask.php b/src/Printed/Bundle/Queue/ValueObject/ScheduledQueueTask.php
index e56ce74..6eb68fe 100644
--- a/src/Printed/Bundle/Queue/ValueObject/ScheduledQueueTask.php
+++ b/src/Printed/Bundle/Queue/ValueObject/ScheduledQueueTask.php
@@ -2,58 +2,47 @@
namespace Printed\Bundle\Queue\ValueObject;
+use Closure;
+use InvalidArgumentException;
use Printed\Bundle\Queue\EntityInterface\QueueTaskInterface;
use Printed\Bundle\Queue\Queue\AbstractQueuePayload;
+use RuntimeException;
/**
* A class, that holds a queue payload, that is dispatched at later point of time. Think of it
* as of a Promise.
*
- * When exactly the actual task is dispatched, is dependant on the feature, that created the
+ * When exactly the actual task is dispatched, is dependent on the feature, that created the
* instance of this class. See usages to get an idea.
*/
class ScheduledQueueTask
{
+ /** @var callable|null See QueueTaskDispatcher::dispatch() */
+ private $preQueueTaskDispatchFn;
+
/**
* When payload is not defined then the $payloadCreatorFn is used to construct it just before the queue task is
* dispatched (i.e. after the entity manager flush).
*
* That's impossible for both the payload and the payload creator function not to be set.
*
- * @var AbstractQueuePayload|null
+ * @param Closure|null $payloadCreatorFn See QueueTaskDispatcher::dispatch()
+ * @param QueueTaskInterface|null $queueTask Defined, when the task is dispatched
*/
- private $payload;
-
- /** @var callable|null */
- private $payloadCreatorFn;
-
- /** @var callable|null See QueueTaskDispatcher::dispatch() */
- private $preQueueTaskDispatchFn;
-
- /** @var QueueTaskInterface|null Defined, when the task is dispatched */
- private $queueTask;
-
public function __construct(
- AbstractQueuePayload $payload = null,
- callable $payloadCreatorFn = null,
- QueueTaskInterface $queueTask = null
+ private ?AbstractQueuePayload $payload = null,
+ private readonly ?Closure $payloadCreatorFn = null,
+ private ?QueueTaskInterface $queueTask = null,
) {
if (!$payload && !$payloadCreatorFn) {
- throw new \InvalidArgumentException(sprintf(
+ throw new InvalidArgumentException(sprintf(
"Can't construct `%s` without providing either the queue payload or the queue payload creator function",
get_class()
));
}
-
- $this->payload = $payload;
- $this->payloadCreatorFn = $payloadCreatorFn;
- $this->queueTask = $queueTask;
}
- /**
- * @return AbstractQueuePayload|null
- */
- public function getPayload()
+ public function getPayload(): ?AbstractQueuePayload
{
return $this->payload;
}
@@ -61,49 +50,37 @@ public function getPayload()
public function getPayloadOrThrow(): AbstractQueuePayload
{
if (!$this->payload) {
- throw new \RuntimeException("The queue payload isn't constructed yet. It will be after the final EntityManager flush");
+ throw new RuntimeException("The queue payload isn't constructed yet. It will be after the final EntityManager flush");
}
return $this->payload;
}
- /**
- * @return callable|null
- */
- public function getPreQueueTaskDispatchFn()
+ public function getPreQueueTaskDispatchFn(): ?callable
{
return $this->preQueueTaskDispatchFn;
}
- public function setPreQueueTaskDispatchFn(callable $preQueueTaskDispatchFn = null)
+ public function setPreQueueTaskDispatchFn(callable $preQueueTaskDispatchFn = null): void
{
$this->preQueueTaskDispatchFn = $preQueueTaskDispatchFn;
}
- /**
- * @return QueueTaskInterface|null
- */
- public function getQueueTask()
+ public function getQueueTask(): ?QueueTaskInterface
{
return $this->queueTask;
}
- /**
- * @return QueueTaskInterface
- */
public function getQueueTaskOrThrow(): QueueTaskInterface
{
if (!$this->queueTask) {
- throw new \RuntimeException("Can't retrieve scheduled queue task, because it's not been dispatched yet.");
+ throw new RuntimeException("Can't retrieve scheduled queue task, because it's not been dispatched yet.");
}
return $this->queueTask;
}
- /**
- * @param QueueTaskInterface|null $queueTask
- */
- public function setQueueTask(QueueTaskInterface $queueTask = null)
+ public function setQueueTask(QueueTaskInterface $queueTask = null): void
{
$this->queueTask = $queueTask;
}
@@ -114,17 +91,17 @@ public function setQueueTask(QueueTaskInterface $queueTask = null)
public function constructAndGetPayload(): AbstractQueuePayload
{
if ($this->payload) {
- throw new \RuntimeException('Queue payload is already constructed');
+ throw new RuntimeException('Queue payload is already constructed');
}
if (!$this->payloadCreatorFn) {
- throw new \RuntimeException("Can't construct the queue payload because the payload creator function wasn't provided");
+ throw new RuntimeException("Can't construct the queue payload because the payload creator function wasn't provided");
}
$this->payload = call_user_func($this->payloadCreatorFn);
if (!$this->payload instanceof AbstractQueuePayload) {
- throw new \RuntimeException("Queue payload creator function didn't construct an instance of a queue payload");
+ throw new RuntimeException("Queue payload creator function didn't construct an instance of a queue payload");
}
return $this->payload;