Skip to content

Commit

Permalink
Added ability to customize WorkerOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
Plytas committed May 24, 2024
1 parent e692e30 commit 1e3dacb
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 1 deletion.
25 changes: 25 additions & 0 deletions src/CloudTasksQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Google\Protobuf\Timestamp;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\Queue as LaravelQueue;
use Illuminate\Queue\WorkerOptions;
use Illuminate\Support\Str;
use Stackkit\LaravelGoogleCloudTasksQueue\Events\TaskCreated;

Expand All @@ -27,6 +28,9 @@ class CloudTasksQueue extends LaravelQueue implements QueueContract

private static ?Closure $taskHeadersCallback = null;

/** @var (Closure(IncomingTask): WorkerOptions)|null */
private static ?Closure $workerOptionsCallback = null;

public function __construct(public array $config, public CloudTasksClient $client, public $dispatchAfterCommit = false)
{
//
Expand All @@ -52,6 +56,27 @@ public static function forgetTaskHeadersCallback(): void
self::$taskHeadersCallback = null;
}

/**
* @param Closure(IncomingTask): WorkerOptions $callback
*/
public static function configureWorkerOptionsUsing(Closure $callback): void
{
static::$workerOptionsCallback = $callback;
}

/**
* @return (Closure(IncomingTask): WorkerOptions)|null
*/
public static function getWorkerOptionsCallback(): ?Closure
{
return self::$workerOptionsCallback;
}

public static function forgetWorkerOptionsCallback(): void
{
self::$workerOptionsCallback = null;
}

/**
* Get the size of the queue.
*
Expand Down
2 changes: 1 addition & 1 deletion src/TaskHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private function run(IncomingTask $task): void
tap(app('cloud-tasks.worker'), fn (Worker $worker) => $worker->process(
connectionName: $job->getConnectionName(),
job: $job,
options: $this->getWorkerOptions()
options: CloudTasksQueue::getWorkerOptionsCallback() ? (CloudTasksQueue::getWorkerOptionsCallback())($task) : $this->getWorkerOptions()
));
}

Expand Down
10 changes: 10 additions & 0 deletions tests/Support/FailingJobWithNoMaxTries.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Tests\Support;

class FailingJobWithNoMaxTries extends FailingJob
{
public $tries = null;
}
45 changes: 45 additions & 0 deletions tests/TaskHandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@
namespace Tests;

use Illuminate\Queue\Events\JobReleasedAfterException;
use Illuminate\Queue\WorkerOptions;
use Illuminate\Support\Facades\Event;
use Override;
use PHPUnit\Framework\Attributes\Test;
use PHPUnit\Framework\Attributes\TestWith;
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksApi;
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksQueue;
use Stackkit\LaravelGoogleCloudTasksQueue\IncomingTask;
use Tests\Support\EncryptedJob;
use Tests\Support\FailingJob;
use Tests\Support\FailingJobWithMaxTries;
use Tests\Support\FailingJobWithMaxTriesAndRetryUntil;
use Tests\Support\FailingJobWithNoMaxTries;
use Tests\Support\FailingJobWithRetryUntil;
use Tests\Support\FailingJobWithUnlimitedTries;
use Tests\Support\JobOutput;
Expand All @@ -28,6 +33,14 @@ protected function setUp(): void
CloudTasksApi::fake();
}

#[Override]
protected function tearDown(): void
{
parent::tearDown();

CloudTasksQueue::forgetWorkerOptionsCallback();
}

#[Test]
public function it_can_run_a_task()
{
Expand Down Expand Up @@ -77,6 +90,38 @@ public function after_max_attempts_it_will_log_to_failed_table()
$this->assertDatabaseCount('failed_jobs', 1);
}

#[Test]
public function uses_worker_options_callback_and_after_max_attempts_it_will_log_to_failed_table()
{
// Arrange
CloudTasksQueue::configureWorkerOptionsUsing(function (IncomingTask $task) {
$queueTries = [
'high' => 5,
'low' => 1,
];

return new WorkerOptions(maxTries: $queueTries[$task->queue()] ?? 1);
});

$job = $this->dispatch(tap(new FailingJobWithNoMaxTries(), fn ($job) => $job->queue = 'high'));

// Act & Assert
$this->assertDatabaseCount('failed_jobs', 0);

$releasedJob = $job->runAndGetReleasedJob();
$this->assertDatabaseCount('failed_jobs', 0);

$releasedJob = $releasedJob->runAndGetReleasedJob();
$this->assertDatabaseCount('failed_jobs', 0);
$releasedJob = $releasedJob->runAndGetReleasedJob();
$this->assertDatabaseCount('failed_jobs', 0);
$releasedJob = $releasedJob->runAndGetReleasedJob();
$this->assertDatabaseCount('failed_jobs', 0);

$releasedJob->run();
$this->assertDatabaseCount('failed_jobs', 1);
}

#[Test]
public function after_max_attempts_it_will_no_longer_execute_the_task()
{
Expand Down

0 comments on commit 1e3dacb

Please sign in to comment.