Skip to content

Commit

Permalink
add message timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
cdaguerre committed Nov 27, 2024
1 parent 5bb3cb4 commit 13f380c
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
use Instrumentation\Semantics\Attribute\MessageAttributeProviderInterface;
use Instrumentation\Semantics\OperationName\MessageOperationNameResolverInterface;
use Instrumentation\Tracing\Instrumentation\MainSpanContextInterface;
use Instrumentation\Tracing\Instrumentation\Messenger\AbstractDateTimeStamp;
use Instrumentation\Tracing\Instrumentation\Messenger\AttributesStamp;
use Instrumentation\Tracing\Instrumentation\Messenger\ConsumedAtStamp;
use Instrumentation\Tracing\Instrumentation\Messenger\HandledAtStamp;
use Instrumentation\Tracing\Instrumentation\Messenger\SentAtStamp;
use Instrumentation\Tracing\Instrumentation\TracerAwareTrait;
use Instrumentation\Tracing\Propagation\Messenger\PropagationStrategyStamp;
use OpenTelemetry\API\Trace\SpanInterface;
Expand All @@ -24,6 +28,7 @@
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\AbstractWorkerMessageEvent;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
Expand All @@ -43,6 +48,7 @@ class MessageEventSubscriber implements EventSubscriberInterface
public static function getSubscribedEvents(): array
{
return [
SendMessageToTransportsEvent::class => [['onSend', 512]],
WorkerMessageReceivedEvent::class => [['onConsume', 512]], // before all SF listeners
WorkerMessageHandledEvent::class => [['onHandled', -512]],
WorkerMessageFailedEvent::class => [['onHandled', -512]],
Expand All @@ -59,8 +65,18 @@ public function __construct(
$this->scopes = new \SplObjectStorage();
}

public function onSend(SendMessageToTransportsEvent $event): void
{
$envelope = $event->getEnvelope();
$envelope = $envelope->with(new SentAtStamp());

$event->setEnvelope($envelope);
}

public function onConsume(WorkerMessageReceivedEvent $event): void
{
$event->addStamps(new ConsumedAtStamp());

$attributes = array_merge($this->getAttributes($event->getEnvelope()), [
'messaging.operation' => 'process',
'messaging.consumer_id' => gethostname(),
Expand Down Expand Up @@ -110,6 +126,21 @@ public function onHandled(AbstractWorkerMessageEvent $event): void
$span->setStatus(StatusCode::STATUS_ERROR);
}

$event->addStamps(new HandledAtStamp());

foreach ([
'sent_at' => SentAtStamp::class,
'consumed_at' => ConsumedAtStamp::class,
'handled_at' => HandledAtStamp::class,
] as $attribute => $stampFqdn) {
/** @var AbstractDateTimeStamp $stamp */
if (!$stamp = $event->getEnvelope()->last($stampFqdn)) {
continue;
}

$span->setAttribute('messenger.'.$attribute, $stamp->getDate()->format('Y-m-d H:i:s.u'));
}

if ($this->createSubSpan) {
if (null !== $scope = $this->scopes[$span]) {
$scope->detach();
Expand Down
27 changes: 27 additions & 0 deletions src/Tracing/Instrumentation/Messenger/AbstractDateTimeStamp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

/*
* This file is part of the worldia/instrumentation-bundle package.
* (c) Worldia <[email protected]>
*/

namespace Instrumentation\Tracing\Instrumentation\Messenger;

use Symfony\Component\Messenger\Stamp\StampInterface;

abstract class AbstractDateTimeStamp implements StampInterface
{
private readonly \DateTimeInterface $dateTime;

public function __construct(?\DateTimeInterface $dateTime = null)
{
$this->dateTime = $dateTime ?: new \DateTime();
}

public function getDate(): \DateTimeInterface
{
return $this->dateTime;
}
}
14 changes: 14 additions & 0 deletions src/Tracing/Instrumentation/Messenger/ConsumedAtStamp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

/*
* This file is part of the worldia/instrumentation-bundle package.
* (c) Worldia <[email protected]>
*/

namespace Instrumentation\Tracing\Instrumentation\Messenger;

final class ConsumedAtStamp extends AbstractDateTimeStamp
{
}
14 changes: 14 additions & 0 deletions src/Tracing/Instrumentation/Messenger/HandledAtStamp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

/*
* This file is part of the worldia/instrumentation-bundle package.
* (c) Worldia <[email protected]>
*/

namespace Instrumentation\Tracing\Instrumentation\Messenger;

final class HandledAtStamp extends AbstractDateTimeStamp
{
}
14 changes: 14 additions & 0 deletions src/Tracing/Instrumentation/Messenger/SentAtStamp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

/*
* This file is part of the worldia/instrumentation-bundle package.
* (c) Worldia <[email protected]>
*/

namespace Instrumentation\Tracing\Instrumentation\Messenger;

final class SentAtStamp extends AbstractDateTimeStamp
{
}

0 comments on commit 13f380c

Please sign in to comment.