Skip to content

Commit

Permalink
AWS SQS FIFO Queue support (#318)
Browse files Browse the repository at this point in the history
* AWS SQS FIFO Queue support

* Only use FifoQueue Attribute when dealing with a Queue that ends with '.fifo'

* Add testItCreatesFifoQueue()

* Add testItPushesMessagesToFifoQueue()
  • Loading branch information
holtkamp authored and acrobat committed Sep 15, 2017
1 parent c1e6755 commit 15e2586
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 6 deletions.
51 changes: 47 additions & 4 deletions src/Driver/SqsDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
class SqsDriver extends AbstractPrefetchDriver
{
const AWS_SQS_FIFO_SUFFIX = '.fifo';

protected $sqs;
protected $queueUrls;

Expand Down Expand Up @@ -62,14 +64,48 @@ public function createQueue($queueName)
try {
$this->resolveUrl($queueName);
} catch (\InvalidArgumentException $e) {
$result = $this->sqs->createQueue([
$parameters = [
'QueueName' => $queueName,
]);
];

if($this->isFifoQueue($queueName)) {
$parameters['Attributes'] = [
'FifoQueue' => 'true',
];
}

$result = $this->sqs->createQueue($parameters);

$this->queueUrls[$queueName] = $result['QueueUrl'];
}
}

/**
* @param string $queueName
*
* @return bool
*/
private function isFifoQueue($queueName)
{
return $this->endsWith($queueName, self::AWS_SQS_FIFO_SUFFIX);
}

/**
* @param string $haystack
* @param string $needle
*
* @return bool
*/
private function endsWith($haystack, $needle)
{
$length = strlen($needle);
if ($length === 0) {
return true;
}

return (substr($haystack, -$length) === $needle);
}

/**
* {@inheritdoc}
*/
Expand All @@ -96,10 +132,17 @@ public function pushMessage($queueName, $message)
{
$queueUrl = $this->resolveUrl($queueName);

$this->sqs->sendMessage([
$parameters = [
'QueueUrl' => $queueUrl,
'MessageBody' => $message,
]);
];

if($this->isFifoQueue($queueName)){
$parameters['MessageGroupId'] = __METHOD__;
$parameters['MessageDeduplicationId'] = md5($message);
}

$this->sqs->sendMessage($parameters);
}

/**
Expand Down
60 changes: 58 additions & 2 deletions tests/Driver/SqsDriverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
class SqsDriverTest extends \PHPUnit\Framework\TestCase
{
const DUMMY_QUEUE_NAME = 'my-queue';
const DUMMY_FIFO_QUEUE_NAME = 'my-queue.fifo';
const DUMMY_QUEUE_URL_PREFIX = 'https://sqs.eu-west-1.amazonaws.com/123123';

public function setUp()
Expand Down Expand Up @@ -62,6 +63,29 @@ public function testItCreatesQueue()
$this->driver->createQueue(self::DUMMY_QUEUE_NAME);
}

public function testItCreatesFifoQueue()
{
$this->sqs
->expects($this->once())
->method('createQueue')
->with($this->equalTo([
'QueueName' => self::DUMMY_FIFO_QUEUE_NAME,
'Attributes' => [
'FifoQueue' => 'true',
]
]))
->will($this->returnValue(
$this->wrapResult([
'QueueUrl' => self::DUMMY_QUEUE_URL_PREFIX,
])
));

// Calling this twice asserts that if queue exists
// there won't be attempt to create it.
$this->driver->createQueue(self::DUMMY_FIFO_QUEUE_NAME);
$this->driver->createQueue(self::DUMMY_FIFO_QUEUE_NAME);
}

public function testItDeletesQueue()
{
$this->assertSqsQueueUrl();
Expand Down Expand Up @@ -147,15 +171,35 @@ public function testItPrefetchesMessages()

public function testItPushesMessages()
{
$message = 'This is a message';

$this->assertSqsQueueUrl();
$this->sqs
->expects($this->once())
->method('sendMessage')
->with($this->equalTo([
'QueueUrl' => self::DUMMY_QUEUE_URL_PREFIX. '/'. self::DUMMY_QUEUE_NAME,
'MessageBody' => 'This is a message'
'MessageBody' => $message
]));
$this->driver->pushMessage('my-queue', 'This is a message');
$this->driver->pushMessage(self::DUMMY_QUEUE_NAME, $message);
}

public function testItPushesMessagesToFifoQueue()
{
$message = 'This is a message';

$this->assertSqsFifoQueueUrl();
$this->sqs
->expects($this->once())
->method('sendMessage')
->with($this->equalTo([
'QueueUrl' => self::DUMMY_QUEUE_URL_PREFIX. '/'. self::DUMMY_FIFO_QUEUE_NAME,
'MessageBody' => $message,
'MessageGroupId' => \Bernard\Driver\SqsDriver::class . '::pushMessage',
'MessageDeduplicationId' => md5($message),

]));
$this->driver->pushMessage(self::DUMMY_FIFO_QUEUE_NAME, $message);
}

public function testItPopMessages()
Expand Down Expand Up @@ -235,6 +279,18 @@ private function assertSqsQueueUrl()
])));
}

private function assertSqsFifoQueueUrl()
{
$this->sqs
->expects($this->once())
->method('getQueueUrl')
->with($this->equalTo(['QueueName' => self::DUMMY_FIFO_QUEUE_NAME]))
->will($this->returnValue($this->wrapResult([
'QueueUrl' => self::DUMMY_QUEUE_URL_PREFIX
. '/'. self::DUMMY_FIFO_QUEUE_NAME,
])));
}

private function wrapResult($data = [])
{
return class_exists('Aws\Common\Aws')
Expand Down

0 comments on commit 15e2586

Please sign in to comment.