Skip to content

Commit

Permalink
Laravel: start of QueueWatcher.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisLightfootWild committed Apr 2, 2024
1 parent 7371e07 commit 8c26008
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/Instrumentation/Laravel/src/LaravelInstrumentation.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers\ExceptionWatcher;
use OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers\LogWatcher;
use OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers\QueryWatcher;
use OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers\QueueWatcher;
use OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers\RequestWatcher;
use OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers\Watcher;
use function OpenTelemetry\Instrumentation\hook;
Expand Down Expand Up @@ -39,6 +40,7 @@ public static function register(): void
self::registerWatchers($application, new ExceptionWatcher());
self::registerWatchers($application, new LogWatcher());
self::registerWatchers($application, new QueryWatcher($instrumentation));
self::registerWatchers($application, new QueueWatcher($instrumentation));
self::registerWatchers($application, new RequestWatcher());
},
);
Expand Down
86 changes: 86 additions & 0 deletions src/Instrumentation/Laravel/src/Watchers/QueueWatcher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers;

use Illuminate\Contracts\Foundation\Application;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Queue\Queue;
use OpenTelemetry\API\Instrumentation\CachedInstrumentation;
use OpenTelemetry\API\Trace\Propagation\TraceContextPropagator;
use OpenTelemetry\API\Trace\Span;
use OpenTelemetry\API\Trace\SpanKind;
use OpenTelemetry\Context\Context;
use OpenTelemetry\SemConv\TraceAttributes;

class QueueWatcher extends Watcher
{
public function __construct(
private CachedInstrumentation $instrumentation,
) {
}

/** @psalm-suppress UndefinedInterfaceMethod */
public function register(Application $app): void
{
Queue::createPayloadUsing(function ($connection, $queue, $payload) {
$opentelemetry = [];
TraceContextPropagator::getInstance()->inject($opentelemetry);

return [
'opentelemetry' => $opentelemetry,
];
});

$app['events']->listen(JobProcessing::class, [$this, 'handleJobProcessing']);
$app['events']->listen(JobProcessed::class, [$this, 'handleJobProcessed']);
}

public function handleJobProcessing(JobProcessing $jobProcessing): void
{
$parent = TraceContextPropagator::getInstance()->extract(
$jobProcessing->job->payload()['opentelemetry'] ?? [],
);

$span = $this->instrumentation
->tracer()
->spanBuilder('queue')
->setSpanKind(SpanKind::KIND_INTERNAL)
->setParent($parent)
->startSpan();

$span->setAttributes([
TraceAttributes::MESSAGING_DESTINATION_NAME => $jobProcessing->job->getQueue(),
TraceAttributes::MESSAGING_MESSAGE_ID => $jobProcessing->job->uuid(),
TraceAttributes::MESSAGING_MESSAGE_ENVELOPE_SIZE => strlen($jobProcessing->job->getRawBody()),
]);

$span->addEvent('job', [
'connection' => $jobProcessing->connectionName,
'job.id' => $jobProcessing->job->getJobId(),
'job.name' => $jobProcessing->job->getName(),
'raw.body' => $jobProcessing->job->getRawBody(),
]);

Context::storage()->attach($span->storeInContext($parent));
}

public function handleJobProcessed(JobProcessed $jobProcessed): void
{
$scope = Context::storage()->scope();
if (!$scope) {
return;
}
$scope->detach();
$span = Span::fromContext($scope->context());

$span->addEvent('processed', [
'deleted' => $jobProcessed->job->isDeleted(),
'released' => $jobProcessed->job->isReleased(),
]);

$span->end();
}
}

0 comments on commit 8c26008

Please sign in to comment.