Skip to content

Commit

Permalink
Make PhpAmqpDriver lazily create a channel
Browse files Browse the repository at this point in the history
adrienbrault authored Jul 22, 2016

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent f30c018 commit 9fd32fa
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions src/Driver/PhpAmqpDriver.php
Original file line number Diff line number Diff line change
@@ -38,8 +38,6 @@ public function __construct(AMQPStreamConnection $connection, $exchange, array $
$this->connection = $connection;
$this->exchange = $exchange;
$this->defaultMessageParams = $defaultMessageParams;

$this->channel = $this->connection->channel();
}

/**
@@ -58,9 +56,10 @@ public function listQueues()
*/
public function createQueue($queueName)
{
$this->channel->exchange_declare($this->exchange, 'direct', false, true, false);
$this->channel->queue_declare($queueName, false, true, false, false);
$this->channel->queue_bind($queueName, $this->exchange);
$channel = $this->getChannel();
$channel->exchange_declare($this->exchange, 'direct', false, true, false);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $this->exchange);
}

/**
@@ -81,7 +80,7 @@ public function countMessages($queueName)
public function pushMessage($queueName, $message)
{
$amqpMessage = new AMQPMessage($message, $this->defaultMessageParams);
$this->channel->basic_publish($amqpMessage, $this->exchange);
$this->getChannel()->basic_publish($amqpMessage, $this->exchange);
}

/**
@@ -95,7 +94,7 @@ public function pushMessage($queueName, $message)
*/
public function popMessage($queueName, $interval = 5)
{
$message = $this->channel->basic_get($queueName);
$message = $this->getChannel()->basic_get($queueName);
if (!$message) {
// sleep for 10 ms to prevent hammering CPU
usleep(10000);
@@ -115,7 +114,7 @@ public function popMessage($queueName, $interval = 5)
*/
public function acknowledgeMessage($queueName, $receipt)
{
$this->channel->basic_ack($receipt);
$this->getChannel()->basic_ack($receipt);
}

/**
@@ -148,6 +147,17 @@ public function info()

public function __destruct()
{
$this->channel->close();
if (null !== $this->channel) {
$this->channel->close();
}
}

private function getChannel()
{
if (null === $this->channel) {
$this->channel = $this->connection->channel();
}

return $this->channel;
}
}

0 comments on commit 9fd32fa

Please sign in to comment.