Skip to content

Commit

Permalink
Merge pull request #320 from formapro-forks/queue-interop
Browse files Browse the repository at this point in the history
Add queue interop driver
  • Loading branch information
sagikazarmark authored Sep 3, 2017
2 parents 0a53868 + c6c0a39 commit c1e6755
Show file tree
Hide file tree
Showing 5 changed files with 479 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Currently these are the supported backends, with more coming with each release:
* Doctrine DBAL
* Pheanstalk
* PhpAmqp / RabbitMQ
* Queue interop

You can learn more on our website about Bernard and its [related projects][website] or just dive directly into [the
documentation][documentation].
Expand Down
8 changes: 6 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
"phpspec/phpspec": "^2.4",
"phpunit/phpunit": "^5.5|^6.0",
"iron-io/iron_mq": "~4.0",
"league/container": "~2.3"
"league/container": "~2.3",
"queue-interop/queue-interop": "^0.6",
"queue-interop/amqp-interop": "^0.6"
},
"suggest": {
"php-amqplib/php-amqplib": "Allow sending messages to an AMQP server using php-amqplib",
Expand All @@ -34,7 +36,9 @@
"pda/pheanstalk": "Allow sending messages to Beanstalk using pheanstalk",
"predis/predis": "Allow sending messages to Redis using predis",
"aws/aws-sdk-php": "Allow sending messages to AWS services like Simple Queue Service",
"mongodb/mongodb": "Allow sending messages to a MongoDB server via PHP Driver"
"mongodb/mongodb": "Allow sending messages to a MongoDB server via PHP Driver",
"queue-interop/queue-interop": "Allow sending messages using queue interop compatible transports",
"queue-interop/amqp-interop": "Allow sending messages using amqp interop compatible transports"
},

"autoload" : {
Expand Down
26 changes: 26 additions & 0 deletions doc/drivers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Several different types of drivers are supported. Currently these are available:
* `Redis Extension`_
* `Predis`_
* `Amazon SQS`_
* `Queue Interop`_

Google AppEngine
----------------
Expand Down Expand Up @@ -475,3 +476,28 @@ require a HTTP request to amazon to be resolved.
$driver = new SqsDriver($connection, array(
'queue-name' => 'queue-url',
));
Queue Interop
-------------

This driver adds ability to use any `queue interop <https://github.com/queue-interop/queue-interop#implementations>`_ compatible transport.
For example we choose enqueue/fs one to demonstrate how it is working.

.. code-block:: json
{
"require" : {
"enqueue/fs" : "^0.7"
}
}
.. code-block:: php
<?php
use Bernard\Driver\InteropDriver;
use Enqueue\Fs\FsConnectionFactory;
$context = (new FsConnectionFactory('file://'.__DIR__.'/queues'))->createContext();
$driver = new InteropDriver($context);
150 changes: 150 additions & 0 deletions src/Driver/InteropDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
<?php
namespace Bernard\Driver;

use Bernard\Driver;
use Interop\Amqp\AmqpContext;
use Interop\Amqp\AmqpQueue;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrContext;

final class InteropDriver implements Driver
{
/**
* @var PsrContext
*/
private $context;

/**
* @var PsrConsumer[]
*/
private $consumers;

/**
* @param PsrContext $context
*/
public function __construct(PsrContext $context)
{
$this->context = $context;

$this->consumers = [];
}

/**
* {@inheritdoc}
*/
public function listQueues()
{
return [];
}

/**
* {@inheritdoc}
*/
public function createQueue($queueName)
{
if ($this->context instanceof AmqpContext) {
$this->context->declareQueue($this->createAmqpQueue($queueName));
}
}

/**
* {@inheritdoc}
*/
public function countMessages($queueName)
{
if ($this->context instanceof AmqpContext) {
return $this->context->declareQueue($this->createAmqpQueue($queueName));
}

return 0;
}

/**
* {@inheritdoc}
*/
public function pushMessage($queueName, $message)
{
$queue = $this->context->createQueue($queueName);
$message = $this->context->createMessage($message);

$this->context->createProducer()->send($queue, $message);
}

/**
* {@inheritdoc}
*/
public function popMessage($queueName, $duration = 5)
{
if ($message = $this->getQueueConsumer($queueName)->receive($duration * 1000)) {
return [$message->getBody(), $message];
}
}

/**
* {@inheritdoc}
*/
public function acknowledgeMessage($queueName, $receipt)
{
$this->getQueueConsumer($queueName)->acknowledge($receipt);
}

/**
* {@inheritdoc}
*/
public function peekQueue($queueName, $index = 0, $limit = 20)
{
return [];
}

/**
* {@inheritdoc}
*/
public function removeQueue($queueName)
{
if ($this->context instanceof AmqpContext) {
$queue = $this->createAmqpQueue($queueName);

$this->context->deleteQueue($queue);
}
}

/**
* {@inheritdoc}
*/
public function info()
{
return [];
}

/**
* @param string $queueName
*
* @return PsrConsumer
*/
private function getQueueConsumer($queueName)
{
if (false == array_key_exists($queueName, $this->consumers)) {
$queue = $this->context->createQueue($queueName);

$this->consumers[$queueName] = $this->context->createConsumer($queue);
}

return $this->consumers[$queueName];
}

/**
* @param string $queueName
*
* @return AmqpQueue
*/
private function createAmqpQueue($queueName)
{
/** @var AmqpContext $context */
$context = $this->context;

$queue = $context->createQueue($queueName);
$queue->addFlag(AmqpQueue::FLAG_DURABLE);

return $queue;
}
}
Loading

0 comments on commit c1e6755

Please sign in to comment.