Skip to content

BrandEmbassy/queue-management

Folders and files

NameName
Last commit message
Last commit date

Latest commit

87756c3 · Nov 14, 2024
Feb 1, 2024
Nov 14, 2024
Nov 14, 2024
Jan 31, 2024
Jan 31, 2024
Apr 15, 2024
Nov 13, 2024
Jul 18, 2024
Nov 14, 2024
Apr 14, 2023
Apr 15, 2024
Nov 13, 2024
Feb 1, 2024

Repository files navigation

Queue management

Usage

1. Create Job class

<?php declare(strict_types = 1);

use BE\QueueManagement\Jobs\SimpleJob;

class ExampleJob extends SimpleJob
{
    public const JOB_NAME = 'exampleJob';
    public const PARAMETER_FOO = 'foo';


    public function getFoo(): string
    {
        return $this->getParameter(self::PARAMETER_FOO);
    }
}

2. Create job processor

<?php declare(strict_types = 1);

use BE\QueueManagement\Jobs\Execution\JobProcessorInterface;
use BE\QueueManagement\Jobs\JobInterface;
use YourApp\Jobs\ExampleJob;

class ExampleJobProcessor implements JobProcessorInterface
{
    public function process(JobInterface $job): void
    {
        assert($job instanceof ExampleJob);
        
        echo $job->getFoo();
    }
}

3. Create job definition

For example using neon DI:

parameters:
    queue:
        jobs:
            defaultJobLoader: BE\QueueManagement\Jobs\Loading\SimpleJobLoader()
            jobDefinitions:
                exampleJob:
                    class: YourApp\Jobs\ExampleJob
                    queueName: example_queue
                    maxAttempts: 20 # null means no limit
                    jobLoader: YourApp\JobLoaders\ExampleJobLoader() # if not set default job loader is used
                    jobDelayRule: BE\QueueManagement\Jobs\FailResolving\DelayRules\ConstantDelayRule()
                    jobProcessorService: queue.processors.exampleJobProcessor

services:
    queue.processors.exampleJobProcessor: YourApp\JobProcessors\ExampleJobProcessor 

    # JobDefinitionsContainer
    queue.jobDefinitionsContainer: BE\QueueManagement\Jobs\JobDefinitions\JobDefinitionsContainer(%queue.jobs.jobDefinitions%)

4. Push job into queue

<?php declare(strict_types = 1);

use BE\QueueManagement\Queue\QueueManagerInterface;
use YourApp\Jobs\ExampleJob;
use BE\QueueManagement\Jobs\JobInterface;
use BE\QueueManagement\Jobs\JobDefinitions\JobDefinitionsContainer;

class JobPusher
{

    /**
     * @var QueueManagerInterface
     */
    protected $queueManager;

    /**
     * @var JobDefinitionsContainer
     */
    private $jobDefinitionsContainer;


    public function __construct(QueueManagerInterface $queueManager, JobDefinitionsContainer $jobDefinitionsContainer) {
        $this->queueManager = $queueManager;
        $this->jobDefinitionsContainer = $jobDefinitionsContainer;
    }
    

    protected function push(string $jobUuid): void
    {
        $jobDefinition = $this->jobDefinitionsContainer->get(ExampleJob::JOB_NAME);

        $exampleJob = new ExampleJob(
            $jobUuid,
            new DateTimeImmutable(),
            JobInterface::INIT_ATTEMPTS,
            $jobDefinition,
            new ArrayCollection([ExampleJob::PARAMETER_FOO => 'bar'])
        );

        $this->queueManager->push($exampleJob);
    }
}

5. Run worker

<?php declare(strict_types = 1);

namespace BE\AdapterSdk\Console\Commands\Queue;

use BE\QueueManagement\Queue\WorkerInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Command\Command;

class WorkerStartCommand extends Command
{
    /**
     * @var WorkerInterface
     */
    private $worker;


    public function __construct(WorkerInterface $worker)
    {
        parent::__construct();
        $this->worker = $worker;
    }


    protected function configure(): void
    {
        $this->setName('queue:worker:start');
        $this->setDescription('Start queue worker');
    }


    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $queueName = $input->getArgument(self::QUEUE_NAME);
        assert(is_string($queueName));

        $this->sqsWorker->start($queueName);
        
        $output->writeln('Worker started');

        return 0;
    }
}