Skip to content

Commit

Permalink
Merge pull request #120 from kbond/features
Browse files Browse the repository at this point in the history
More Features
  • Loading branch information
kbond authored Nov 25, 2024
2 parents 8d2e21e + 747bb44 commit 7104238
Show file tree
Hide file tree
Showing 29 changed files with 946 additions and 229 deletions.
23 changes: 18 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ or the [provided tools](#history).

#### Disable Monitoring

You may want to disable monitoring for certain messages. There are two ways to do this:
You may want to disable monitoring for certain messages. There are several ways to do this:

1. When dispatching the message, add the `DisableMonitoringStamp`:
```php
Expand All @@ -97,7 +97,7 @@ You may want to disable monitoring for certain messages. There are two ways to d

$bus->dispatch(new MyMessage(), [new DisableMonitoringStamp()])
```
2. Add the `DisableMonitoringStamp` as a class attribute to your message:
2. Add the `DisableMonitoringStamp` as a class attribute to your message (or parent class/interface):
```php
use Zenstruck\Messenger\Monitor\Stamp\DisableMonitoringStamp;

Expand All @@ -106,8 +106,8 @@ You may want to disable monitoring for certain messages. There are two ways to d
{
}
```
3. You may want to disable monitoring for messages that are dispatched without any handler.
You can do this by using the `DisableMonitoringStamp` with optional constructor argument `true`:
You may want to disable monitoring for messages that are dispatched without any handler.
You can do this by using the `DisableMonitoringStamp` with optional constructor argument `true`:
```php
use Zenstruck\Messenger\Monitor\Stamp\DisableMonitoringStamp;

Expand All @@ -116,6 +116,15 @@ You can do this by using the `DisableMonitoringStamp` with optional constructor
{
}
```
3. Add the message class to the `exclude` config option (can be abstract/interface):
```yaml
# config/packages/zenstruck_messenger_monitor.yaml

zenstruck_messenger_monitor:
storage:
exclude:
- App\Message\MyMessage
```

#### Description

Expand Down Expand Up @@ -145,7 +154,7 @@ add your own in one of two ways:

$bus->dispatch(new MyMessage(), [new TagStamp('tag-1'), new TagStamp('tag-2')])
```
2. Add the `TagStamp` as a class attribute to your message:
2. Add the `TagStamp` as a class attribute to your message (and parent class/interface):
```php
use Zenstruck\Messenger\Monitor\Stamp\TagStamp;

Expand All @@ -155,6 +164,8 @@ add your own in one of two ways:
{
}
```
> [!TIP]
> You can also add the `TagStamp` attribute to parent classes/interfaces.

#### `messenger:monitor:purge` Command

Expand Down Expand Up @@ -300,6 +311,8 @@ when@dev:
```yaml
zenstruck_messenger_monitor:
storage:
# Message classes to disable monitoring for (can be abstract/interface)
exclude: []
orm:

# Your Doctrine entity class that extends "Zenstruck\Messenger\Monitor\History\Model\ProcessedMessage"
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
"symfony/serializer": "^6.4|^7.0",
"symfony/var-dumper": "^6.4|^7.0",
"zenstruck/console-test": "^1.5",
"zenstruck/foundry": "^2.2"
"zenstruck/foundry": "^2.2",
"zenstruck/messenger-test": "^1.11"
},
"suggest": {
"knplabs/knp-time-bundle": "For human readable timestamps and durations on your dashboard.",
Expand Down
17 changes: 13 additions & 4 deletions config/storage_orm.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Zenstruck\Messenger\Monitor\Command\PurgeCommand;
use Zenstruck\Messenger\Monitor\Command\SchedulePurgeCommand;
use Zenstruck\Messenger\Monitor\History\HistoryListener;
use Zenstruck\Messenger\Monitor\EventListener\AddMonitorStampListener;
use Zenstruck\Messenger\Monitor\EventListener\HandleMonitorStampListener;
use Zenstruck\Messenger\Monitor\EventListener\ReceiveMonitorStampListener;
use Zenstruck\Messenger\Monitor\History\ResultNormalizer;
use Zenstruck\Messenger\Monitor\History\Storage;
use Zenstruck\Messenger\Monitor\History\Storage\ORMStorage;
Expand All @@ -29,13 +31,20 @@
->set('.zenstruck_messenger_monitor.history.result_normalizer', ResultNormalizer::class)
->args([param('kernel.project_dir')])

->set('.zenstruck_messenger_monitor.history.listener', HistoryListener::class)
->set('.zenstruck_messenger_monitor.listener.add_monitor_stamp', AddMonitorStampListener::class)
->tag('kernel.event_listener', ['method' => '__invoke', 'event' => SendMessageToTransportsEvent::class])

->set('.zenstruck_messenger_monitor.listener.receive_monitor_stamp', ReceiveMonitorStampListener::class)
->args([
abstract_arg('exclude_classes')
])
->tag('kernel.event_listener', ['method' => '__invoke', 'event' => WorkerMessageReceivedEvent::class])

->set('.zenstruck_messenger_monitor.listener.handle_monitor_stamp', HandleMonitorStampListener::class)
->args([
service('zenstruck_messenger_monitor.history.storage'),
service('.zenstruck_messenger_monitor.history.result_normalizer'),
])
->tag('kernel.event_listener', ['method' => 'addMonitorStamp', 'event' => SendMessageToTransportsEvent::class])
->tag('kernel.event_listener', ['method' => 'receiveMessage', 'event' => WorkerMessageReceivedEvent::class])
->tag('kernel.event_listener', ['method' => 'handleSuccess', 'event' => WorkerMessageHandledEvent::class])
->tag('kernel.event_listener', ['method' => 'handleFailure', 'event' => WorkerMessageFailedEvent::class])

Expand Down
16 changes: 15 additions & 1 deletion src/DependencyInjection/ZenstruckMessengerMonitorExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ public function getConfigTreeBuilder(): TreeBuilder
->children()
->arrayNode('storage')
->children()
->arrayNode('exclude')
->info('Message classes to disable monitoring for (can be abstract/interface)')
->scalarPrototype()
->validate()
->ifTrue(fn($v) => !\class_exists($v) && !\interface_exists($v))
->thenInvalid('Class/interface does not exist.')
->end()
->end()
->end()
->arrayNode('orm')
->children()
->scalarNode('entity_class')
Expand Down Expand Up @@ -91,7 +100,12 @@ protected function loadInternal(array $mergedConfig, ContainerBuilder $container

if ($entity = $mergedConfig['storage']['orm']['entity_class'] ?? null) {
$loader->load('storage_orm.php');
$container->getDefinition('zenstruck_messenger_monitor.history.storage')->setArgument(1, $entity);
$container->getDefinition('zenstruck_messenger_monitor.history.storage')
->setArgument(1, $entity)
;
$container->getDefinition('.zenstruck_messenger_monitor.listener.receive_monitor_stamp')
->setArgument(0, $mergedConfig['storage']['exclude'])
;

if (!\class_exists(Schedule::class)) {
$container->removeDefinition('.zenstruck_messenger_monitor.command.schedule_purge');
Expand Down
28 changes: 28 additions & 0 deletions src/EventListener/AddMonitorStampListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

/*
* This file is part of the zenstruck/messenger-monitor-bundle package.
*
* (c) Kevin Bond <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Zenstruck\Messenger\Monitor\EventListener;

use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Zenstruck\Messenger\Monitor\Stamp\MonitorStamp;

/**
* @author Kevin Bond <[email protected]>
*
* @internal
*/
final class AddMonitorStampListener
{
public function __invoke(SendMessageToTransportsEvent $event): void
{
$event->setEnvelope($event->getEnvelope()->with(new MonitorStamp()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,60 +9,29 @@
* file that was distributed with this source code.
*/

namespace Zenstruck\Messenger\Monitor\History;
namespace Zenstruck\Messenger\Monitor\EventListener;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
use Zenstruck\Messenger\Monitor\History\Model\Result;
use Zenstruck\Messenger\Monitor\History\Model\Results;
use Zenstruck\Messenger\Monitor\Stamp\DisableMonitoringStamp;
use Zenstruck\Messenger\Monitor\History\ResultNormalizer;
use Zenstruck\Messenger\Monitor\History\Storage;
use Zenstruck\Messenger\Monitor\Stamp\MonitorStamp;
use Zenstruck\Messenger\Monitor\Stamp\TagStamp;

/**
* @author Kevin Bond <[email protected]>
*
* @internal
*
* @phpstan-import-type Structure from Result
*/
final class HistoryListener
final class HandleMonitorStampListener
{
public function __construct(private Storage $storage, private ResultNormalizer $normalizer)
{
}

public function addMonitorStamp(SendMessageToTransportsEvent $event): void
{
$event->setEnvelope($event->getEnvelope()->with(new MonitorStamp()));
}

public function receiveMessage(WorkerMessageReceivedEvent $event): void
{
$envelope = $event->getEnvelope();

if ($this->isMonitoringDisabled($envelope)) {
return;
}

$stamp = $envelope->last(MonitorStamp::class);

if (\class_exists(ScheduledStamp::class) && $scheduledStamp = $envelope->last(ScheduledStamp::class)) {
// scheduler transport doesn't trigger SendMessageToTransportsEvent
$stamp = new MonitorStamp($scheduledStamp->messageContext->triggeredAt);

$event->addStamps(TagStamp::forSchedule($scheduledStamp));
}

if ($stamp instanceof MonitorStamp) {
$event->addStamps($stamp->markReceived($event->getReceiverName()));
}
public function __construct(
private Storage $storage,
private ResultNormalizer $normalizer,
) {
}

public function handleSuccess(WorkerMessageHandledEvent $event): void
Expand Down Expand Up @@ -101,35 +70,6 @@ public function handleFailure(WorkerMessageFailedEvent $event): void
);
}

private function isMonitoringDisabled(Envelope $envelope): bool
{
if ($stamp = $envelope->last(DisableMonitoringStamp::class)) {
if (false === $stamp->onlyWhenNoHandler) {
return true;
}

return $this->hasNoHandlers($envelope);
}

$reflection = new \ReflectionClass($envelope->getMessage());
$attributes = [];

while (false !== $reflection && [] === $attributes) {
$attributes = $reflection->getAttributes(DisableMonitoringStamp::class);
$reflection = $reflection->getParentClass();
}

if ([] !== $attributes) {
if (false === $attributes[0]->newInstance()->onlyWhenNoHandler) {
return true;
}

return $this->hasNoHandlers($envelope);
}

return false;
}

private function createResults(Envelope $envelope, ?HandlerFailedException $exception = null): Results
{
$results = [];
Expand Down Expand Up @@ -157,9 +97,4 @@ private function createResults(Envelope $envelope, ?HandlerFailedException $exce

return new Results($results);
}

private function hasNoHandlers(Envelope $envelope): bool
{
return [] === $envelope->all(HandledStamp::class);
}
}
83 changes: 83 additions & 0 deletions src/EventListener/ReceiveMonitorStampListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php

/*
* This file is part of the zenstruck/messenger-monitor-bundle package.
*
* (c) Kevin Bond <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Zenstruck\Messenger\Monitor\EventListener;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
use Zenstruck\Messenger\Monitor\Stamp\DisableMonitoringStamp;
use Zenstruck\Messenger\Monitor\Stamp\MonitorStamp;
use Zenstruck\Messenger\Monitor\Stamp\TagStamp;

/**
* @author Kevin Bond <[email protected]>
*
* @internal
*/
final class ReceiveMonitorStampListener
{
/**
* @param class-string[] $excludedClasses
*/
public function __construct(private array $excludedClasses)
{
}

public function __invoke(WorkerMessageReceivedEvent $event): void
{
$envelope = $event->getEnvelope();

if ($this->isMonitoringDisabled($envelope)) {
return;
}

$stamp = $envelope->last(MonitorStamp::class);

if (\class_exists(ScheduledStamp::class) && $scheduledStamp = $envelope->last(ScheduledStamp::class)) {
// scheduler transport doesn't trigger SendMessageToTransportsEvent
$stamp = new MonitorStamp($scheduledStamp->messageContext->triggeredAt);

$event->addStamps(TagStamp::forSchedule($scheduledStamp));
}

if ($stamp instanceof MonitorStamp) {
$event->addStamps($stamp->markReceived($event->getReceiverName()));
}
}

private function isMonitoringDisabled(Envelope $envelope): bool
{
$messageClass = $envelope->getMessage()::class;

foreach ($this->excludedClasses as $excludedClass) {
if (\is_a($messageClass, $excludedClass, true)) {
return true;
}
}

if (!$stamp = DisableMonitoringStamp::firstFrom($envelope)) {
return false;
}

if ($stamp->onlyWhenNoHandler && !$this->hasNoHandlers($envelope)) {
return false;
}

return true;
}

private function hasNoHandlers(Envelope $envelope): bool
{
return [] === $envelope->all(HandledStamp::class);
}
}
4 changes: 2 additions & 2 deletions src/History/Model/ProcessedMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ abstract class ProcessedMessage
private ?string $failureType = null;
private ?string $failureMessage = null;

/** @var Structure[]|Results */
private array|Results $results;
/** @var Structure[]|Results|null */
private array|Results|null $results;

public function __construct(Envelope $envelope, Results $results, ?\Throwable $exception = null)
{
Expand Down
Loading

0 comments on commit 7104238

Please sign in to comment.