diff --git a/src/Tracing/Instrumentation/EventSubscriber/MessageEventSubscriber.php b/src/Tracing/Instrumentation/EventSubscriber/MessageEventSubscriber.php index e1f8b98..8531290 100644 --- a/src/Tracing/Instrumentation/EventSubscriber/MessageEventSubscriber.php +++ b/src/Tracing/Instrumentation/EventSubscriber/MessageEventSubscriber.php @@ -10,7 +10,11 @@ use Instrumentation\Semantics\Attribute\MessageAttributeProviderInterface; use Instrumentation\Semantics\OperationName\MessageOperationNameResolverInterface; use Instrumentation\Tracing\Instrumentation\MainSpanContextInterface; +use Instrumentation\Tracing\Instrumentation\Messenger\AbstractDateTimeStamp; use Instrumentation\Tracing\Instrumentation\Messenger\AttributesStamp; +use Instrumentation\Tracing\Instrumentation\Messenger\ConsumedAtStamp; +use Instrumentation\Tracing\Instrumentation\Messenger\HandledAtStamp; +use Instrumentation\Tracing\Instrumentation\Messenger\SentAtStamp; use Instrumentation\Tracing\Instrumentation\TracerAwareTrait; use Instrumentation\Tracing\Propagation\Messenger\PropagationStrategyStamp; use OpenTelemetry\API\Trace\SpanInterface; @@ -24,6 +28,7 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\AbstractWorkerMessageEvent; +use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; @@ -43,6 +48,7 @@ class MessageEventSubscriber implements EventSubscriberInterface public static function getSubscribedEvents(): array { return [ + SendMessageToTransportsEvent::class => [['onSend', 512]], WorkerMessageReceivedEvent::class => [['onConsume', 512]], // before all SF listeners WorkerMessageHandledEvent::class => [['onHandled', -512]], WorkerMessageFailedEvent::class => [['onHandled', -512]], @@ -59,8 +65,18 @@ public function __construct( $this->scopes = new \SplObjectStorage(); } + public function onSend(SendMessageToTransportsEvent $event): void + { + $envelope = $event->getEnvelope(); + $envelope = $envelope->with(new SentAtStamp()); + + $event->setEnvelope($envelope); + } + public function onConsume(WorkerMessageReceivedEvent $event): void { + $event->addStamps(new ConsumedAtStamp()); + $attributes = array_merge($this->getAttributes($event->getEnvelope()), [ 'messaging.operation' => 'process', 'messaging.consumer_id' => gethostname(), @@ -110,6 +126,21 @@ public function onHandled(AbstractWorkerMessageEvent $event): void $span->setStatus(StatusCode::STATUS_ERROR); } + $event->addStamps(new HandledAtStamp()); + + foreach ([ + 'sent_at' => SentAtStamp::class, + 'consumed_at' => ConsumedAtStamp::class, + 'handled_at' => HandledAtStamp::class, + ] as $attribute => $stampFqdn) { + /** @var AbstractDateTimeStamp $stamp */ + if (!$stamp = $event->getEnvelope()->last($stampFqdn)) { + continue; + } + + $span->setAttribute('messenger.'.$attribute, $stamp->getDate()->format('Y-m-d H:i:s.u')); + } + if ($this->createSubSpan) { if (null !== $scope = $this->scopes[$span]) { $scope->detach(); diff --git a/src/Tracing/Instrumentation/Messenger/AbstractDateTimeStamp.php b/src/Tracing/Instrumentation/Messenger/AbstractDateTimeStamp.php new file mode 100644 index 0000000..0f3ec21 --- /dev/null +++ b/src/Tracing/Instrumentation/Messenger/AbstractDateTimeStamp.php @@ -0,0 +1,27 @@ + + */ + +namespace Instrumentation\Tracing\Instrumentation\Messenger; + +use Symfony\Component\Messenger\Stamp\StampInterface; + +abstract class AbstractDateTimeStamp implements StampInterface +{ + private readonly \DateTimeInterface $dateTime; + + public function __construct(?\DateTimeInterface $dateTime = null) + { + $this->dateTime = $dateTime ?: new \DateTime(); + } + + public function getDate(): \DateTimeInterface + { + return $this->dateTime; + } +} diff --git a/src/Tracing/Instrumentation/Messenger/ConsumedAtStamp.php b/src/Tracing/Instrumentation/Messenger/ConsumedAtStamp.php new file mode 100644 index 0000000..135215b --- /dev/null +++ b/src/Tracing/Instrumentation/Messenger/ConsumedAtStamp.php @@ -0,0 +1,14 @@ + + */ + +namespace Instrumentation\Tracing\Instrumentation\Messenger; + +final class ConsumedAtStamp extends AbstractDateTimeStamp +{ +} diff --git a/src/Tracing/Instrumentation/Messenger/HandledAtStamp.php b/src/Tracing/Instrumentation/Messenger/HandledAtStamp.php new file mode 100644 index 0000000..ca41eff --- /dev/null +++ b/src/Tracing/Instrumentation/Messenger/HandledAtStamp.php @@ -0,0 +1,14 @@ + + */ + +namespace Instrumentation\Tracing\Instrumentation\Messenger; + +final class HandledAtStamp extends AbstractDateTimeStamp +{ +} diff --git a/src/Tracing/Instrumentation/Messenger/SentAtStamp.php b/src/Tracing/Instrumentation/Messenger/SentAtStamp.php new file mode 100644 index 0000000..518fe1c --- /dev/null +++ b/src/Tracing/Instrumentation/Messenger/SentAtStamp.php @@ -0,0 +1,14 @@ + + */ + +namespace Instrumentation\Tracing\Instrumentation\Messenger; + +final class SentAtStamp extends AbstractDateTimeStamp +{ +}