diff --git a/README.md b/README.md index dc3788a8..3cf7a9b5 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Currently these are the supported backends, with more coming with each release: * Doctrine DBAL * Pheanstalk * PhpAmqp / RabbitMQ + * Queue interop You can learn more on our website about Bernard and its [related projects][website] or just dive directly into [the documentation][documentation]. diff --git a/composer.json b/composer.json index 4b828ddd..f6f9904b 100644 --- a/composer.json +++ b/composer.json @@ -25,7 +25,9 @@ "phpspec/phpspec": "^2.4", "phpunit/phpunit": "^5.5|^6.0", "iron-io/iron_mq": "~4.0", - "league/container": "~2.3" + "league/container": "~2.3", + "queue-interop/queue-interop": "^0.6", + "queue-interop/amqp-interop": "^0.6" }, "suggest": { "php-amqplib/php-amqplib": "Allow sending messages to an AMQP server using php-amqplib", @@ -34,7 +36,9 @@ "pda/pheanstalk": "Allow sending messages to Beanstalk using pheanstalk", "predis/predis": "Allow sending messages to Redis using predis", "aws/aws-sdk-php": "Allow sending messages to AWS services like Simple Queue Service", - "mongodb/mongodb": "Allow sending messages to a MongoDB server via PHP Driver" + "mongodb/mongodb": "Allow sending messages to a MongoDB server via PHP Driver", + "queue-interop/queue-interop": "Allow sending messages using queue interop compatible transports", + "queue-interop/amqp-interop": "Allow sending messages using amqp interop compatible transports" }, "autoload" : { diff --git a/doc/drivers.rst b/doc/drivers.rst index eda84129..23e15a77 100644 --- a/doc/drivers.rst +++ b/doc/drivers.rst @@ -13,6 +13,7 @@ Several different types of drivers are supported. Currently these are available: * `Redis Extension`_ * `Predis`_ * `Amazon SQS`_ +* `Queue Interop`_ Google AppEngine ---------------- @@ -475,3 +476,28 @@ require a HTTP request to amazon to be resolved. $driver = new SqsDriver($connection, array( 'queue-name' => 'queue-url', )); + +Queue Interop +------------- + +This driver adds ability to use any `queue interop `_ compatible transport. +For example we choose enqueue/fs one to demonstrate how it is working. + +.. code-block:: json + + { + "require" : { + "enqueue/fs" : "^0.7" + } + } + +.. code-block:: php + + createContext(); + + $driver = new InteropDriver($context); diff --git a/src/Driver/InteropDriver.php b/src/Driver/InteropDriver.php new file mode 100644 index 00000000..4b087d78 --- /dev/null +++ b/src/Driver/InteropDriver.php @@ -0,0 +1,150 @@ +context = $context; + + $this->consumers = []; + } + + /** + * {@inheritdoc} + */ + public function listQueues() + { + return []; + } + + /** + * {@inheritdoc} + */ + public function createQueue($queueName) + { + if ($this->context instanceof AmqpContext) { + $this->context->declareQueue($this->createAmqpQueue($queueName)); + } + } + + /** + * {@inheritdoc} + */ + public function countMessages($queueName) + { + if ($this->context instanceof AmqpContext) { + return $this->context->declareQueue($this->createAmqpQueue($queueName)); + } + + return 0; + } + + /** + * {@inheritdoc} + */ + public function pushMessage($queueName, $message) + { + $queue = $this->context->createQueue($queueName); + $message = $this->context->createMessage($message); + + $this->context->createProducer()->send($queue, $message); + } + + /** + * {@inheritdoc} + */ + public function popMessage($queueName, $duration = 5) + { + if ($message = $this->getQueueConsumer($queueName)->receive($duration * 1000)) { + return [$message->getBody(), $message]; + } + } + + /** + * {@inheritdoc} + */ + public function acknowledgeMessage($queueName, $receipt) + { + $this->getQueueConsumer($queueName)->acknowledge($receipt); + } + + /** + * {@inheritdoc} + */ + public function peekQueue($queueName, $index = 0, $limit = 20) + { + return []; + } + + /** + * {@inheritdoc} + */ + public function removeQueue($queueName) + { + if ($this->context instanceof AmqpContext) { + $queue = $this->createAmqpQueue($queueName); + + $this->context->deleteQueue($queue); + } + } + + /** + * {@inheritdoc} + */ + public function info() + { + return []; + } + + /** + * @param string $queueName + * + * @return PsrConsumer + */ + private function getQueueConsumer($queueName) + { + if (false == array_key_exists($queueName, $this->consumers)) { + $queue = $this->context->createQueue($queueName); + + $this->consumers[$queueName] = $this->context->createConsumer($queue); + } + + return $this->consumers[$queueName]; + } + + /** + * @param string $queueName + * + * @return AmqpQueue + */ + private function createAmqpQueue($queueName) + { + /** @var AmqpContext $context */ + $context = $this->context; + + $queue = $context->createQueue($queueName); + $queue->addFlag(AmqpQueue::FLAG_DURABLE); + + return $queue; + } +} diff --git a/tests/Driver/InteropDriverTest.php b/tests/Driver/InteropDriverTest.php new file mode 100644 index 00000000..0755b919 --- /dev/null +++ b/tests/Driver/InteropDriverTest.php @@ -0,0 +1,296 @@ +assertInstanceOf('Bernard\Driver', new InteropDriver($this->createInteropContextMock())); + } + + public function testListQueuesMethodDoesNothingAndAlwaysReturnEmptyArray() + { + $driver = new InteropDriver($this->createInteropContextMock()); + + $this->assertSame([], $driver->listQueues()); + } + + public function testCreateQueueMethodDoesNothingAndAlwaysReturnNull() + { + $driver = new InteropDriver($this->createInteropContextMock()); + + $this->assertNull($driver->createQueue('aQueueName')); + } + + public function testPushMessageMethodPublishMessageToQueueUsingInteropProducer() + { + $queue = $this->createMock(PsrQueue::class); + $message = $this->createMock(PsrMessage::class); + + $producer = $this->createMock(PsrProducer::class); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($queue), $this->identicalTo($message)) + ; + + $context = $this->createInteropContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theQueueName') + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->with('theBody') + ->willReturn($message) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + + $driver = new InteropDriver($context); + + $driver->pushMessage('theQueueName', 'theBody'); + } + + public function testPopMessageReturnNullIfInteropConsumerReturnNothingOnReceive() + { + $queue = $this->createMock(PsrQueue::class); + + $consumer = $this->createMock(PsrConsumer::class); + $consumer + ->expects($this->once()) + ->method('receive') + ->with(5000) + ->willReturn(null) + ; + + $context = $this->createInteropContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theQueueName') + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($queue)) + ->willReturn($consumer) + ; + + $driver = new InteropDriver($context); + + $this->assertNull($driver->popMessage('theQueueName')); + } + + public function testPopMessageReturnArrayWithBodyAndInteropMessage() + { + $queue = $this->createMock(PsrQueue::class); + $message = $this->createMock(PsrMessage::class); + $message + ->expects($this->once()) + ->method('getBody') + ->willReturn('theBody') + ; + + $consumer = $this->createMock(PsrConsumer::class); + $consumer + ->expects($this->once()) + ->method('receive') + ->with(6789) + ->willReturn($message) + ; + + $context = $this->createInteropContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theQueueName') + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($queue)) + ->willReturn($consumer) + ; + + $driver = new InteropDriver($context); + + $this->assertSame( + ['theBody', $message], + $driver->popMessage('theQueueName', 6.789) + ); + } + + public function testAcknowledgeMessage() + { + $queue = $this->createMock(PsrQueue::class); + $message = $this->createMock(PsrMessage::class); + + $consumer = $this->createMock(PsrConsumer::class); + $consumer + ->expects($this->once()) + ->method('receive') + ->willReturn($message) + ; + $consumer + ->expects($this->once()) + ->method('acknowledge') + ->with($this->identicalTo($message)) + ; + + $context = $this->createInteropContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theQueueName') + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($queue)) + ->willReturn($consumer) + ; + + $driver = new InteropDriver($context); + + $result = $driver->popMessage('theQueueName'); + + //guard + $this->assertSame($message, $result[1]); + + $driver->acknowledgeMessage('theQueueName', $result[1]); + } + + public function testPeekQueueMethodDoesNothingAndAlwaysReturnEmptyArray() + { + $driver = new InteropDriver($this->createInteropContextMock()); + + $this->assertSame([], $driver->peekQueue('aQueueName')); + } + + public function testRemoveQueueMethodDoesNothingAndAlwaysReturnNull() + { + $driver = new InteropDriver($this->createInteropContextMock()); + + $this->assertNull($driver->removeQueue('aQueueName')); + } + + public function testInfoMethodDoesNothingAndAlwaysReturnEmptyArray() + { + $driver = new InteropDriver($this->createInteropContextMock()); + + $this->assertNull($driver->removeQueue('aQueueName')); + } + + public function testCreateQueueMethodShouldDeclareAmqpQueue() + { + $queue = $this->createMock(AmqpQueue::class); + $queue + ->expects($this->once()) + ->method('addFlag') + ->with(AmqpQueue::FLAG_DURABLE) + ; + + $context = $this->createAmqpInteropContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theQueueName') + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('declareQueue') + ->with($this->identicalTo($queue)) + ; + + $driver = new InteropDriver($context); + + $this->assertNull($driver->createQueue('theQueueName')); + } + + public function testDeleteQueueMethodShouldCallDeleteQueueMethodOnAmqpContext() + { + $queue = $this->createMock(AmqpQueue::class); + $queue + ->expects($this->once()) + ->method('addFlag') + ->with(AmqpQueue::FLAG_DURABLE) + ; + + $context = $this->createAmqpInteropContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theQueueName') + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('deleteQueue') + ->with($this->identicalTo($queue)) + ; + + $driver = new InteropDriver($context); + + $this->assertNull($driver->removeQueue('theQueueName')); + } + + public function testCountMessagesMethodShouldUseCountFromAmqpDeclareQueueResult() + { + $queue = $this->createMock(AmqpQueue::class); + + $context = $this->createAmqpInteropContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theQueueName') + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('declareQueue') + ->with($this->identicalTo($queue)) + ->willReturn(123) + ; + + $driver = new InteropDriver($context); + + $this->assertSame(123, $driver->countMessages('theQueueName')); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext + */ + private function createAmqpInteropContextMock() + { + return $this->createMock(AmqpContext::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrContext + */ + private function createInteropContextMock() + { + return $this->createMock(PsrContext::class); + } +}