Skip to content

Commit

Permalink
[10.x] Adds JobQueueing event (#49722)
Browse files Browse the repository at this point in the history
* Adds JobQueueing event

* style fixes

* fix broken tests

* fix broken tests
  • Loading branch information
dmason30 authored Jan 17, 2024
1 parent d710a01 commit c62d15b
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 28 deletions.
58 changes: 58 additions & 0 deletions src/Illuminate/Queue/Events/JobQueueing.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

namespace Illuminate\Queue\Events;

use RuntimeException;

class JobQueueing
{
/**
* The connection name.
*
* @var string
*/
public $connectionName;

/**
* The job instance.
*
* @var \Closure|string|object
*/
public $job;

/**
* The job payload.
*
* @var string|null
*/
public $payload;

/**
* Create a new event instance.
*
* @param string $connectionName
* @param \Closure|string|object $job
* @param string|null $payload
* @return void
*/
public function __construct($connectionName, $job, $payload = null)
{
$this->connectionName = $connectionName;
$this->job = $job;
$this->payload = $payload;
}

/**
* Get the decoded job payload.
*
* @return array
*/
public function payload()
{
if ($this->payload === null) {
throw new RuntimeException('The job payload was not provided when the event was dispatched.');
}

return json_decode($this->payload, true, flags: JSON_THROW_ON_ERROR);
}
}
19 changes: 19 additions & 0 deletions src/Illuminate/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueueAfterCommit;
use Illuminate\Queue\Events\JobQueued;
use Illuminate\Queue\Events\JobQueueing;
use Illuminate\Support\Arr;
use Illuminate\Support\InteractsWithTime;
use Illuminate\Support\Str;
Expand Down Expand Up @@ -327,13 +328,17 @@ protected function enqueueUsing($job, $payload, $queue, $delay, $callback)
$this->container->bound('db.transactions')) {
return $this->container->make('db.transactions')->addCallback(
function () use ($payload, $queue, $delay, $callback, $job) {
$this->raiseJobQueueingEvent($job, $payload);

return tap($callback($payload, $queue, $delay), function ($jobId) use ($job, $payload) {
$this->raiseJobQueuedEvent($jobId, $job, $payload);
});
}
);
}

$this->raiseJobQueueingEvent($job, $payload);

return tap($callback($payload, $queue, $delay), function ($jobId) use ($job, $payload) {
$this->raiseJobQueuedEvent($jobId, $job, $payload);
});
Expand Down Expand Up @@ -362,6 +367,20 @@ protected function shouldDispatchAfterCommit($job)
return false;
}

/**
* Raise the job queueing event.
*
* @param \Closure|string|object $job
* @param string $payload
* @return void
*/
protected function raiseJobQueueingEvent($job, $payload)
{
if ($this->container->bound('events')) {
$this->container['events']->dispatch(new JobQueueing($this->connectionName, $job, $payload));
}
}

/**
* Raise the job queued event.
*
Expand Down
27 changes: 17 additions & 10 deletions tests/Integration/Queue/RedisQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Foundation\Testing\Concerns\InteractsWithRedis;
use Illuminate\Queue\Events\JobQueued;
use Illuminate\Queue\Events\JobQueueing;
use Illuminate\Queue\Jobs\RedisJob;
use Illuminate\Queue\RedisQueue;
use Illuminate\Support\InteractsWithTime;
Expand Down Expand Up @@ -200,7 +201,7 @@ public function testPopPopsDelayedJobOffOfRedisWhenExpireNull($driver)
$job = new RedisQueueIntegrationTestJob(10);
$this->queue->later(-10, $job);

$this->container->shouldHaveReceived('bound')->with('events')->once();
$this->container->shouldHaveReceived('bound')->with('events')->twice();

// Pop and check it is popped correctly
$before = $this->currentTime();
Expand Down Expand Up @@ -278,7 +279,7 @@ public function testNotExpireJobsWhenExpireNull($driver)
// Make an expired reserved job
$failed = new RedisQueueIntegrationTestJob(-20);
$this->queue->push($failed);
$this->container->shouldHaveReceived('bound')->with('events')->once();
$this->container->shouldHaveReceived('bound')->with('events')->twice();

$beforeFailPop = $this->currentTime();
$this->queue->pop();
Expand All @@ -287,7 +288,7 @@ public function testNotExpireJobsWhenExpireNull($driver)
// Push an item into queue
$job = new RedisQueueIntegrationTestJob(10);
$this->queue->push($job);
$this->container->shouldHaveReceived('bound')->with('events')->times(2);
$this->container->shouldHaveReceived('bound')->with('events')->times(4);

// Pop and check it is popped correctly
$before = $this->currentTime();
Expand Down Expand Up @@ -326,7 +327,7 @@ public function testExpireJobsWhenExpireSet($driver)
// Push an item into queue
$job = new RedisQueueIntegrationTestJob(10);
$this->queue->push($job);
$this->container->shouldHaveReceived('bound')->with('events')->once();
$this->container->shouldHaveReceived('bound')->with('events')->twice();

// Pop and check it is popped correctly
$before = $this->currentTime();
Expand Down Expand Up @@ -466,19 +467,24 @@ public function testSize($driver)
* @param string $driver
*/
#[DataProvider('redisDriverProvider')]
public function testPushJobQueuedEvent($driver)
public function testPushJobQueueingAndJobQueuedEvents($driver)
{
$events = m::mock(Dispatcher::class);
$events->shouldReceive('dispatch')->withArgs(function (JobQueueing $jobQueuing) {
$this->assertInstanceOf(RedisQueueIntegrationTestJob::class, $jobQueuing->job);

return true;
})->andReturnNull()->once();
$events->shouldReceive('dispatch')->withArgs(function (JobQueued $jobQueued) {
$this->assertInstanceOf(RedisQueueIntegrationTestJob::class, $jobQueued->job);
$this->assertIsString(RedisQueueIntegrationTestJob::class, $jobQueued->id);
$this->assertIsString($jobQueued->id);

return true;
})->andReturnNull()->once();

$container = m::mock(Container::class);
$container->shouldReceive('bound')->with('events')->andReturn(true)->once();
$container->shouldReceive('offsetGet')->with('events')->andReturn($events)->once();
$container->shouldReceive('bound')->with('events')->andReturn(true)->twice();
$container->shouldReceive('offsetGet')->with('events')->andReturn($events)->twice();

$queue = new RedisQueue($this->redis[$driver]);
$queue->setContainer($container);
Expand All @@ -493,11 +499,12 @@ public function testPushJobQueuedEvent($driver)
public function testBulkJobQueuedEvent($driver)
{
$events = m::mock(Dispatcher::class);
$events->shouldReceive('dispatch')->with(m::type(JobQueueing::class))->andReturnNull()->times(3);
$events->shouldReceive('dispatch')->with(m::type(JobQueued::class))->andReturnNull()->times(3);

$container = m::mock(Container::class);
$container->shouldReceive('bound')->with('events')->andReturn(true)->times(3);
$container->shouldReceive('offsetGet')->with('events')->andReturn($events)->times(3);
$container->shouldReceive('bound')->with('events')->andReturn(true)->times(6);
$container->shouldReceive('offsetGet')->with('events')->andReturn($events)->times(6);

$queue = new RedisQueue($this->redis[$driver]);
$queue->setContainer($container);
Expand Down
4 changes: 2 additions & 2 deletions tests/Queue/QueueBeanstalkdQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public function testPushProperlyPushesJobOntoBeanstalkd()
$this->queue->push('foo', ['data'], 'stack');
$this->queue->push('foo', ['data']);

$this->container->shouldHaveReceived('bound')->with('events')->times(2);
$this->container->shouldHaveReceived('bound')->with('events')->times(4);

Str::createUuidsNormally();
}
Expand All @@ -67,7 +67,7 @@ public function testDelayedPushProperlyPushesJobOntoBeanstalkd()
$this->queue->later(5, 'foo', ['data'], 'stack');
$this->queue->later(5, 'foo', ['data']);

$this->container->shouldHaveReceived('bound')->with('events')->times(2);
$this->container->shouldHaveReceived('bound')->with('events')->times(4);

Str::createUuidsNormally();
}
Expand Down
20 changes: 14 additions & 6 deletions tests/Queue/QueueDatabaseQueueIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Illuminate\Events\Dispatcher;
use Illuminate\Queue\DatabaseQueue;
use Illuminate\Queue\Events\JobQueued;
use Illuminate\Queue\Events\JobQueueing;
use Illuminate\Support\Carbon;
use Illuminate\Support\Str;
use PHPUnit\Framework\TestCase;
Expand Down Expand Up @@ -247,21 +248,28 @@ public function testThatReservedJobsAreNotPopped()
$this->assertNull($popped_job);
}

public function testJobPayloadIsAvailableOnEvent()
public function testJobPayloadIsAvailableOnEvents()
{
$event = null;
$jobQueueingEvent = null;
$jobQueuedEvent = null;
Str::createUuidsUsingSequence([
'expected-job-uuid',
]);
$this->container['events']->listen(function (JobQueued $e) use (&$event) {
$event = $e;
$this->container['events']->listen(function (JobQueueing $e) use (&$jobQueueingEvent) {
$jobQueueingEvent = $e;
});
$this->container['events']->listen(function (JobQueued $e) use (&$jobQueuedEvent) {
$jobQueuedEvent = $e;
});

$this->queue->push('MyJob', [
'laravel' => 'Framework',
]);

$this->assertIsArray($event->payload());
$this->assertSame('expected-job-uuid', $event->payload()['uuid']);
$this->assertIsArray($jobQueueingEvent->payload());
$this->assertSame('expected-job-uuid', $jobQueueingEvent->payload()['uuid']);

$this->assertIsArray($jobQueuedEvent->payload());
$this->assertSame('expected-job-uuid', $jobQueuedEvent->payload()['uuid']);
}
}
4 changes: 2 additions & 2 deletions tests/Queue/QueueDatabaseQueueUnitTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public function testPushProperlyPushesJobOntoDatabase($uuid, $job, $displayNameS

$queue->push($job, ['data']);

$container->shouldHaveReceived('bound')->with('events')->once();
$container->shouldHaveReceived('bound')->with('events')->twice();

Str::createUuidsNormally();
}
Expand Down Expand Up @@ -87,7 +87,7 @@ public function testDelayedPushProperlyPushesJobOntoDatabase()

$queue->later(10, 'foo', ['data']);

$container->shouldHaveReceived('bound')->with('events')->once();
$container->shouldHaveReceived('bound')->with('events')->twice();

Str::createUuidsNormally();
}
Expand Down
10 changes: 5 additions & 5 deletions tests/Queue/QueueRedisQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function testPushProperlyPushesJobOntoRedis()

$id = $queue->push('foo', ['data']);
$this->assertSame('foo', $id);
$container->shouldHaveReceived('bound')->with('events')->once();
$container->shouldHaveReceived('bound')->with('events')->twice();

Str::createUuidsNormally();
}
Expand All @@ -60,7 +60,7 @@ public function testPushProperlyPushesJobOntoRedisWithCustomPayloadHook()

$id = $queue->push('foo', ['data']);
$this->assertSame('foo', $id);
$container->shouldHaveReceived('bound')->with('events')->once();
$container->shouldHaveReceived('bound')->with('events')->twice();

Queue::createPayloadUsing(null);

Expand Down Expand Up @@ -91,7 +91,7 @@ public function testPushProperlyPushesJobOntoRedisWithTwoCustomPayloadHook()

$id = $queue->push('foo', ['data']);
$this->assertSame('foo', $id);
$container->shouldHaveReceived('bound')->with('events')->once();
$container->shouldHaveReceived('bound')->with('events')->twice();

Queue::createPayloadUsing(null);

Expand Down Expand Up @@ -120,7 +120,7 @@ public function testDelayedPushProperlyPushesJobOntoRedis()

$id = $queue->later(1, 'foo', ['data']);
$this->assertSame('foo', $id);
$container->shouldHaveReceived('bound')->with('events')->once();
$container->shouldHaveReceived('bound')->with('events')->twice();

Str::createUuidsNormally();
}
Expand All @@ -147,7 +147,7 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis()
);

$queue->later($date, 'foo', ['data']);
$container->shouldHaveReceived('bound')->with('events')->once();
$container->shouldHaveReceived('bound')->with('events')->twice();

Str::createUuidsNormally();
}
Expand Down
6 changes: 3 additions & 3 deletions tests/Queue/QueueSqsQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoSqs()
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => 5])->andReturn($this->mockedSendMessageResponseModel);
$id = $queue->later($now->addSeconds(5), $this->mockedJob, $this->mockedData, $this->queueName);
$this->assertEquals($this->mockedMessageId, $id);
$container->shouldHaveReceived('bound')->with('events')->once();
$container->shouldHaveReceived('bound')->with('events')->twice();
}

public function testDelayedPushProperlyPushesJobOntoSqs()
Expand All @@ -129,7 +129,7 @@ public function testDelayedPushProperlyPushesJobOntoSqs()
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => $this->mockedDelay])->andReturn($this->mockedSendMessageResponseModel);
$id = $queue->later($this->mockedDelay, $this->mockedJob, $this->mockedData, $this->queueName);
$this->assertEquals($this->mockedMessageId, $id);
$container->shouldHaveReceived('bound')->with('events')->once();
$container->shouldHaveReceived('bound')->with('events')->twice();
}

public function testPushProperlyPushesJobOntoSqs()
Expand All @@ -141,7 +141,7 @@ public function testPushProperlyPushesJobOntoSqs()
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload])->andReturn($this->mockedSendMessageResponseModel);
$id = $queue->push($this->mockedJob, $this->mockedData, $this->queueName);
$this->assertEquals($this->mockedMessageId, $id);
$container->shouldHaveReceived('bound')->with('events')->once();
$container->shouldHaveReceived('bound')->with('events')->twice();
}

public function testSizeProperlyReadsSqsQueueSize()
Expand Down

0 comments on commit c62d15b

Please sign in to comment.