From f0b6ccd27597aaf67d858f8d7da66c8244a3709e Mon Sep 17 00:00:00 2001 From: Asem Alalami Date: Wed, 19 Jun 2024 16:30:24 +0300 Subject: [PATCH] listen to JobPushed event for Horizon --- src/Providers/QueueMonitorProvider.php | 18 +++++-- src/Services/QueueMonitor.php | 67 +++++++++++++++++++++++++- 2 files changed, 81 insertions(+), 4 deletions(-) diff --git a/src/Providers/QueueMonitorProvider.php b/src/Providers/QueueMonitorProvider.php index 1e2785b..50b6e47 100644 --- a/src/Providers/QueueMonitorProvider.php +++ b/src/Providers/QueueMonitorProvider.php @@ -59,9 +59,21 @@ public function boot(): void // listen to JobQueued event if (config('queue-monitor.monitor_queued_jobs', true)) { - Event::listen(JobQueued::class, function (JobQueued $event) { - QueueMonitor::handleJobQueued($event); - }); + /** + * If the project uses Horizon, we will listen to the JobPushed event, + * because Horizon fires JobPushed event when the job is queued or retry the job again from its UI + * + * @see https://laravel.com/docs/horizon + */ + if (class_exists('Laravel\Horizon\Events\JobPushed')) { + Event::listen('Laravel\Horizon\Events\JobPushed', function ($event) { + QueueMonitor::handleJobPushed($event); + }); + } else { + Event::listen(JobQueued::class, function (JobQueued $event) { + QueueMonitor::handleJobQueued($event); + }); + } } // listen to other job events diff --git a/src/Services/QueueMonitor.php b/src/Services/QueueMonitor.php index 8da30a1..2e7bc48 100644 --- a/src/Services/QueueMonitor.php +++ b/src/Services/QueueMonitor.php @@ -2,6 +2,8 @@ namespace romanzipp\QueueMonitor\Services; +use Illuminate\Container\Container; +use Illuminate\Contracts\Encryption\Encrypter; use Illuminate\Contracts\Queue\Job as JobContract; use Illuminate\Queue\Events\JobExceptionOccurred; use Illuminate\Queue\Events\JobFailed; @@ -41,6 +43,16 @@ public static function handleJobQueued(JobQueued $event): void self::jobQueued($event); } + /** + * @param \Laravel\Horizon\Events\JobPushed $event + * + * @return void + */ + public static function handleJobPushed($event): void + { + self::jobPushed($event); + } + /** * Handle Job Processing. * @@ -125,6 +137,7 @@ protected static function jobQueued(JobQueued $event): void QueueMonitor::getModel()::query()->create([ 'job_id' => $event->id, + 'job_uuid' => isset($event->payload) ? $event->payload()['uuid'] : $event->id, 'name' => get_class($event->job), /** @phpstan-ignore-next-line */ 'queue' => $event->job->queue ?: 'default', @@ -134,6 +147,41 @@ protected static function jobQueued(JobQueued $event): void ]); } + /** + * Start Queue Monitoring for Job. + * + * @param \Laravel\Horizon\Events\JobPushed $event + * + * @return void + */ + protected static function jobPushed($event): void + { + if ( ! self::shouldBeMonitored($event->payload->displayName())) { + return; + } + + // add initial data + if (method_exists($event->payload->displayName(), 'initialMonitorData')) { + $data = json_encode(static::getJobInstance($event->payload->decoded['data'])->initialMonitorData()); + } + + QueueMonitor::getModel()::query()->create([ + 'job_id' => $event->payload->decoded['id'] ?? $event->payload->decoded['uuid'], + 'job_uuid' => $event->payload->decoded['uuid'] ?? $event->payload->decoded['uuid'], + 'name' => $event->payload->displayName(), + /** @phpstan-ignore-next-line */ + 'queue' => $event->queue ?: 'default', + 'status' => MonitorStatus::QUEUED, + 'queued_at' => now(), + 'data' => $data ?? null, + ]); + + // mark the retried job + if ($event->payload->isRetry()) { + QueueMonitor::getModel()::query()->where('job_uuid', $event->payload->retryOf())->update(['retried' => true]); + } + } + /** * Job Start Processing. * @@ -253,10 +301,27 @@ protected static function jobFinished(JobContract $job, int $status, ?\Throwable * * @return bool */ - public static function shouldBeMonitored(object $job): bool + public static function shouldBeMonitored(object|string $job): bool { $class = $job instanceof JobContract ? $job->resolveName() : $job; return array_key_exists(IsMonitored::class, ClassUses::classUsesRecursive($class)); } + + /** + * @param array $data + * @return IsMonitored + */ + private static function getJobInstance(array $data) + { + if (str_starts_with($data['command'], 'O:')) { + return unserialize($data['command']); + } + + if (Container::getInstance()->bound(Encrypter::class)) { + return unserialize(Container::getInstance()[Encrypter::class]->decrypt($data['command'])); + } + + throw new \RuntimeException('Unable to extract job payload.'); + } }