From c62d15ba607b423a65da217f4201c1f4840ba1bf Mon Sep 17 00:00:00 2001 From: Daniel Mason Date: Wed, 17 Jan 2024 21:20:57 +0000 Subject: [PATCH] [10.x] Adds JobQueueing event (#49722) * Adds JobQueueing event * style fixes * fix broken tests * fix broken tests --- src/Illuminate/Queue/Events/JobQueueing.php | 58 +++++++++++++++++++ src/Illuminate/Queue/Queue.php | 19 ++++++ tests/Integration/Queue/RedisQueueTest.php | 27 +++++---- tests/Queue/QueueBeanstalkdQueueTest.php | 4 +- .../QueueDatabaseQueueIntegrationTest.php | 20 +++++-- tests/Queue/QueueDatabaseQueueUnitTest.php | 4 +- tests/Queue/QueueRedisQueueTest.php | 10 ++-- tests/Queue/QueueSqsQueueTest.php | 6 +- 8 files changed, 120 insertions(+), 28 deletions(-) create mode 100644 src/Illuminate/Queue/Events/JobQueueing.php diff --git a/src/Illuminate/Queue/Events/JobQueueing.php b/src/Illuminate/Queue/Events/JobQueueing.php new file mode 100644 index 000000000000..ebb0769681b4 --- /dev/null +++ b/src/Illuminate/Queue/Events/JobQueueing.php @@ -0,0 +1,58 @@ +connectionName = $connectionName; + $this->job = $job; + $this->payload = $payload; + } + + /** + * Get the decoded job payload. + * + * @return array + */ + public function payload() + { + if ($this->payload === null) { + throw new RuntimeException('The job payload was not provided when the event was dispatched.'); + } + + return json_decode($this->payload, true, flags: JSON_THROW_ON_ERROR); + } +} diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index 1aa09ee30bdb..09eb24526311 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -9,6 +9,7 @@ use Illuminate\Contracts\Queue\ShouldBeEncrypted; use Illuminate\Contracts\Queue\ShouldQueueAfterCommit; use Illuminate\Queue\Events\JobQueued; +use Illuminate\Queue\Events\JobQueueing; use Illuminate\Support\Arr; use Illuminate\Support\InteractsWithTime; use Illuminate\Support\Str; @@ -327,6 +328,8 @@ protected function enqueueUsing($job, $payload, $queue, $delay, $callback) $this->container->bound('db.transactions')) { return $this->container->make('db.transactions')->addCallback( function () use ($payload, $queue, $delay, $callback, $job) { + $this->raiseJobQueueingEvent($job, $payload); + return tap($callback($payload, $queue, $delay), function ($jobId) use ($job, $payload) { $this->raiseJobQueuedEvent($jobId, $job, $payload); }); @@ -334,6 +337,8 @@ function () use ($payload, $queue, $delay, $callback, $job) { ); } + $this->raiseJobQueueingEvent($job, $payload); + return tap($callback($payload, $queue, $delay), function ($jobId) use ($job, $payload) { $this->raiseJobQueuedEvent($jobId, $job, $payload); }); @@ -362,6 +367,20 @@ protected function shouldDispatchAfterCommit($job) return false; } + /** + * Raise the job queueing event. + * + * @param \Closure|string|object $job + * @param string $payload + * @return void + */ + protected function raiseJobQueueingEvent($job, $payload) + { + if ($this->container->bound('events')) { + $this->container['events']->dispatch(new JobQueueing($this->connectionName, $job, $payload)); + } + } + /** * Raise the job queued event. * diff --git a/tests/Integration/Queue/RedisQueueTest.php b/tests/Integration/Queue/RedisQueueTest.php index 74f1dffcec4a..0905ada8d4c3 100644 --- a/tests/Integration/Queue/RedisQueueTest.php +++ b/tests/Integration/Queue/RedisQueueTest.php @@ -6,6 +6,7 @@ use Illuminate\Contracts\Events\Dispatcher; use Illuminate\Foundation\Testing\Concerns\InteractsWithRedis; use Illuminate\Queue\Events\JobQueued; +use Illuminate\Queue\Events\JobQueueing; use Illuminate\Queue\Jobs\RedisJob; use Illuminate\Queue\RedisQueue; use Illuminate\Support\InteractsWithTime; @@ -200,7 +201,7 @@ public function testPopPopsDelayedJobOffOfRedisWhenExpireNull($driver) $job = new RedisQueueIntegrationTestJob(10); $this->queue->later(-10, $job); - $this->container->shouldHaveReceived('bound')->with('events')->once(); + $this->container->shouldHaveReceived('bound')->with('events')->twice(); // Pop and check it is popped correctly $before = $this->currentTime(); @@ -278,7 +279,7 @@ public function testNotExpireJobsWhenExpireNull($driver) // Make an expired reserved job $failed = new RedisQueueIntegrationTestJob(-20); $this->queue->push($failed); - $this->container->shouldHaveReceived('bound')->with('events')->once(); + $this->container->shouldHaveReceived('bound')->with('events')->twice(); $beforeFailPop = $this->currentTime(); $this->queue->pop(); @@ -287,7 +288,7 @@ public function testNotExpireJobsWhenExpireNull($driver) // Push an item into queue $job = new RedisQueueIntegrationTestJob(10); $this->queue->push($job); - $this->container->shouldHaveReceived('bound')->with('events')->times(2); + $this->container->shouldHaveReceived('bound')->with('events')->times(4); // Pop and check it is popped correctly $before = $this->currentTime(); @@ -326,7 +327,7 @@ public function testExpireJobsWhenExpireSet($driver) // Push an item into queue $job = new RedisQueueIntegrationTestJob(10); $this->queue->push($job); - $this->container->shouldHaveReceived('bound')->with('events')->once(); + $this->container->shouldHaveReceived('bound')->with('events')->twice(); // Pop and check it is popped correctly $before = $this->currentTime(); @@ -466,19 +467,24 @@ public function testSize($driver) * @param string $driver */ #[DataProvider('redisDriverProvider')] - public function testPushJobQueuedEvent($driver) + public function testPushJobQueueingAndJobQueuedEvents($driver) { $events = m::mock(Dispatcher::class); + $events->shouldReceive('dispatch')->withArgs(function (JobQueueing $jobQueuing) { + $this->assertInstanceOf(RedisQueueIntegrationTestJob::class, $jobQueuing->job); + + return true; + })->andReturnNull()->once(); $events->shouldReceive('dispatch')->withArgs(function (JobQueued $jobQueued) { $this->assertInstanceOf(RedisQueueIntegrationTestJob::class, $jobQueued->job); - $this->assertIsString(RedisQueueIntegrationTestJob::class, $jobQueued->id); + $this->assertIsString($jobQueued->id); return true; })->andReturnNull()->once(); $container = m::mock(Container::class); - $container->shouldReceive('bound')->with('events')->andReturn(true)->once(); - $container->shouldReceive('offsetGet')->with('events')->andReturn($events)->once(); + $container->shouldReceive('bound')->with('events')->andReturn(true)->twice(); + $container->shouldReceive('offsetGet')->with('events')->andReturn($events)->twice(); $queue = new RedisQueue($this->redis[$driver]); $queue->setContainer($container); @@ -493,11 +499,12 @@ public function testPushJobQueuedEvent($driver) public function testBulkJobQueuedEvent($driver) { $events = m::mock(Dispatcher::class); + $events->shouldReceive('dispatch')->with(m::type(JobQueueing::class))->andReturnNull()->times(3); $events->shouldReceive('dispatch')->with(m::type(JobQueued::class))->andReturnNull()->times(3); $container = m::mock(Container::class); - $container->shouldReceive('bound')->with('events')->andReturn(true)->times(3); - $container->shouldReceive('offsetGet')->with('events')->andReturn($events)->times(3); + $container->shouldReceive('bound')->with('events')->andReturn(true)->times(6); + $container->shouldReceive('offsetGet')->with('events')->andReturn($events)->times(6); $queue = new RedisQueue($this->redis[$driver]); $queue->setContainer($container); diff --git a/tests/Queue/QueueBeanstalkdQueueTest.php b/tests/Queue/QueueBeanstalkdQueueTest.php index ed4e6f904e20..beab4bfa98df 100755 --- a/tests/Queue/QueueBeanstalkdQueueTest.php +++ b/tests/Queue/QueueBeanstalkdQueueTest.php @@ -45,7 +45,7 @@ public function testPushProperlyPushesJobOntoBeanstalkd() $this->queue->push('foo', ['data'], 'stack'); $this->queue->push('foo', ['data']); - $this->container->shouldHaveReceived('bound')->with('events')->times(2); + $this->container->shouldHaveReceived('bound')->with('events')->times(4); Str::createUuidsNormally(); } @@ -67,7 +67,7 @@ public function testDelayedPushProperlyPushesJobOntoBeanstalkd() $this->queue->later(5, 'foo', ['data'], 'stack'); $this->queue->later(5, 'foo', ['data']); - $this->container->shouldHaveReceived('bound')->with('events')->times(2); + $this->container->shouldHaveReceived('bound')->with('events')->times(4); Str::createUuidsNormally(); } diff --git a/tests/Queue/QueueDatabaseQueueIntegrationTest.php b/tests/Queue/QueueDatabaseQueueIntegrationTest.php index 259fe46adcbe..4c4f7c91c5c1 100644 --- a/tests/Queue/QueueDatabaseQueueIntegrationTest.php +++ b/tests/Queue/QueueDatabaseQueueIntegrationTest.php @@ -9,6 +9,7 @@ use Illuminate\Events\Dispatcher; use Illuminate\Queue\DatabaseQueue; use Illuminate\Queue\Events\JobQueued; +use Illuminate\Queue\Events\JobQueueing; use Illuminate\Support\Carbon; use Illuminate\Support\Str; use PHPUnit\Framework\TestCase; @@ -247,21 +248,28 @@ public function testThatReservedJobsAreNotPopped() $this->assertNull($popped_job); } - public function testJobPayloadIsAvailableOnEvent() + public function testJobPayloadIsAvailableOnEvents() { - $event = null; + $jobQueueingEvent = null; + $jobQueuedEvent = null; Str::createUuidsUsingSequence([ 'expected-job-uuid', ]); - $this->container['events']->listen(function (JobQueued $e) use (&$event) { - $event = $e; + $this->container['events']->listen(function (JobQueueing $e) use (&$jobQueueingEvent) { + $jobQueueingEvent = $e; + }); + $this->container['events']->listen(function (JobQueued $e) use (&$jobQueuedEvent) { + $jobQueuedEvent = $e; }); $this->queue->push('MyJob', [ 'laravel' => 'Framework', ]); - $this->assertIsArray($event->payload()); - $this->assertSame('expected-job-uuid', $event->payload()['uuid']); + $this->assertIsArray($jobQueueingEvent->payload()); + $this->assertSame('expected-job-uuid', $jobQueueingEvent->payload()['uuid']); + + $this->assertIsArray($jobQueuedEvent->payload()); + $this->assertSame('expected-job-uuid', $jobQueuedEvent->payload()['uuid']); } } diff --git a/tests/Queue/QueueDatabaseQueueUnitTest.php b/tests/Queue/QueueDatabaseQueueUnitTest.php index 17087db4e592..53c65720551c 100644 --- a/tests/Queue/QueueDatabaseQueueUnitTest.php +++ b/tests/Queue/QueueDatabaseQueueUnitTest.php @@ -46,7 +46,7 @@ public function testPushProperlyPushesJobOntoDatabase($uuid, $job, $displayNameS $queue->push($job, ['data']); - $container->shouldHaveReceived('bound')->with('events')->once(); + $container->shouldHaveReceived('bound')->with('events')->twice(); Str::createUuidsNormally(); } @@ -87,7 +87,7 @@ public function testDelayedPushProperlyPushesJobOntoDatabase() $queue->later(10, 'foo', ['data']); - $container->shouldHaveReceived('bound')->with('events')->once(); + $container->shouldHaveReceived('bound')->with('events')->twice(); Str::createUuidsNormally(); } diff --git a/tests/Queue/QueueRedisQueueTest.php b/tests/Queue/QueueRedisQueueTest.php index 442676de71ce..007f743653d8 100644 --- a/tests/Queue/QueueRedisQueueTest.php +++ b/tests/Queue/QueueRedisQueueTest.php @@ -35,7 +35,7 @@ public function testPushProperlyPushesJobOntoRedis() $id = $queue->push('foo', ['data']); $this->assertSame('foo', $id); - $container->shouldHaveReceived('bound')->with('events')->once(); + $container->shouldHaveReceived('bound')->with('events')->twice(); Str::createUuidsNormally(); } @@ -60,7 +60,7 @@ public function testPushProperlyPushesJobOntoRedisWithCustomPayloadHook() $id = $queue->push('foo', ['data']); $this->assertSame('foo', $id); - $container->shouldHaveReceived('bound')->with('events')->once(); + $container->shouldHaveReceived('bound')->with('events')->twice(); Queue::createPayloadUsing(null); @@ -91,7 +91,7 @@ public function testPushProperlyPushesJobOntoRedisWithTwoCustomPayloadHook() $id = $queue->push('foo', ['data']); $this->assertSame('foo', $id); - $container->shouldHaveReceived('bound')->with('events')->once(); + $container->shouldHaveReceived('bound')->with('events')->twice(); Queue::createPayloadUsing(null); @@ -120,7 +120,7 @@ public function testDelayedPushProperlyPushesJobOntoRedis() $id = $queue->later(1, 'foo', ['data']); $this->assertSame('foo', $id); - $container->shouldHaveReceived('bound')->with('events')->once(); + $container->shouldHaveReceived('bound')->with('events')->twice(); Str::createUuidsNormally(); } @@ -147,7 +147,7 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis() ); $queue->later($date, 'foo', ['data']); - $container->shouldHaveReceived('bound')->with('events')->once(); + $container->shouldHaveReceived('bound')->with('events')->twice(); Str::createUuidsNormally(); } diff --git a/tests/Queue/QueueSqsQueueTest.php b/tests/Queue/QueueSqsQueueTest.php index 3886a3f83854..021e66484b68 100755 --- a/tests/Queue/QueueSqsQueueTest.php +++ b/tests/Queue/QueueSqsQueueTest.php @@ -116,7 +116,7 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoSqs() $this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => 5])->andReturn($this->mockedSendMessageResponseModel); $id = $queue->later($now->addSeconds(5), $this->mockedJob, $this->mockedData, $this->queueName); $this->assertEquals($this->mockedMessageId, $id); - $container->shouldHaveReceived('bound')->with('events')->once(); + $container->shouldHaveReceived('bound')->with('events')->twice(); } public function testDelayedPushProperlyPushesJobOntoSqs() @@ -129,7 +129,7 @@ public function testDelayedPushProperlyPushesJobOntoSqs() $this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => $this->mockedDelay])->andReturn($this->mockedSendMessageResponseModel); $id = $queue->later($this->mockedDelay, $this->mockedJob, $this->mockedData, $this->queueName); $this->assertEquals($this->mockedMessageId, $id); - $container->shouldHaveReceived('bound')->with('events')->once(); + $container->shouldHaveReceived('bound')->with('events')->twice(); } public function testPushProperlyPushesJobOntoSqs() @@ -141,7 +141,7 @@ public function testPushProperlyPushesJobOntoSqs() $this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload])->andReturn($this->mockedSendMessageResponseModel); $id = $queue->push($this->mockedJob, $this->mockedData, $this->queueName); $this->assertEquals($this->mockedMessageId, $id); - $container->shouldHaveReceived('bound')->with('events')->once(); + $container->shouldHaveReceived('bound')->with('events')->twice(); } public function testSizeProperlyReadsSqsQueueSize()