Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jessarcher committed Oct 11, 2023
1 parent 71a125f commit b200cd7
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 47 deletions.
5 changes: 3 additions & 2 deletions database/migrations/2023_06_07_000001_create_pulse_tables.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ public function up(): void
$table->string('user_id')->nullable();
$table->string('job');
$table->uuid('job_uuid');
$table->unsignedInteger('attempt')->nullable();
$table->string('connection');
$table->string('queue');
$table->datetime('processing_at')->nullable();
$table->datetime('released_at')->nullable();
$table->datetime('processed_at')->nullable();
$table->datetime('failed_at')->nullable();
$table->unsignedInteger('slow')->default(0);
$table->unsignedInteger('slowest')->nullable();
$table->unsignedInteger('duration')->nullable();

// TODO: verify this update index. Needs to find job quickly.
$table->index(['job_uuid']);
Expand Down
2 changes: 1 addition & 1 deletion dist/pulse.css

Large diffs are not rendered by default.

46 changes: 39 additions & 7 deletions resources/views/livewire/queues.blade.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,23 @@
<x-slot:actions>
<div class="flex gap-4">
<div class="flex items-center gap-2 text-sm text-gray-700 dark:text-gray-300 font-medium">
<div class="h-0.5 w-5 rounded-full bg-[rgba(147,51,234,0.5)]"></div>
<div class="h-0.5 w-4 rounded-full bg-[rgba(107,114,128,0.5)]"></div>
Queued
</div>
<div class="flex items-center gap-2 text-sm text-gray-700 dark:text-gray-300 font-medium">
<div class="h-0.5 w-5 rounded-full bg-[#9333ea]"></div>
<div class="h-0.5 w-4 rounded-full bg-[rgba(147,51,234,0.5)]"></div>
Processing
</div>
<div class="flex items-center gap-2 text-sm text-gray-700 dark:text-gray-300 font-medium">
<div class="h-0.5 w-4 rounded-full bg-[#eab308]"></div>
Released
</div>
<div class="flex items-center gap-2 text-sm text-gray-700 dark:text-gray-300 font-medium">
<div class="h-0.5 w-4 rounded-full bg-[#9333ea]"></div>
Processed
</div>
<div class="flex items-center gap-2 text-sm text-gray-700 dark:text-gray-300 font-medium">
<div class="h-0.5 w-5 rounded-full bg-[#e11d48]"></div>
<div class="h-0.5 w-4 rounded-full bg-[#e11d48]"></div>
Failed
</div>
</div>
Expand Down Expand Up @@ -59,6 +67,8 @@ class="min-h-full flex flex-col"
@php
$highest = $readings->map(fn ($reading) => max(
$reading->queued,
$reading->processing,
$reading->released,
$reading->processed,
$reading->failed,
))->max()
Expand All @@ -69,7 +79,7 @@ class="min-h-full flex flex-col"

<div
wire:ignore
class="h-12"
class="h-14"
x-data="{
init() {
let chart = new Chart(
Expand All @@ -81,14 +91,34 @@ class="h-12"
datasets: [
{
label: 'Queued',
borderColor: 'rgba(147,51,234,0.5)',
borderColor: 'rgba(107,114,128,0.5)',
borderWidth: 2,
borderCapStyle: 'round',
data: @js(collect($readings)->pluck('queued')),
pointStyle: false,
tension: 0.2,
spanGaps: false,
},
{
label: 'Processing',
borderColor: 'rgba(147,51,234,0.5)',
borderWidth: 2,
borderCapStyle: 'round',
data: @js(collect($readings)->pluck('processing')),
pointStyle: false,
tension: 0.2,
spanGaps: false,
},
{
label: 'Released',
borderColor: '#eab308',
borderWidth: 2,
borderCapStyle: 'round',
data: @js(collect($readings)->pluck('released')),
pointStyle: false,
tension: 0.2,
spanGaps: false,
},
{
label: 'Processed',
borderColor: '#9333ea',
Expand Down Expand Up @@ -160,8 +190,10 @@ class="h-12"
chart.data.labels = queues['{{ $queue }}'].map(reading => reading.date)
chart.data.datasets[0].data = queues['{{ $queue }}'].map(reading => reading.queued)
chart.data.datasets[1].data = queues['{{ $queue }}'].map(reading => reading.processed)
chart.data.datasets[2].data = queues['{{ $queue }}'].map(reading => reading.failed)
chart.data.datasets[1].data = queues['{{ $queue }}'].map(reading => reading.processing)
chart.data.datasets[2].data = queues['{{ $queue }}'].map(reading => reading.released)
chart.data.datasets[3].data = queues['{{ $queue }}'].map(reading => reading.processed)
chart.data.datasets[4].data = queues['{{ $queue }}'].map(reading => reading.failed)
chart.update()
})
}
Expand Down
10 changes: 8 additions & 2 deletions src/Pulse.php
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,16 @@ public function register(string|array $recorders): self
/**
* Record the given entry.
*/
public function record(Entry|Update $entry): self
public function record(Entry|Update|array $entries): self
{
if (! is_array($entries)) {
$entries = [$entries];
}

if ($this->shouldRecord) {
$this->entries[] = $entry;
foreach ($entries as $entry) {
$this->entries[] = $entry;
}
}

return $this;
Expand Down
25 changes: 23 additions & 2 deletions src/Queries/Queues.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ public function __invoke(Interval $interval): Collection
'date' => $currentBucket->subSeconds($i * $secondsPerPeriod)->format('Y-m-d H:i'),
'queued' => 0,
'processing' => 0,
'failed' => 0,
'released' => 0,
'processed' => 0,
'failed' => 0,
])
->reverse()
->keyBy('date');
Expand All @@ -60,6 +61,7 @@ public function __invoke(Interval $interval): Collection
->select('bucket', 'connection', 'queue')
->selectRaw('COUNT(`queued_at`) AS `queued`')
->selectRaw('COUNT(`processing_at`) AS `processing`')
->selectRaw('COUNT(`released_at`) AS `released`')
->selectRaw('COUNT(`processed_at`) AS `processed`')
->selectRaw('COUNT(`failed_at`) AS `failed`')
->fromSub(
Expand All @@ -69,6 +71,7 @@ public function __invoke(Interval $interval): Collection
->select('connection', 'queue')
->selectRaw('`date` AS `queued_at`')
->selectRaw('NULL AS `processing_at`')
->selectRaw('NULL AS `released_at`')
->selectRaw('NULL AS `processed_at`')
->selectRaw('NULL AS `failed_at`')
// Divide the data into buckets.
Expand All @@ -81,19 +84,35 @@ public function __invoke(Interval $interval): Collection
->select('connection', 'queue')
->selectRaw('NULL AS `queued_at`')
->addSelect('processing_at')
->selectRaw('NULL AS `released_at`')
->selectRaw('NULL AS `processed_at`')
->selectRaw('NULL AS `failed_at`')
// Divide the data into buckets.
->selectRaw('FLOOR(UNIX_TIMESTAMP(CONVERT_TZ(`processing_at`, ?, @@session.time_zone)) / ?) AS `bucket`', [$now->format('P'), $secondsPerPeriod])
->where('processing_at', '>=', $now->ceilSeconds($interval->totalSeconds / $maxDataPoints)->subSeconds((int) $interval->totalSeconds))
->whereNotNull('processing_at')
)
// Released
->union(fn (Builder $query) => $query
->from('pulse_jobs')
->select('connection', 'queue')
->selectRaw('NULL AS `queued_at`')
->selectRaw('NULL AS `processing_at`')
->addSelect('released_at')
->selectRaw('NULL AS `processed_at`')
->selectRaw('NULL AS `failed_at`')
// Divide the data into buckets.
->selectRaw('FLOOR(UNIX_TIMESTAMP(CONVERT_TZ(`released_at`, ?, @@session.time_zone)) / ?) AS `bucket`', [$now->format('P'), $secondsPerPeriod])
->where('released_at', '>=', $now->ceilSeconds($interval->totalSeconds / $maxDataPoints)->subSeconds((int) $interval->totalSeconds))
->whereNotNull('released_at')
)
// Processed
->union(fn (Builder $query) => $query
->from('pulse_jobs')
->select('connection', 'queue')
->selectRaw('NULL AS `queued_at`')
->selectRaw('NULL AS `processing_at`')
->selectRaw('NULL AS `released_at`')
->addSelect('processed_at')
->selectRaw('NULL AS `failed_at`')
// Divide the data into buckets.
Expand All @@ -107,6 +126,7 @@ public function __invoke(Interval $interval): Collection
->select('connection', 'queue')
->selectRaw('NULL AS `queued_at`')
->selectRaw('NULL AS `processing_at`')
->selectRaw('NULL AS `released_at`')
->selectRaw('NULL AS `processed_at`')
->addSelect('failed_at')
// Divide the data into buckets.
Expand All @@ -130,7 +150,8 @@ public function __invoke(Interval $interval): Collection
return [$date => (object) [
'date' => $date,
'queued' => $reading->queued,
'processing' => $reading->queued,
'processing' => $reading->processing,
'released' => $reading->released,
'processed' => $reading->processed,
'failed' => $reading->failed,
]];
Expand Down
4 changes: 2 additions & 2 deletions src/Queries/SlowJobs.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ public function __invoke(Interval $interval): Collection
$now = new CarbonImmutable;

return $this->connection()->table('pulse_jobs')
->selectRaw('`job`, SUM(slow) as count, MAX(slowest) as slowest')
->selectRaw('`job`, COUNT(*) AS count, MAX(duration) AS slowest')
// TODO: processed_at or failed_at
->where('date', '>=', $now->subSeconds((int) $interval->totalSeconds)->toDateTimeString())
->where('slow', '>', 0)
->where('duration', '>=', $this->config->get('pulse.slow_query_threshold'))
->groupBy('job')
->orderByDesc('slowest')
->get();
Expand Down
66 changes: 35 additions & 31 deletions src/Recorders/Jobs.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public function __construct(
/**
* Record the job.
*/
public function record(JobReleasedAfterException|JobFailed|JobProcessed|JobProcessing|JobQueued $event): Entry|Update|null
public function record(JobReleasedAfterException|JobFailed|JobProcessed|JobProcessing|JobQueued $event): Entry|Update|array|null
{
if ($event->connectionName === 'sync') {
return null;
Expand All @@ -65,63 +65,67 @@ public function record(JobReleasedAfterException|JobFailed|JobProcessed|JobProce
if ($event instanceof JobQueued) {
return new Entry($this->table, [
'date' => $now->toDateTimeString(),
'job' => is_string($event->job)
? $event->job
: $event->job::class,
'job' => $event->job::class,
'job_uuid' => $event->payload()['uuid'],
'connection' => $event->connectionName,
'queue' => $event->job->queue ?? 'default',
'user_id' => $this->pulse->authenticatedUserIdResolver(),
]);
}

// TODO: Store an entry per-retry?
if ($event instanceof JobProcessing) {
$this->lastJobStartedProcessingAt = $now;
// TODO: Add update here?

return null;
}
// TODO: Allow this to be ingested immediately?
$duration = $this->lastJobStartedProcessingAt->diffInMilliseconds($now);
$processingAt = $this->lastJobStartedProcessingAt?->toDateTimeString();
$slow = $duration >= $this->config->get('pulse.slow_job_threshold') ? 1 : 0;

if ($event instanceof JobReleasedAfterException) {
return tap(new Update(
return new Update(
$this->table,
['job_uuid' => (string) $event->job->uuid()],
fn (array $attributes) => [
'processing_at' => $attributes['processing_at'] ?? $processingAt,
'slowest' => max($attributes['slowest'] ?? 0, $duration),
'slow' => $attributes['slow'] + $slow,
['job_uuid' => (string) $event->job->uuid(), 'attempt' => null],
[
'attempt' => $event->job->attempts(),
'processing_at' => $this->lastJobStartedProcessingAt->toDateTimeString(),
],
), fn () => $this->lastJobStartedProcessingAt = null);
);
}

if ($event instanceof JobReleasedAfterException) {
return tap([
new Update(
$this->table,
['job_uuid' => $event->job->uuid(), 'attempt' => $event->job->attempts()],
[
'released_at' => $now->toDateTimeString(),
'duration' => $this->lastJobStartedProcessingAt->diffInMilliseconds($now),
],
),
new Entry($this->table, [
'date' => $now->toDateTimeString(),
'job' => $event->job::class,
'job_uuid' => $event->job->uuid(),
'connection' => $event->connectionName,
'queue' => $event->job->queue ?? 'default',
]),
], fn () => $this->lastJobStartedProcessingAt = null);
}

if ($event instanceof JobProcessed) {
return tap(new Update(
$this->table,
['job_uuid' => (string) $event->job->uuid()],
fn (array $attributes) => [
'processing_at' => $attributes['processing_at'] ?? $processingAt,
['job_uuid' => (string) $event->job->uuid(), 'attempt' => $event->job->attempts()],
[
'processed_at' => $now->toDateTimeString(),
'slowest' => max($attributes['slowest'] ?? 0, $duration),
'slow' => $attributes['slow'] + $slow,
'duration' => $this->lastJobStartedProcessingAt->diffInMilliseconds($now),
],
), fn () => $this->lastJobStartedProcessingAt = null);
}

if ($event instanceof JobFailed) {
return tap(new Update(
$this->table,
['job_uuid' => (string) $event->job->uuid()],
fn (array $attributes) => [
'processing_at' => $attributes['processing_at'] ?? $processingAt,
['job_uuid' => (string) $event->job->uuid(), 'attempt' => $event->job->attempts()],
[
'failed_at' => $now->toDateTimeString(),
'slowest' => max($attributes['slowest'] ?? 0, $duration),
'slow' => $attributes['slow'] + $slow,
'duration' => $this->lastJobStartedProcessingAt->diffInMilliseconds($now),
],
), fn () => $this->lastJobStartedProcessingAt = null);
}
Expand Down

0 comments on commit b200cd7

Please sign in to comment.