Skip to content

Commit

Permalink
bug #7 Bugs in messenger-adapter (Anton Sadovnikov)
Browse files Browse the repository at this point in the history
This PR was squashed before being merged into the 1.0-dev branch (closes #7).

Discussion
----------

Bugs in messenger-adapter

#6

Hi, i tried to use messenger-adapter and found few bugs:

1. `QueueInteropTransport::send` dont implement `TransportInterface`:
`Compile Error: Declaration of Enqueue\\MessengerAdapter\\QueueInteropTransport::send($message): void must be compatible with Symfony\\Component\\Messenger\\Transport\\SenderInterface::send(Symfony\\Component\\Messenger\\Envelope $envelope)`
1. If options `deliveryDelay` is true, must be call `producer::setDelayStrategy` method

Commits
-------

fe49dc8 Bugs in messenger-adapter
  • Loading branch information
sroze committed Jun 6, 2018
2 parents 6c3d5b9 + fe49dc8 commit 8cb604f
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 42 deletions.
11 changes: 5 additions & 6 deletions Bundle/Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ services:
enqueue.messenger_transport.factory:
class: 'Enqueue\MessengerAdapter\QueueInteropTransportFactory'
arguments:
- "@messenger.transport.decoder"
- "@messenger.transport.encoder"
- "@service_container"
- "%kernel.debug%"
tags:
- 'messenger.transport_factory'
- '@messenger.transport.decoder'
- '@messenger.transport.encoder'
- '@service_container'
- '%kernel.debug%'
tags: ['messenger.transport_factory']
44 changes: 39 additions & 5 deletions QueueInteropTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@

namespace Enqueue\MessengerAdapter;

use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
use Enqueue\MessengerAdapter\Exception\RejectMessageException;
use Enqueue\MessengerAdapter\Exception\RequeueMessageException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Interop\Queue\Exception as InteropQueueException;
use Enqueue\MessengerAdapter\Exception\SendingMessageFailedException;
use Symfony\Component\OptionsResolver\Options;
use Symfony\Component\OptionsResolver\OptionsResolver;

/**
* Symfony Messenger transport.
Expand All @@ -39,10 +45,11 @@ public function __construct(DecoderInterface $decoder, EncoderInterface $encoder
$this->decoder = $decoder;
$this->encoder = $encoder;
$this->contextManager = $contextManager;
$this->options = $options;
$this->debug = $debug;

$this->receiveTimeout = 1000; // 1s
$resolver = new OptionsResolver();
$this->configureOptions($resolver);
$this->options = $resolver->resolve($options);
}

/**
Expand Down Expand Up @@ -93,7 +100,7 @@ public function receive(callable $handler): void
/**
* {@inheritdoc}
*/
public function send($message): void
public function send(Envelope $message): void
{
$psrContext = $this->contextManager->psrContext();
$destination = $this->getDestination();
Expand All @@ -114,6 +121,9 @@ public function send($message): void
$producer = $psrContext->createProducer();

if (isset($this->options['deliveryDelay'])) {
if ($producer instanceof DelayStrategyAware) {
$producer->setDelayStrategy($this->options['delayStrategy']);
}
$producer->setDeliveryDelay($this->options['deliveryDelay']);
}
if (isset($this->options['priority'])) {
Expand Down Expand Up @@ -145,11 +155,35 @@ public function stop(): void
$this->shouldStop = true;
}

public function configureOptions(OptionsResolver $resolver): void
{
$resolver->setDefaults(array(
'receiveTimeout' => null,
'deliveryDelay' => null,
'delayStrategy' => RabbitMqDelayPluginDelayStrategy::class,
'priority' => null,
'timeToLive' => null,
'topic' => array('name' => 'messages'),
'queue' => array('name' => 'messages'),
));

$resolver->setAllowedTypes('receiveTimeout', array('null', 'int'));
$resolver->setAllowedTypes('deliveryDelay', array('null', 'int'));
$resolver->setAllowedTypes('priority', array('null', 'int'));
$resolver->setAllowedTypes('timeToLive', array('null', 'int'));
$resolver->setAllowedTypes('delayStrategy', array('null', 'string'));

$resolver->setAllowedValues('delayStrategy', array(null, RabbitMqDelayPluginDelayStrategy::class, RabbitMqDlxDelayStrategy::class));
$resolver->setNormalizer('delayStrategy', function (Options $options, $value) {
return null !== $value ? new $value() : null;
});
}

private function getDestination(): array
{
return array(
'topic' => $this->options['topic']['name'] ?? 'messages',
'queue' => $this->options['queue']['name'] ?? 'messages',
'topic' => $this->options['topic']['name'],
'queue' => $this->options['queue']['name'],
);
}
}
23 changes: 14 additions & 9 deletions QueueInteropTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Enqueue\MessengerAdapter;

use Interop\Queue\PsrContext;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
Expand All @@ -29,7 +30,6 @@ class QueueInteropTransportFactory implements TransportFactoryInterface
private $decoder;
private $encoder;
private $debug;

private $container;

public function __construct(DecoderInterface $decoder, EncoderInterface $encoder, ContainerInterface $container, bool $debug = false)
Expand All @@ -54,7 +54,7 @@ public function createSender(string $dsn, array $options): SenderInterface

public function createTransport(string $dsn, array $options): TransportInterface
{
list($contextManager, $options) = $this->parseDsn($dsn);
[$contextManager, $options] = $this->parseDsn($dsn);

return new QueueInteropTransport(
$this->decoder,
Expand All @@ -70,15 +70,17 @@ public function supports(string $dsn, array $options): bool
return 0 === strpos($dsn, 'enqueue://');
}

private function parseDsn(string $dsn)
private function parseDsn(string $dsn): array
{
$parsedDsn = parse_url($dsn);
$enqueueContextName = $parsedDsn['host'];

$amqpOptions = array();
if (isset($parsedUrl['query'])) {
parse_str($parsedUrl['query'], $parsedQuery);

if (isset($parsedDsn['query'])) {
parse_str($parsedDsn['query'], $parsedQuery);
$parsedQuery = array_map(function ($e) {
return is_numeric($e) ? (int) $e : $e;
}, $parsedQuery);
$amqpOptions = array_replace_recursive($amqpOptions, $parsedQuery);
}

Expand All @@ -90,10 +92,13 @@ private function parseDsn(string $dsn)
));
}

$psrContext = $this->container->get($contextService);
if (!$psrContext instanceof PsrContext) {
throw new \RuntimeException(sprintf('Service "%s" not instanceof PsrContext', $contextService));
}

return array(
new AmqpContextManager(
$this->container->get($contextService)
),
new AmqpContextManager($psrContext),
$amqpOptions,
);
}
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ enqueue://default
?queue[name]=queue_name
&topic[name]=topic_name
&deliveryDelay=1800
&timeToLime=3600
&delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
&timeToLive=3600
&receiveTimeout=1000
&priority=1
```
17 changes: 9 additions & 8 deletions Tests/MessageBusProcessorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Enqueue\MessengerAdapter\Tests;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\MessageBusInterface;
Expand Down Expand Up @@ -39,6 +40,7 @@ public function testProcess()
{
$message = $this->getTestMessage();
$receivedMessage = new ReceivedMessage('test');
$envelope = new Envelope($receivedMessage);
$contextProphecy = $this->prophesize(PsrContext::class);
$busProphecy = $this->prophesize(MessageBusInterface::class);
$busProphecy->dispatch($receivedMessage)->shouldBeCalled();
Expand All @@ -47,8 +49,7 @@ public function testProcess()
'body' => 'body',
'headers' => array('header'),
'properties' => 'props',
))->shouldBeCalled()->willReturn($receivedMessage);

))->shouldBeCalled()->willReturn($envelope);
$messageBusProcessor = new MessageBusProcessor($busProphecy->reveal(), $decoderProphecy->reveal());
$this->assertSame(PsrProcessor::ACK, $messageBusProcessor->process($message, $contextProphecy->reveal()));
}
Expand All @@ -57,12 +58,12 @@ public function testProcessReject()
{
$message = $this->getTestMessage();
$receivedMessage = new ReceivedMessage('test');
$envelope = new Envelope($receivedMessage);
$contextProphecy = $this->prophesize(PsrContext::class);
$decoderProphecy = $this->prophesize(DecoderInterface::class);
$decoderProphecy->decode(Argument::any())->shouldBeCalled()->willReturn($receivedMessage);
$decoderProphecy->decode(Argument::any())->shouldBeCalled()->willReturn($envelope);
$busProphecy = $this->prophesize(MessageBusInterface::class);
$busProphecy->dispatch($receivedMessage)->shouldBeCalled()->willThrow(new RejectMessageException());

$messageBusProcessor = new MessageBusProcessor($busProphecy->reveal(), $decoderProphecy->reveal());
$this->assertSame(PsrProcessor::REJECT, $messageBusProcessor->process($message, $contextProphecy->reveal()));
}
Expand All @@ -71,12 +72,12 @@ public function testProcessRequeue()
{
$message = $this->getTestMessage();
$receivedMessage = new ReceivedMessage('test');
$envelope = new Envelope($receivedMessage);
$contextProphecy = $this->prophesize(PsrContext::class);
$decoderProphecy = $this->prophesize(DecoderInterface::class);
$decoderProphecy->decode(Argument::any())->shouldBeCalled()->willReturn($receivedMessage);
$decoderProphecy->decode(Argument::any())->shouldBeCalled()->willReturn($envelope);
$busProphecy = $this->prophesize(MessageBusInterface::class);
$busProphecy->dispatch($receivedMessage)->shouldBeCalled()->willThrow(new RequeueMessageException());

$messageBusProcessor = new MessageBusProcessor($busProphecy->reveal(), $decoderProphecy->reveal());
$this->assertSame(PsrProcessor::REQUEUE, $messageBusProcessor->process($message, $contextProphecy->reveal()));
}
Expand All @@ -85,12 +86,12 @@ public function testProcessRejectAnyException()
{
$message = $this->getTestMessage();
$receivedMessage = new ReceivedMessage('test');
$envelope = new Envelope($receivedMessage);
$contextProphecy = $this->prophesize(PsrContext::class);
$decoderProphecy = $this->prophesize(DecoderInterface::class);
$decoderProphecy->decode(Argument::any())->shouldBeCalled()->willReturn($receivedMessage);
$decoderProphecy->decode(Argument::any())->shouldBeCalled()->willReturn($envelope);
$busProphecy = $this->prophesize(MessageBusInterface::class);
$busProphecy->dispatch($receivedMessage)->shouldBeCalled()->willThrow(new \InvalidArgumentException());

$messageBusProcessor = new MessageBusProcessor($busProphecy->reveal(), $decoderProphecy->reveal());
$this->assertSame(PsrProcessor::REJECT, $messageBusProcessor->process($message, $contextProphecy->reveal()));
}
Expand Down
41 changes: 37 additions & 4 deletions Tests/QueueInteropTransportFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Enqueue\MessengerAdapter\Tests;

use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\MessengerAdapter\QueueInteropTransport;
use Enqueue\MessengerAdapter\QueueInteropTransportFactory;
use Symfony\Component\DependencyInjection\ContainerInterface;
Expand Down Expand Up @@ -51,16 +52,48 @@ public function testCreatesTransport()
$this->assertEquals($expectedTransport, $factory->createReceiver($dsn, array()));
}

public function testDnsParsing()
{
$queuePsrContext = $this->prophesize(PsrContext::class)->reveal();
$decoder = $this->prophesize(DecoderInterface::class);
$encoder = $this->prophesize(EncoderInterface::class);

$container = $this->prophesize(ContainerInterface::class);
$container->has('enqueue.transport.default.context')->willReturn(true);
$container->get('enqueue.transport.default.context')->willReturn($queuePsrContext);

$factory = $this->getFactory($decoder->reveal(), $encoder->reveal(), $container->reveal());
$dsn = 'enqueue://default?queue[name]=test&topic[name]=test&deliveryDelay=100&delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy&timeToLive=100&receiveTimeout=100&priority=100';

$expectedTransport = new QueueInteropTransport(
$decoder->reveal(),
$encoder->reveal(),
new AmqpContextManager($queuePsrContext),
array(
'topic' => array('name' => 'test'),
'queue' => array('name' => 'test'),
'deliveryDelay' => 100,
'delayStrategy' => RabbitMqDelayPluginDelayStrategy::class,
'priority' => 100,
'timeToLive' => 100,
'receiveTimeout' => 100,
),
true
);

$this->assertEquals($expectedTransport, $factory->createTransport($dsn, array()));

// Ensure BC for Symfony beta 4.1
$this->assertEquals($expectedTransport, $factory->createSender($dsn, array()));
$this->assertEquals($expectedTransport, $factory->createReceiver($dsn, array()));
}

/**
* @expectedException \RuntimeException
* @expectedExceptionMessage Can't find Enqueue's transport named "foo": Service "enqueue.transport.foo.context" is not found.
*/
public function testItThrowsAnExceptionWhenContextDoesNotExist()
{
$queuePsrContext = $this->prophesize(PsrContext::class);
$decoder = $this->prophesize(DecoderInterface::class);
$encoder = $this->prophesize(EncoderInterface::class);

$container = $this->prophesize(ContainerInterface::class);
$container->has('enqueue.transport.foo.context')->willReturn(false);

Expand Down
Loading

0 comments on commit 8cb604f

Please sign in to comment.