Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add queue interop driver #320

Merged
merged 3 commits into from
Sep 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Currently these are the supported backends, with more coming with each release:
* Doctrine DBAL
* Pheanstalk
* PhpAmqp / RabbitMQ
* Queue interop

You can learn more on our website about Bernard and its [related projects][website] or just dive directly into [the
documentation][documentation].
Expand Down
8 changes: 6 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
"phpspec/phpspec": "^2.4",
"phpunit/phpunit": "^5.5|^6.0",
"iron-io/iron_mq": "~4.0",
"league/container": "~2.3"
"league/container": "~2.3",
"queue-interop/queue-interop": "^0.6",
"queue-interop/amqp-interop": "^0.6"
},
"suggest": {
"php-amqplib/php-amqplib": "Allow sending messages to an AMQP server using php-amqplib",
Expand All @@ -34,7 +36,9 @@
"pda/pheanstalk": "Allow sending messages to Beanstalk using pheanstalk",
"predis/predis": "Allow sending messages to Redis using predis",
"aws/aws-sdk-php": "Allow sending messages to AWS services like Simple Queue Service",
"mongodb/mongodb": "Allow sending messages to a MongoDB server via PHP Driver"
"mongodb/mongodb": "Allow sending messages to a MongoDB server via PHP Driver",
"queue-interop/queue-interop": "Allow sending messages using queue interop compatible transports",
"queue-interop/amqp-interop": "Allow sending messages using amqp interop compatible transports"
},

"autoload" : {
Expand Down
26 changes: 26 additions & 0 deletions doc/drivers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Several different types of drivers are supported. Currently these are available:
* `Redis Extension`_
* `Predis`_
* `Amazon SQS`_
* `Queue Interop`_

Google AppEngine
----------------
Expand Down Expand Up @@ -475,3 +476,28 @@ require a HTTP request to amazon to be resolved.
$driver = new SqsDriver($connection, array(
'queue-name' => 'queue-url',
));

Queue Interop
-------------

This driver adds ability to use any `queue interop <https://github.com/queue-interop/queue-interop#implementations>`_ compatible transport.
For example we choose enqueue/fs one to demonstrate how it is working.

.. code-block:: json

{
"require" : {
"enqueue/fs" : "^0.7"
}
}

.. code-block:: php

<?php

use Bernard\Driver\InteropDriver;
use Enqueue\Fs\FsConnectionFactory;

$context = (new FsConnectionFactory('file://'.__DIR__.'/queues'))->createContext();

$driver = new InteropDriver($context);
150 changes: 150 additions & 0 deletions src/Driver/InteropDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
<?php
namespace Bernard\Driver;

use Bernard\Driver;
use Interop\Amqp\AmqpContext;
use Interop\Amqp\AmqpQueue;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrContext;

final class InteropDriver implements Driver
{
/**
* @var PsrContext
*/
private $context;

/**
* @var PsrConsumer[]
*/
private $consumers;

/**
* @param PsrContext $context
*/
public function __construct(PsrContext $context)
{
$this->context = $context;

$this->consumers = [];
}

/**
* {@inheritdoc}
*/
public function listQueues()
{
return [];
}

/**
* {@inheritdoc}
*/
public function createQueue($queueName)
{
if ($this->context instanceof AmqpContext) {
$this->context->declareQueue($this->createAmqpQueue($queueName));
}
}

/**
* {@inheritdoc}
*/
public function countMessages($queueName)
{
if ($this->context instanceof AmqpContext) {
return $this->context->declareQueue($this->createAmqpQueue($queueName));
}

return 0;
}

/**
* {@inheritdoc}
*/
public function pushMessage($queueName, $message)
{
$queue = $this->context->createQueue($queueName);
$message = $this->context->createMessage($message);

$this->context->createProducer()->send($queue, $message);
}

/**
* {@inheritdoc}
*/
public function popMessage($queueName, $duration = 5)
{
if ($message = $this->getQueueConsumer($queueName)->receive($duration * 1000)) {
return [$message->getBody(), $message];
}
}

/**
* {@inheritdoc}
*/
public function acknowledgeMessage($queueName, $receipt)
{
$this->getQueueConsumer($queueName)->acknowledge($receipt);
}

/**
* {@inheritdoc}
*/
public function peekQueue($queueName, $index = 0, $limit = 20)
{
return [];
}

/**
* {@inheritdoc}
*/
public function removeQueue($queueName)
{
if ($this->context instanceof AmqpContext) {
$queue = $this->createAmqpQueue($queueName);

$this->context->deleteQueue($queue);
}
}

/**
* {@inheritdoc}
*/
public function info()
{
return [];
}

/**
* @param string $queueName
*
* @return PsrConsumer
*/
private function getQueueConsumer($queueName)
{
if (false == array_key_exists($queueName, $this->consumers)) {
$queue = $this->context->createQueue($queueName);

$this->consumers[$queueName] = $this->context->createConsumer($queue);
}

return $this->consumers[$queueName];
}

/**
* @param string $queueName
*
* @return AmqpQueue
*/
private function createAmqpQueue($queueName)
{
/** @var AmqpContext $context */
$context = $this->context;

$queue = $context->createQueue($queueName);
$queue->addFlag(AmqpQueue::FLAG_DURABLE);

return $queue;
}
}
Loading