Skip to content

Commit

Permalink
Merge pull request #23 from PcComponentes/feature/allow_set_up_aggreg…
Browse files Browse the repository at this point in the history
…ate_message_occurred_on_format

feat: allow set up aggregate message occurred on format
  • Loading branch information
calmohallag authored Sep 20, 2022
2 parents 9e68c73 + 3ea5e4c commit 178ae62
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 27 deletions.
7 changes: 4 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
},
"require": {
"ext-amqp": "*",
"php": "^7.4 || ^8.0",
"php": "^8.0",
"pccomponentes/ddd-logging": "^2.1",
"pccomponentes/ddd": "^2.0 || 3.0 || 3.1",
"pccomponentes/ddd": "^4.0",
"beberlei/assert": "^3.2",
"ramsey/uuid": "^4.0"
"ramsey/uuid": "^4.0",
"symfony/framework-bundle": "^4.4|^5.0"
},
"require-dev": {
"pccomponentes/coding-standard": "^1.0",
Expand Down
34 changes: 34 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php
declare(strict_types=1);

namespace PcComponentes\SymfonyMessengerBundle\DependencyInjection;

use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;

final class Configuration implements ConfigurationInterface
{
public function getConfigTreeBuilder(): TreeBuilder
{
$treeBuilder = new TreeBuilder('symfony_messenger');
$treeBuilder->getRootNode()
->children()
->arrayNode('aggregate_message')
->children()
->arrayNode('occurred_on')
->addDefaultsIfNotSet()
->children()
->enumNode('format')
->values(['U', 'U.v', 'U.u'])
->defaultValue('U')
->end()
->end()
->end()
->end()
->end()
->end()
->end();

return $treeBuilder;
}
}
11 changes: 0 additions & 11 deletions src/DependencyInjection/SerializerCompilerPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

namespace PcComponentes\SymfonyMessengerBundle\DependencyInjection;

use PcComponentes\Ddd\Util\Message\Serialization\JsonApi\AggregateMessageJsonApiSerializable;
use PcComponentes\Ddd\Util\Message\Serialization\JsonApi\AggregateMessageStreamDeserializer;
use PcComponentes\Ddd\Util\Message\Serialization\JsonApi\SimpleMessageJsonApiSerializable;
use PcComponentes\Ddd\Util\Message\Serialization\JsonApi\SimpleMessageStreamDeserializer;
use PcComponentes\DddLogging\DomainTrace\Tracker;
Expand All @@ -30,15 +28,6 @@ public function process(ContainerBuilder $container): void
new Reference('pccom.messenger_bundle.aggregate_message.serializer.stream_deserializer'),
],
),
'pccom.messenger_bundle.aggregate_message.serializer.json_api_serializer' => new Definition(
AggregateMessageJsonApiSerializable::class,
),
'pccom.messenger_bundle.aggregate_message.serializer.stream_deserializer' => new Definition(
AggregateMessageStreamDeserializer::class,
[
new Reference('pccom.messenger_bundle.mapping_registry.aggregate_message'),
],
),
]);

$container->addDefinitions([
Expand Down
38 changes: 38 additions & 0 deletions src/DependencyInjection/SymfonyMessengerExtension.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php
declare(strict_types=1);

namespace PcComponentes\SymfonyMessengerBundle\DependencyInjection;

use PcComponentes\Ddd\Util\Message\Serialization\JsonApi\AggregateMessageJsonApiSerializable;
use PcComponentes\Ddd\Util\Message\Serialization\JsonApi\AggregateMessageStreamDeserializer;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Extension\Extension;
use Symfony\Component\DependencyInjection\Reference;

final class SymfonyMessengerExtension extends Extension
{
public function load(array $configs, ContainerBuilder $container)
{
$configuration = new Configuration();
$config = $this->processConfiguration($configuration, $configs);

$occurredOnFormat = $config['aggregate_message']['occurred_on']['format'];

$container->addDefinitions([
'pccom.messenger_bundle.aggregate_message.serializer.json_api_serializer' => new Definition(
AggregateMessageJsonApiSerializable::class,
[
$occurredOnFormat,
],
),
'pccom.messenger_bundle.aggregate_message.serializer.stream_deserializer' => new Definition(
AggregateMessageStreamDeserializer::class,
[
new Reference('pccom.messenger_bundle.mapping_registry.aggregate_message'),
$occurredOnFormat,
],
),
]);
}
}
1 change: 1 addition & 0 deletions src/Middleware/SimpleMessagePublisherMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope

foreach ($commandsResult as $commands) {
if (null === $commands) {

continue;
}

Expand Down
10 changes: 7 additions & 3 deletions src/Serializer/AggregateMessageSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ public function decode(array $encodedEnvelope): Envelope
} catch (MessageClassNotFoundException $exception) {
throw new MessageDecodingFailedException('Message class not found', 0, $exception);
} catch (Throwable $exception) {
throw new MessageDecodingFailedException('Unable to instantiate class for message. ' . $exception->getMessage(), 0, $exception);
throw new MessageDecodingFailedException(
'Unable to instantiate class for message. ' . $exception->getMessage(),
0,
$exception,
);
}

$this->obtainDomainTrace($aggregateMessage, $encodedEnvelope);
Expand Down Expand Up @@ -84,7 +88,7 @@ private function streamFromEncodedEnvelope(array $encodedEnvelope): AggregateMes
return new AggregateMessageStream(
$aggregateMessage['message_id'],
$aggregateMessage['attributes']['aggregate_id'],
(int) $aggregateMessage['occurred_on'],
(float) $aggregateMessage['occurred_on'],
$aggregateMessage['type'],
self::AGGREGATE_VERSION,
\json_encode($aggregateMessage['attributes']),
Expand Down Expand Up @@ -126,7 +130,7 @@ private function streamFromLegacyEncodedEnvelope(array $encodedEnvelope): Aggreg
return new AggregateMessageStream(
$aggregateMessage['message_id'],
$aggregateMessage['aggregate_id'],
$occurredOn->getTimestamp(),
(float) $occurredOn->getTimestamp(),
$aggregateMessage['name'],
self::AGGREGATE_VERSION,
\json_encode($aggregateMessage['payload']),
Expand Down
7 changes: 5 additions & 2 deletions src/Serializer/DomainSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ protected function buildRetryCountFromDeathHeader(array $encodedEnvelope): int
}

$retryCount = 0;

foreach ($encodedEnvelope['headers']['x-death'] as $death) {
if (false === \array_key_exists('count', $death)) {
continue;
}

$retryCount = $retryCount + (int) $death['count'];
$retryCount += (int) $death['count'];
}

return $retryCount;
Expand All @@ -83,7 +84,9 @@ protected function extractEnvelopeRetryCount(Envelope $envelope): int
{
$retryCountStamp = $envelope->last(RedeliveryStamp::class);

return null !== $retryCountStamp ? $retryCountStamp->getRetryCount() : 0;
return null !== $retryCountStamp
? $retryCountStamp->getRetryCount()
: 0;
}

private function getCorrelationId(array $encodedEnvelope): string
Expand Down
6 changes: 5 additions & 1 deletion src/Serializer/SimpleMessageSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ public function decode(array $encodedEnvelope): Envelope
} catch (MessageClassNotFoundException $exception) {
throw new MessageDecodingFailedException('Message class not found', 0, $exception);
} catch (Throwable $exception) {
throw new MessageDecodingFailedException('Unable to instantiate class for message. ' . $exception->getMessage(), 0, $exception);
throw new MessageDecodingFailedException(
'Unable to instantiate class for message. ' . $exception->getMessage(),
0,
$exception,
);
}

$this->obtainDomainTrace($simpleMessage, $encodedEnvelope);
Expand Down
43 changes: 36 additions & 7 deletions tests/Middleware/SimpleMessagePublisherMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use PcComponentes\Ddd\Domain\Model\ValueObject\DateTimeValueObject;
use PcComponentes\Ddd\Domain\Model\ValueObject\Uuid;
use PcComponentes\Ddd\Util\Message\ValueObject\AggregateId;
use PcComponentes\SymfonyMessengerBundle\Bus\AllHandledStampExtractor;
use PcComponentes\SymfonyMessengerBundle\Middleware\SimpleMessagePublisherMiddleware;
use PcComponentes\SymfonyMessengerBundle\Tests\Mock\CommandMock;
Expand Down Expand Up @@ -33,7 +34,14 @@ public function test_given_converters_dispatch_only_one_command_per_converter_th
$firstMessageId = Uuid::v4();
$secondMessageId = Uuid::v4();

$envelope = new Envelope(EventMock::fromPayload(Uuid::v4(), Uuid::v4(), DateTimeValueObject::now(), []));
$envelope = new Envelope(
EventMock::fromPayload(
Uuid::v4(),
AggregateId::from(Uuid::v4()->value()),
DateTimeValueObject::now(),
[],
),
);
$envelope = $envelope->with(
new HandledStamp(CommandMock::fromPayload($firstMessageId, []), 'ConverterA'),
new HandledStamp(CommandMock::fromPayload($secondMessageId, []), 'ConverterB'),
Expand All @@ -51,7 +59,14 @@ public function test_given_converters_dispatch_one_command_and_the_other_does_no
{
$firstMessageId = Uuid::v4();

$envelope = new Envelope(EventMock::fromPayload(Uuid::v4(), Uuid::v4(), DateTimeValueObject::now(), []));
$envelope = new Envelope(
EventMock::fromPayload(
Uuid::v4(),
AggregateId::from(Uuid::v4()->value()),
DateTimeValueObject::now(),
[],
),
);
$envelope = $envelope->with(
new HandledStamp(CommandMock::fromPayload($firstMessageId, []), 'ConverterA'),
new HandledStamp(null, 'ConverterB'),
Expand All @@ -70,13 +85,20 @@ public function test_given_converters_dispatch_both_one_command_and_an_array_of_
$secondMessageId = Uuid::v4();
$thirdMessageId = Uuid::v4();

$envelope = new Envelope(EventMock::fromPayload(Uuid::v4(), Uuid::v4(), DateTimeValueObject::now(), []));
$envelope = new Envelope(
EventMock::fromPayload(
Uuid::v4(),
AggregateId::from(Uuid::v4()->value()),
DateTimeValueObject::now(),
[],
),
);
$envelope = $envelope->with(
new HandledStamp(CommandMock::fromPayload($firstMessageId, []), 'ConverterA'),
new HandledStamp([
CommandMock::fromPayload($secondMessageId, []),
CommandMock::fromPayload($thirdMessageId, []),
],
CommandMock::fromPayload($secondMessageId, []),
CommandMock::fromPayload($thirdMessageId, []),
],
'ConverterB'
),
);
Expand All @@ -96,7 +118,14 @@ public function test_given_converters_dispatch_both_one_command_and_an_array_of_
$secondMessageId = Uuid::v4();
$thirdMessageId = Uuid::v4();

$envelope = new Envelope(EventMock::fromPayload(Uuid::v4(), Uuid::v4(), DateTimeValueObject::now(), []));
$envelope = new Envelope(
EventMock::fromPayload(
Uuid::v4(),
AggregateId::from(Uuid::v4()->value()),
DateTimeValueObject::now(),
[],
),
);
$envelope = $envelope->with(
new HandledStamp(CommandMock::fromPayload($firstMessageId, []), 'ConverterA'),
new HandledStamp([
Expand Down

0 comments on commit 178ae62

Please sign in to comment.