diff --git a/src/Illuminate/Foundation/Bus/Dispatchable.php b/src/Illuminate/Foundation/Bus/Dispatchable.php index 46b500559ff7..81929bb12ade 100644 --- a/src/Illuminate/Foundation/Bus/Dispatchable.php +++ b/src/Illuminate/Foundation/Bus/Dispatchable.php @@ -4,6 +4,7 @@ use Closure; use Illuminate\Contracts\Bus\Dispatcher; +use Illuminate\Contracts\Queue\ShouldBeUnique; use Illuminate\Support\Fluent; trait Dispatchable @@ -41,6 +42,28 @@ public static function dispatchIf($boolean, ...$arguments) : new Fluent; } + /** + * Dispatch the job with the given arguments unless the job is already queued. + * + * @see \Illuminate\Queue\Middleware\Debounced::handle() + */ + // todo - add \DateTimeInterface|\DateInterval as accepted types for wait + public static function dispatchDebounced(int $wait, ...$arguments): PendingDispatch + { + $dispatchable = new static(...$arguments); + $key = 'debounced.' . get_class($dispatchable); + + if ($dispatchable instanceof ShouldBeUnique && method_exists($dispatchable, 'uniqueId')) { + // use the uniqueId to debounce by if defined + $key .= '.uniqueBy.' . $dispatchable->uniqueId(); + } + + cache()->forever($key, now()->addSeconds($wait)->toISOString()); + cache()->increment($key . '.count'); + + return (new PendingDispatch($dispatchable))->delay($wait); + } + /** * Dispatch the job with the given arguments unless the given truth test passes. * diff --git a/src/Illuminate/Queue/CallQueuedHandler.php b/src/Illuminate/Queue/CallQueuedHandler.php index 5bee1d9ebb4c..2a1ffc86eb29 100644 --- a/src/Illuminate/Queue/CallQueuedHandler.php +++ b/src/Illuminate/Queue/CallQueuedHandler.php @@ -14,6 +14,7 @@ use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing; use Illuminate\Database\Eloquent\ModelNotFoundException; use Illuminate\Pipeline\Pipeline; +use Illuminate\Queue\Middleware\Debounced; use ReflectionClass; use RuntimeException; @@ -118,7 +119,11 @@ protected function dispatchThroughMiddleware(Job $job, $command) } return (new Pipeline($this->container))->send($command) - ->through(array_merge(method_exists($command, 'middleware') ? $command->middleware() : [], $command->middleware ?? [])) + ->through(array_merge( + method_exists($command, 'middleware') ? $command->middleware() : [], + $command->middleware ?? [], + [new Debounced()] + )) ->then(function ($command) use ($job) { return $this->dispatcher->dispatchNow( $command, $this->resolveHandler($job, $command) diff --git a/src/Illuminate/Queue/Middleware/Debounced.php b/src/Illuminate/Queue/Middleware/Debounced.php new file mode 100644 index 000000000000..ac155a325e32 --- /dev/null +++ b/src/Illuminate/Queue/Middleware/Debounced.php @@ -0,0 +1,58 @@ +uniqueId(); + } + + if (!in_array(InteractsWithQueue::class, class_uses_recursive($job), true)) { + // using the class-string so there's a hard reference + $traitName = class_basename(InteractsWithQueue::class); + throw new \InvalidArgumentException("The Debounced jobs must use the $traitName trait."); + } + + $count = cache()->pull($key . '.count', 1); + + if ($count > 1) { + // this is an earlier job, so we should delete it + $job->delete(); + + // decrement the count + cache()->forever($key . '.count', $count - 1); + return; + } + + if ($intendedExecutionTime = cache()->pull($key)) { + $intendedExecutionTime = Carbon::parse($intendedExecutionTime); + + if ($intendedExecutionTime->gt(now())) { + // ensure that the intended execution time from the last job is used + $job->release($intendedExecutionTime->diffInSeconds(now(), false)); + return; + } + } + + // todo - this is still marked as RUNNING and DONE in the console for every job despite only the last job executes (JobProcessing, JobProcessed events still fire) + $next($job); + } +}