diff --git a/src/Instrumentation/Laravel/src/LaravelInstrumentation.php b/src/Instrumentation/Laravel/src/LaravelInstrumentation.php index d5e85a5c..f5dcc4e4 100644 --- a/src/Instrumentation/Laravel/src/LaravelInstrumentation.php +++ b/src/Instrumentation/Laravel/src/LaravelInstrumentation.php @@ -12,6 +12,7 @@ use OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers\ExceptionWatcher; use OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers\LogWatcher; use OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers\QueryWatcher; +use OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers\QueueWatcher; use OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers\RequestWatcher; use OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers\Watcher; use function OpenTelemetry\Instrumentation\hook; @@ -39,6 +40,7 @@ public static function register(): void self::registerWatchers($application, new ExceptionWatcher()); self::registerWatchers($application, new LogWatcher()); self::registerWatchers($application, new QueryWatcher($instrumentation)); + self::registerWatchers($application, new QueueWatcher($instrumentation)); self::registerWatchers($application, new RequestWatcher()); }, ); diff --git a/src/Instrumentation/Laravel/src/Watchers/QueueWatcher.php b/src/Instrumentation/Laravel/src/Watchers/QueueWatcher.php new file mode 100644 index 00000000..e310942e --- /dev/null +++ b/src/Instrumentation/Laravel/src/Watchers/QueueWatcher.php @@ -0,0 +1,86 @@ +inject($opentelemetry); + + return [ + 'opentelemetry' => $opentelemetry, + ]; + }); + + $app['events']->listen(JobProcessing::class, [$this, 'handleJobProcessing']); + $app['events']->listen(JobProcessed::class, [$this, 'handleJobProcessed']); + } + + public function handleJobProcessing(JobProcessing $jobProcessing): void + { + $parent = TraceContextPropagator::getInstance()->extract( + $jobProcessing->job->payload()['opentelemetry'] ?? [], + ); + + $span = $this->instrumentation + ->tracer() + ->spanBuilder('queue') + ->setSpanKind(SpanKind::KIND_INTERNAL) + ->setParent($parent) + ->startSpan(); + + $span->setAttributes([ + TraceAttributes::MESSAGING_DESTINATION_NAME => $jobProcessing->job->getQueue(), + TraceAttributes::MESSAGING_MESSAGE_ID => $jobProcessing->job->uuid(), + TraceAttributes::MESSAGING_MESSAGE_ENVELOPE_SIZE => strlen($jobProcessing->job->getRawBody()), + ]); + + $span->addEvent('job', [ + 'connection' => $jobProcessing->connectionName, + 'job.id' => $jobProcessing->job->getJobId(), + 'job.name' => $jobProcessing->job->getName(), + 'raw.body' => $jobProcessing->job->getRawBody(), + ]); + + Context::storage()->attach($span->storeInContext($parent)); + } + + public function handleJobProcessed(JobProcessed $jobProcessed): void + { + $scope = Context::storage()->scope(); + if (!$scope) { + return; + } + $scope->detach(); + $span = Span::fromContext($scope->context()); + + $span->addEvent('processed', [ + 'deleted' => $jobProcessed->job->isDeleted(), + 'released' => $jobProcessed->job->isReleased(), + ]); + + $span->end(); + } +}