Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[10.x] Add queue job debouncing #50347

Closed
wants to merge 12 commits into from
31 changes: 31 additions & 0 deletions src/Illuminate/Foundation/Bus/Dispatchable.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
namespace Illuminate\Foundation\Bus;

use Closure;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Support\Fluent;

trait Dispatchable
Expand Down Expand Up @@ -41,6 +43,35 @@ public static function dispatchIf($boolean, ...$arguments)
: new Fluent;
}

/**
* Dispatch the job with the given arguments unless the job is already queued.
*
* @see \Illuminate\Queue\Middleware\Debounced::handle()
*/
// todo - add \DateTimeInterface|\DateInterval as accepted types for wait
public static function dispatchDebounced(int $wait, ...$arguments): PendingDispatch
{
$dispatchable = new static(...$arguments);

if (!in_array(Queueable::class, class_uses_recursive(static::class), true)) {
throw new \InvalidArgumentException(
'Debounced jobs must use the '.class_basename(Queueable::class). ' trait.'
);
}

$key = 'debounced.'.get_class($dispatchable);

if ($dispatchable instanceof ShouldBeUnique && method_exists($dispatchable, 'uniqueId')) {
// use the uniqueId to debounce by if defined
$key .= '.uniqueBy.'.$dispatchable->uniqueId();
}

cache()->forever($key, now()->addSeconds($wait)->toISOString());
cache()->increment($key.'.count');

return (new PendingDispatch($dispatchable))->delay($wait);
}

/**
* Dispatch the job with the given arguments unless the given truth test passes.
*
Expand Down
7 changes: 6 additions & 1 deletion src/Illuminate/Queue/CallQueuedHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use Illuminate\Database\Eloquent\ModelNotFoundException;
use Illuminate\Pipeline\Pipeline;
use Illuminate\Queue\Middleware\Debounced;
use ReflectionClass;
use RuntimeException;

Expand Down Expand Up @@ -118,7 +119,11 @@ protected function dispatchThroughMiddleware(Job $job, $command)
}

return (new Pipeline($this->container))->send($command)
->through(array_merge(method_exists($command, 'middleware') ? $command->middleware() : [], $command->middleware ?? []))
->through(array_merge(
method_exists($command, 'middleware') ? $command->middleware() : [],
$command->middleware ?? [],
[new Debounced()]
))
->then(function ($command) use ($job) {
return $this->dispatcher->dispatchNow(
$command, $this->resolveHandler($job, $command)
Expand Down
74 changes: 74 additions & 0 deletions src/Illuminate/Queue/Middleware/Debounced.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php

declare(strict_types=1);

namespace Illuminate\Queue\Middleware;

use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Carbon;

class Debounced
{
/**
* @param InteractsWithQueue|mixed $job
* @param $next
*
* @return mixed|void
*/
public function handle(mixed $job, $next)
{
if (($job->connection ?? $job->job->getConnectionName()) === 'sync') {

// if (config('app.debug') && app()->isLocal()) {
// throw new \LogicException('Debounced jobs must not run on the sync queue.');
// }

$next($job);
return;
}

$key = 'debounced.'.get_class($job);

if ($job instanceof ShouldBeUnique && method_exists($job, 'uniqueId')) {
// use the uniqueId to debounce by if defined
$key .= '.uniqueBy.'.$job->uniqueId();
}

$intendedExecutionTime = cache()->pull($key);

if (
// if there's a value for this key, this is a debounced job
! is_null($intendedExecutionTime) &&
! in_array(InteractsWithQueue::class, class_uses_recursive($job), true)
) {
// using the class-string so there's a hard reference
$traitName = class_basename(InteractsWithQueue::class);
throw new \InvalidArgumentException("The Debounced jobs must use the $traitName trait.");
}

$count = cache()->pull($key.'.count', 1);

if ($count > 1) {
// this is an earlier job, so we should delete it
$job->delete();

// decrement the count
cache()->forever($key.'.count', $count - 1);
return;
}

if ($intendedExecutionTime) {
$intendedExecutionTime = Carbon::parse($intendedExecutionTime);

if ($intendedExecutionTime->gt(now())) {
// ensure that the intended execution time from the last job is used
$job->release($intendedExecutionTime->diffInSeconds(now(), false));
return;
}
}

// todo - this is still marked as RUNNING and DONE in the console for every job despite only the last job executes (JobProcessing, JobProcessed events still fire)
$next($job);
}
}
44 changes: 40 additions & 4 deletions src/Illuminate/Queue/QueueServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@

use Aws\DynamoDb\DynamoDbClient;
use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Support\DeferrableProvider;
use Illuminate\Queue\Connectors\BeanstalkdConnector;
use Illuminate\Queue\Connectors\DatabaseConnector;
use Illuminate\Queue\Connectors\NullConnector;
use Illuminate\Queue\Connectors\RedisConnector;
use Illuminate\Queue\Connectors\SqsConnector;
use Illuminate\Queue\Connectors\SyncConnector;
use Illuminate\Queue\Events\JobExceptionOccurred;
use Illuminate\Queue\Failed\DatabaseFailedJobProvider;
use Illuminate\Queue\Failed\DatabaseUuidFailedJobProvider;
use Illuminate\Queue\Failed\DynamoDbFailedJobProvider;
use Illuminate\Queue\Failed\FileFailedJobProvider;
use Illuminate\Queue\Failed\NullFailedJobProvider;
use Illuminate\Queue\Jobs\JobName;
use Illuminate\Support\Arr;
use Illuminate\Support\Facades\Facade;
use Illuminate\Support\ServiceProvider;
Expand Down Expand Up @@ -73,10 +76,15 @@ protected function configureSerializableClosureUses()
protected function registerManager()
{
$this->app->singleton('queue', function ($app) {
// Once we have an instance of the queue manager, we will register the various
// resolvers for the queue connectors. These connectors are responsible for
// creating the classes that accept queue configs and instantiate queues.
return tap(new QueueManager($app), function ($manager) {
return tap(new QueueManager($app), function (QueueManager $manager) {
// We will register the various exception handlers for the queue.
// These handlers will fire when an exception is thrown
// when the worker has attempted to process a job.
// This will let us clean up any side effects.
$this->registerExceptionHandlers($manager);
// We will register the various resolvers for the queue connectors.
// These connectors are responsible for creating the classes
// that accept queue configs and instantiate queues.
$this->registerConnectors($manager);
});
});
Expand All @@ -94,6 +102,34 @@ protected function registerConnection()
});
}

/**
* Register the event handler for queue exceptions.
*
* @param \Illuminate\Queue\QueueManager $manager
*
* @return void
*
* @see \Illuminate\Queue\Middleware\Debounced::handle
* @see \Illuminate\Foundation\Bus\Dispatchable::dispatchDebounced
*/
protected function registerExceptionHandlers(QueueManager $manager)
{
// When a job fails, we will clean up the cache to avoid cluttering the cache
$manager->exceptionOccurred(function (JobExceptionOccurred $event) {
[$class, $method] = JobName::parse($event->job->payload()['job']);

$key = 'debounced.'.get_class($class);

if ($class instanceof ShouldBeUnique && method_exists($class, 'uniqueId')) {
// use the uniqueId to debounce by if defined
$key .= '.uniqueBy.'.$class->uniqueId();
}

cache()->forget($key);
cache()->forget($key.'.count');
});
}

/**
* Register the connectors on the queue manager.
*
Expand Down
2 changes: 2 additions & 0 deletions tests/Queue/QueueSyncQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ class SyncQueueJob implements ShouldQueue
{
use InteractsWithQueue;

public string $connection = 'sync';

public function handle()
{
throw new LogicException($this->getValueFromJob('extra'));
Expand Down
Loading