Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More Features #120

Merged
merged 6 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ or the [provided tools](#history).

#### Disable Monitoring

You may want to disable monitoring for certain messages. There are two ways to do this:
You may want to disable monitoring for certain messages. There are several ways to do this:

1. When dispatching the message, add the `DisableMonitoringStamp`:
```php
Expand All @@ -97,7 +97,7 @@ You may want to disable monitoring for certain messages. There are two ways to d

$bus->dispatch(new MyMessage(), [new DisableMonitoringStamp()])
```
2. Add the `DisableMonitoringStamp` as a class attribute to your message:
2. Add the `DisableMonitoringStamp` as a class attribute to your message (or parent class/interface):
```php
use Zenstruck\Messenger\Monitor\Stamp\DisableMonitoringStamp;

Expand All @@ -106,8 +106,8 @@ You may want to disable monitoring for certain messages. There are two ways to d
{
}
```
3. You may want to disable monitoring for messages that are dispatched without any handler.
You can do this by using the `DisableMonitoringStamp` with optional constructor argument `true`:
You may want to disable monitoring for messages that are dispatched without any handler.
You can do this by using the `DisableMonitoringStamp` with optional constructor argument `true`:
```php
use Zenstruck\Messenger\Monitor\Stamp\DisableMonitoringStamp;

Expand All @@ -116,6 +116,15 @@ You can do this by using the `DisableMonitoringStamp` with optional constructor
{
}
```
3. Add the message class to the `exclude` config option (can be abstract/interface):
```yaml
# config/packages/zenstruck_messenger_monitor.yaml

zenstruck_messenger_monitor:
storage:
exclude:
- App\Message\MyMessage
```

#### Description

Expand Down Expand Up @@ -145,7 +154,7 @@ add your own in one of two ways:

$bus->dispatch(new MyMessage(), [new TagStamp('tag-1'), new TagStamp('tag-2')])
```
2. Add the `TagStamp` as a class attribute to your message:
2. Add the `TagStamp` as a class attribute to your message (and parent class/interface):
```php
use Zenstruck\Messenger\Monitor\Stamp\TagStamp;

Expand All @@ -155,6 +164,8 @@ add your own in one of two ways:
{
}
```
> [!TIP]
> You can also add the `TagStamp` attribute to parent classes/interfaces.

#### `messenger:monitor:purge` Command

Expand Down Expand Up @@ -300,6 +311,8 @@ when@dev:
```yaml
zenstruck_messenger_monitor:
storage:
# Message classes to disable monitoring for (can be abstract/interface)
exclude: []
orm:

# Your Doctrine entity class that extends "Zenstruck\Messenger\Monitor\History\Model\ProcessedMessage"
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
"symfony/serializer": "^6.4|^7.0",
"symfony/var-dumper": "^6.4|^7.0",
"zenstruck/console-test": "^1.5",
"zenstruck/foundry": "^2.2"
"zenstruck/foundry": "^2.2",
"zenstruck/messenger-test": "^1.11"
},
"suggest": {
"knplabs/knp-time-bundle": "For human readable timestamps and durations on your dashboard.",
Expand Down
17 changes: 13 additions & 4 deletions config/storage_orm.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Zenstruck\Messenger\Monitor\Command\PurgeCommand;
use Zenstruck\Messenger\Monitor\Command\SchedulePurgeCommand;
use Zenstruck\Messenger\Monitor\History\HistoryListener;
use Zenstruck\Messenger\Monitor\EventListener\AddMonitorStampListener;
use Zenstruck\Messenger\Monitor\EventListener\HandleMonitorStampListener;
use Zenstruck\Messenger\Monitor\EventListener\ReceiveMonitorStampListener;
use Zenstruck\Messenger\Monitor\History\ResultNormalizer;
use Zenstruck\Messenger\Monitor\History\Storage;
use Zenstruck\Messenger\Monitor\History\Storage\ORMStorage;
Expand All @@ -29,13 +31,20 @@
->set('.zenstruck_messenger_monitor.history.result_normalizer', ResultNormalizer::class)
->args([param('kernel.project_dir')])

->set('.zenstruck_messenger_monitor.history.listener', HistoryListener::class)
->set('.zenstruck_messenger_monitor.listener.add_monitor_stamp', AddMonitorStampListener::class)
->tag('kernel.event_listener', ['method' => '__invoke', 'event' => SendMessageToTransportsEvent::class])

->set('.zenstruck_messenger_monitor.listener.receive_monitor_stamp', ReceiveMonitorStampListener::class)
->args([
abstract_arg('exclude_classes')
])
->tag('kernel.event_listener', ['method' => '__invoke', 'event' => WorkerMessageReceivedEvent::class])

->set('.zenstruck_messenger_monitor.listener.handle_monitor_stamp', HandleMonitorStampListener::class)
->args([
service('zenstruck_messenger_monitor.history.storage'),
service('.zenstruck_messenger_monitor.history.result_normalizer'),
])
->tag('kernel.event_listener', ['method' => 'addMonitorStamp', 'event' => SendMessageToTransportsEvent::class])
->tag('kernel.event_listener', ['method' => 'receiveMessage', 'event' => WorkerMessageReceivedEvent::class])
->tag('kernel.event_listener', ['method' => 'handleSuccess', 'event' => WorkerMessageHandledEvent::class])
->tag('kernel.event_listener', ['method' => 'handleFailure', 'event' => WorkerMessageFailedEvent::class])

Expand Down
16 changes: 15 additions & 1 deletion src/DependencyInjection/ZenstruckMessengerMonitorExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ public function getConfigTreeBuilder(): TreeBuilder
->children()
->arrayNode('storage')
->children()
->arrayNode('exclude')
->info('Message classes to disable monitoring for (can be abstract/interface)')
->scalarPrototype()
->validate()
->ifTrue(fn($v) => !\class_exists($v) && !\interface_exists($v))
->thenInvalid('Class/interface does not exist.')
->end()
->end()
->end()
->arrayNode('orm')
->children()
->scalarNode('entity_class')
Expand Down Expand Up @@ -91,7 +100,12 @@ protected function loadInternal(array $mergedConfig, ContainerBuilder $container

if ($entity = $mergedConfig['storage']['orm']['entity_class'] ?? null) {
$loader->load('storage_orm.php');
$container->getDefinition('zenstruck_messenger_monitor.history.storage')->setArgument(1, $entity);
$container->getDefinition('zenstruck_messenger_monitor.history.storage')
->setArgument(1, $entity)
;
$container->getDefinition('.zenstruck_messenger_monitor.listener.receive_monitor_stamp')
->setArgument(0, $mergedConfig['storage']['exclude'])
;

if (!\class_exists(Schedule::class)) {
$container->removeDefinition('.zenstruck_messenger_monitor.command.schedule_purge');
Expand Down
28 changes: 28 additions & 0 deletions src/EventListener/AddMonitorStampListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

/*
* This file is part of the zenstruck/messenger-monitor-bundle package.
*
* (c) Kevin Bond <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Zenstruck\Messenger\Monitor\EventListener;

use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Zenstruck\Messenger\Monitor\Stamp\MonitorStamp;

/**
* @author Kevin Bond <[email protected]>
*
* @internal
*/
final class AddMonitorStampListener
{
public function __invoke(SendMessageToTransportsEvent $event): void
{
$event->setEnvelope($event->getEnvelope()->with(new MonitorStamp()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,60 +9,29 @@
* file that was distributed with this source code.
*/

namespace Zenstruck\Messenger\Monitor\History;
namespace Zenstruck\Messenger\Monitor\EventListener;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
use Zenstruck\Messenger\Monitor\History\Model\Result;
use Zenstruck\Messenger\Monitor\History\Model\Results;
use Zenstruck\Messenger\Monitor\Stamp\DisableMonitoringStamp;
use Zenstruck\Messenger\Monitor\History\ResultNormalizer;
use Zenstruck\Messenger\Monitor\History\Storage;
use Zenstruck\Messenger\Monitor\Stamp\MonitorStamp;
use Zenstruck\Messenger\Monitor\Stamp\TagStamp;

/**
* @author Kevin Bond <[email protected]>
*
* @internal
*
* @phpstan-import-type Structure from Result
*/
final class HistoryListener
final class HandleMonitorStampListener
{
public function __construct(private Storage $storage, private ResultNormalizer $normalizer)
{
}

public function addMonitorStamp(SendMessageToTransportsEvent $event): void
{
$event->setEnvelope($event->getEnvelope()->with(new MonitorStamp()));
}

public function receiveMessage(WorkerMessageReceivedEvent $event): void
{
$envelope = $event->getEnvelope();

if ($this->isMonitoringDisabled($envelope)) {
return;
}

$stamp = $envelope->last(MonitorStamp::class);

if (\class_exists(ScheduledStamp::class) && $scheduledStamp = $envelope->last(ScheduledStamp::class)) {
// scheduler transport doesn't trigger SendMessageToTransportsEvent
$stamp = new MonitorStamp($scheduledStamp->messageContext->triggeredAt);

$event->addStamps(TagStamp::forSchedule($scheduledStamp));
}

if ($stamp instanceof MonitorStamp) {
$event->addStamps($stamp->markReceived($event->getReceiverName()));
}
public function __construct(
private Storage $storage,
private ResultNormalizer $normalizer,
) {
}

public function handleSuccess(WorkerMessageHandledEvent $event): void
Expand Down Expand Up @@ -101,35 +70,6 @@ public function handleFailure(WorkerMessageFailedEvent $event): void
);
}

private function isMonitoringDisabled(Envelope $envelope): bool
{
if ($stamp = $envelope->last(DisableMonitoringStamp::class)) {
if (false === $stamp->onlyWhenNoHandler) {
return true;
}

return $this->hasNoHandlers($envelope);
}

$reflection = new \ReflectionClass($envelope->getMessage());
$attributes = [];

while (false !== $reflection && [] === $attributes) {
$attributes = $reflection->getAttributes(DisableMonitoringStamp::class);
$reflection = $reflection->getParentClass();
}

if ([] !== $attributes) {
if (false === $attributes[0]->newInstance()->onlyWhenNoHandler) {
return true;
}

return $this->hasNoHandlers($envelope);
}

return false;
}

private function createResults(Envelope $envelope, ?HandlerFailedException $exception = null): Results
{
$results = [];
Expand Down Expand Up @@ -157,9 +97,4 @@ private function createResults(Envelope $envelope, ?HandlerFailedException $exce

return new Results($results);
}

private function hasNoHandlers(Envelope $envelope): bool
{
return [] === $envelope->all(HandledStamp::class);
}
}
83 changes: 83 additions & 0 deletions src/EventListener/ReceiveMonitorStampListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php

/*
* This file is part of the zenstruck/messenger-monitor-bundle package.
*
* (c) Kevin Bond <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Zenstruck\Messenger\Monitor\EventListener;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
use Zenstruck\Messenger\Monitor\Stamp\DisableMonitoringStamp;
use Zenstruck\Messenger\Monitor\Stamp\MonitorStamp;
use Zenstruck\Messenger\Monitor\Stamp\TagStamp;

/**
* @author Kevin Bond <[email protected]>
*
* @internal
*/
final class ReceiveMonitorStampListener
{
/**
* @param class-string[] $excludedClasses
*/
public function __construct(private array $excludedClasses)
{
}

public function __invoke(WorkerMessageReceivedEvent $event): void
{
$envelope = $event->getEnvelope();

if ($this->isMonitoringDisabled($envelope)) {
return;
}

$stamp = $envelope->last(MonitorStamp::class);

if (\class_exists(ScheduledStamp::class) && $scheduledStamp = $envelope->last(ScheduledStamp::class)) {
// scheduler transport doesn't trigger SendMessageToTransportsEvent
$stamp = new MonitorStamp($scheduledStamp->messageContext->triggeredAt);

$event->addStamps(TagStamp::forSchedule($scheduledStamp));
}

if ($stamp instanceof MonitorStamp) {
$event->addStamps($stamp->markReceived($event->getReceiverName()));
}
}

private function isMonitoringDisabled(Envelope $envelope): bool
{
$messageClass = $envelope->getMessage()::class;

foreach ($this->excludedClasses as $excludedClass) {
if (\is_a($messageClass, $excludedClass, true)) {
return true;
}
}

if (!$stamp = DisableMonitoringStamp::firstFrom($envelope)) {
return false;
}

if ($stamp->onlyWhenNoHandler && !$this->hasNoHandlers($envelope)) {
return false;
}

return true;
}

private function hasNoHandlers(Envelope $envelope): bool
{
return [] === $envelope->all(HandledStamp::class);
}
}
4 changes: 2 additions & 2 deletions src/History/Model/ProcessedMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ abstract class ProcessedMessage
private ?string $failureType = null;
private ?string $failureMessage = null;

/** @var Structure[]|Results */
private array|Results $results;
/** @var Structure[]|Results|null */
private array|Results|null $results;

public function __construct(Envelope $envelope, Results $results, ?\Throwable $exception = null)
{
Expand Down
Loading
Loading