From 1e3dacb411bb8ec3fa2d9b98336248282cff7e3e Mon Sep 17 00:00:00 2001 From: Vytautas Smilingis Date: Fri, 24 May 2024 16:27:39 +0200 Subject: [PATCH] Added ability to customize `WorkerOptions` --- src/CloudTasksQueue.php | 25 ++++++++++++ src/TaskHandler.php | 2 +- tests/Support/FailingJobWithNoMaxTries.php | 10 +++++ tests/TaskHandlerTest.php | 45 ++++++++++++++++++++++ 4 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 tests/Support/FailingJobWithNoMaxTries.php diff --git a/src/CloudTasksQueue.php b/src/CloudTasksQueue.php index 15738eb..1a32cac 100644 --- a/src/CloudTasksQueue.php +++ b/src/CloudTasksQueue.php @@ -15,6 +15,7 @@ use Google\Protobuf\Timestamp; use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Queue\Queue as LaravelQueue; +use Illuminate\Queue\WorkerOptions; use Illuminate\Support\Str; use Stackkit\LaravelGoogleCloudTasksQueue\Events\TaskCreated; @@ -27,6 +28,9 @@ class CloudTasksQueue extends LaravelQueue implements QueueContract private static ?Closure $taskHeadersCallback = null; + /** @var (Closure(IncomingTask): WorkerOptions)|null */ + private static ?Closure $workerOptionsCallback = null; + public function __construct(public array $config, public CloudTasksClient $client, public $dispatchAfterCommit = false) { // @@ -52,6 +56,27 @@ public static function forgetTaskHeadersCallback(): void self::$taskHeadersCallback = null; } + /** + * @param Closure(IncomingTask): WorkerOptions $callback + */ + public static function configureWorkerOptionsUsing(Closure $callback): void + { + static::$workerOptionsCallback = $callback; + } + + /** + * @return (Closure(IncomingTask): WorkerOptions)|null + */ + public static function getWorkerOptionsCallback(): ?Closure + { + return self::$workerOptionsCallback; + } + + public static function forgetWorkerOptionsCallback(): void + { + self::$workerOptionsCallback = null; + } + /** * Get the size of the queue. * diff --git a/src/TaskHandler.php b/src/TaskHandler.php index 2cab73e..b47f61d 100644 --- a/src/TaskHandler.php +++ b/src/TaskHandler.php @@ -61,7 +61,7 @@ private function run(IncomingTask $task): void tap(app('cloud-tasks.worker'), fn (Worker $worker) => $worker->process( connectionName: $job->getConnectionName(), job: $job, - options: $this->getWorkerOptions() + options: CloudTasksQueue::getWorkerOptionsCallback() ? (CloudTasksQueue::getWorkerOptionsCallback())($task) : $this->getWorkerOptions() )); } diff --git a/tests/Support/FailingJobWithNoMaxTries.php b/tests/Support/FailingJobWithNoMaxTries.php new file mode 100644 index 0000000..65273ee --- /dev/null +++ b/tests/Support/FailingJobWithNoMaxTries.php @@ -0,0 +1,10 @@ +assertDatabaseCount('failed_jobs', 1); } + #[Test] + public function uses_worker_options_callback_and_after_max_attempts_it_will_log_to_failed_table() + { + // Arrange + CloudTasksQueue::configureWorkerOptionsUsing(function (IncomingTask $task) { + $queueTries = [ + 'high' => 5, + 'low' => 1, + ]; + + return new WorkerOptions(maxTries: $queueTries[$task->queue()] ?? 1); + }); + + $job = $this->dispatch(tap(new FailingJobWithNoMaxTries(), fn ($job) => $job->queue = 'high')); + + // Act & Assert + $this->assertDatabaseCount('failed_jobs', 0); + + $releasedJob = $job->runAndGetReleasedJob(); + $this->assertDatabaseCount('failed_jobs', 0); + + $releasedJob = $releasedJob->runAndGetReleasedJob(); + $this->assertDatabaseCount('failed_jobs', 0); + $releasedJob = $releasedJob->runAndGetReleasedJob(); + $this->assertDatabaseCount('failed_jobs', 0); + $releasedJob = $releasedJob->runAndGetReleasedJob(); + $this->assertDatabaseCount('failed_jobs', 0); + + $releasedJob->run(); + $this->assertDatabaseCount('failed_jobs', 1); + } + #[Test] public function after_max_attempts_it_will_no_longer_execute_the_task() {