Skip to content

Commit

Permalink
listen to JobPushed event for Horizon
Browse files Browse the repository at this point in the history
  • Loading branch information
AsemAlalami authored and romanzipp committed Jul 13, 2024
1 parent 36a2005 commit f0b6ccd
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 4 deletions.
18 changes: 15 additions & 3 deletions src/Providers/QueueMonitorProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,21 @@ public function boot(): void

// listen to JobQueued event
if (config('queue-monitor.monitor_queued_jobs', true)) {
Event::listen(JobQueued::class, function (JobQueued $event) {
QueueMonitor::handleJobQueued($event);
});
/**
* If the project uses Horizon, we will listen to the JobPushed event,
* because Horizon fires JobPushed event when the job is queued or retry the job again from its UI
*
* @see https://laravel.com/docs/horizon
*/
if (class_exists('Laravel\Horizon\Events\JobPushed')) {
Event::listen('Laravel\Horizon\Events\JobPushed', function ($event) {
QueueMonitor::handleJobPushed($event);
});
} else {
Event::listen(JobQueued::class, function (JobQueued $event) {
QueueMonitor::handleJobQueued($event);
});
}
}

// listen to other job events
Expand Down
67 changes: 66 additions & 1 deletion src/Services/QueueMonitor.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace romanzipp\QueueMonitor\Services;

use Illuminate\Container\Container;
use Illuminate\Contracts\Encryption\Encrypter;
use Illuminate\Contracts\Queue\Job as JobContract;
use Illuminate\Queue\Events\JobExceptionOccurred;
use Illuminate\Queue\Events\JobFailed;
Expand Down Expand Up @@ -41,6 +43,16 @@ public static function handleJobQueued(JobQueued $event): void
self::jobQueued($event);
}

/**
* @param \Laravel\Horizon\Events\JobPushed $event
*
* @return void
*/
public static function handleJobPushed($event): void

Check failure on line 51 in src/Services/QueueMonitor.php

View workflow job for this annotation

GitHub Actions / phpstan

Parameter $event of method romanzipp\QueueMonitor\Services\QueueMonitor::handleJobPushed() has invalid type Laravel\Horizon\Events\JobPushed.
{
self::jobPushed($event);
}

/**
* Handle Job Processing.
*
Expand Down Expand Up @@ -125,6 +137,7 @@ protected static function jobQueued(JobQueued $event): void

QueueMonitor::getModel()::query()->create([
'job_id' => $event->id,
'job_uuid' => isset($event->payload) ? $event->payload()['uuid'] : $event->id,
'name' => get_class($event->job),
/** @phpstan-ignore-next-line */
'queue' => $event->job->queue ?: 'default',
Expand All @@ -134,6 +147,41 @@ protected static function jobQueued(JobQueued $event): void
]);
}

/**
* Start Queue Monitoring for Job.
*
* @param \Laravel\Horizon\Events\JobPushed $event
*
* @return void
*/
protected static function jobPushed($event): void
{
if ( ! self::shouldBeMonitored($event->payload->displayName())) {
return;
}

// add initial data
if (method_exists($event->payload->displayName(), 'initialMonitorData')) {
$data = json_encode(static::getJobInstance($event->payload->decoded['data'])->initialMonitorData());
}

QueueMonitor::getModel()::query()->create([
'job_id' => $event->payload->decoded['id'] ?? $event->payload->decoded['uuid'],
'job_uuid' => $event->payload->decoded['uuid'] ?? $event->payload->decoded['uuid'],
'name' => $event->payload->displayName(),
/** @phpstan-ignore-next-line */
'queue' => $event->queue ?: 'default',
'status' => MonitorStatus::QUEUED,
'queued_at' => now(),
'data' => $data ?? null,
]);

// mark the retried job
if ($event->payload->isRetry()) {
QueueMonitor::getModel()::query()->where('job_uuid', $event->payload->retryOf())->update(['retried' => true]);
}
}

/**
* Job Start Processing.
*
Expand Down Expand Up @@ -253,10 +301,27 @@ protected static function jobFinished(JobContract $job, int $status, ?\Throwable
*
* @return bool
*/
public static function shouldBeMonitored(object $job): bool
public static function shouldBeMonitored(object|string $job): bool
{
$class = $job instanceof JobContract ? $job->resolveName() : $job;

return array_key_exists(IsMonitored::class, ClassUses::classUsesRecursive($class));
}

/**
* @param array $data
* @return IsMonitored
*/
private static function getJobInstance(array $data)
{
if (str_starts_with($data['command'], 'O:')) {
return unserialize($data['command']);
}

if (Container::getInstance()->bound(Encrypter::class)) {
return unserialize(Container::getInstance()[Encrypter::class]->decrypt($data['command']));
}

throw new \RuntimeException('Unable to extract job payload.');
}
}

0 comments on commit f0b6ccd

Please sign in to comment.