Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[10.x] Add Job debouncing #50338

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/Illuminate/Foundation/Bus/Dispatchable.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Closure;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Support\Fluent;

trait Dispatchable
Expand Down Expand Up @@ -41,6 +42,28 @@ public static function dispatchIf($boolean, ...$arguments)
: new Fluent;
}

/**
* Dispatch the job with the given arguments unless the job is already queued.
*
* @see \Illuminate\Queue\Middleware\Debounced::handle()
*/
// todo - add \DateTimeInterface|\DateInterval as accepted types for wait
public static function dispatchDebounced(int $wait, ...$arguments): PendingDispatch
{
$dispatchable = new static(...$arguments);
$key = 'debounced.' . get_class($dispatchable);

if ($dispatchable instanceof ShouldBeUnique && method_exists($dispatchable, 'uniqueId')) {
// use the uniqueId to debounce by if defined
$key .= '.uniqueBy.' . $dispatchable->uniqueId();
}

cache()->forever($key, now()->addSeconds($wait)->toISOString());
cache()->increment($key . '.count');

return (new PendingDispatch($dispatchable))->delay($wait);
}

/**
* Dispatch the job with the given arguments unless the given truth test passes.
*
Expand Down
7 changes: 6 additions & 1 deletion src/Illuminate/Queue/CallQueuedHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use Illuminate\Database\Eloquent\ModelNotFoundException;
use Illuminate\Pipeline\Pipeline;
use Illuminate\Queue\Middleware\Debounced;
use ReflectionClass;
use RuntimeException;

Expand Down Expand Up @@ -118,7 +119,11 @@ protected function dispatchThroughMiddleware(Job $job, $command)
}

return (new Pipeline($this->container))->send($command)
->through(array_merge(method_exists($command, 'middleware') ? $command->middleware() : [], $command->middleware ?? []))
->through(array_merge(
method_exists($command, 'middleware') ? $command->middleware() : [],
$command->middleware ?? [],
[new Debounced()]
))
->then(function ($command) use ($job) {
return $this->dispatcher->dispatchNow(
$command, $this->resolveHandler($job, $command)
Expand Down
58 changes: 58 additions & 0 deletions src/Illuminate/Queue/Middleware/Debounced.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

declare(strict_types = 1);

namespace Illuminate\Queue\Middleware;

use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Carbon;

class Debounced
{
/**
* @param InteractsWithQueue|mixed $job
* @param $next
*
* @return mixed|void
*/
public function handle(mixed $job, $next)
{
$key = 'debounced.' . get_class($job);

if ($job instanceof ShouldBeUnique && method_exists($job, 'uniqueId')) {
// use the uniqueId to debounce by if defined
$key .= '.uniqueBy.' . $job->uniqueId();
}

if (!in_array(InteractsWithQueue::class, class_uses_recursive($job), true)) {
// using the class-string so there's a hard reference
$traitName = class_basename(InteractsWithQueue::class);
throw new \InvalidArgumentException("The Debounced jobs must use the $traitName trait.");
}

$count = cache()->pull($key . '.count', 1);

if ($count > 1) {
// this is an earlier job, so we should delete it
$job->delete();

// decrement the count
cache()->forever($key . '.count', $count - 1);
return;
}

if ($intendedExecutionTime = cache()->pull($key)) {
$intendedExecutionTime = Carbon::parse($intendedExecutionTime);

if ($intendedExecutionTime->gt(now())) {
// ensure that the intended execution time from the last job is used
$job->release($intendedExecutionTime->diffInSeconds(now(), false));
return;
}
}

// todo - this is still marked as RUNNING and DONE in the console for every job despite only the last job executes (JobProcessing, JobProcessed events still fire)
$next($job);
}
}
Loading