Skip to content

Commit

Permalink
Merge pull request #249 from adrienbrault/patch-1
Browse files Browse the repository at this point in the history
Make PhpAmqpDriver lazily create a channel
  • Loading branch information
henrikbjorn authored Sep 9, 2016
2 parents 3b78158 + 9fd32fa commit ec54820
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions src/Driver/PhpAmqpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ public function __construct(AMQPStreamConnection $connection, $exchange, array $
$this->connection = $connection;
$this->exchange = $exchange;
$this->defaultMessageParams = $defaultMessageParams;

$this->channel = $this->connection->channel();
}

/**
Expand All @@ -58,9 +56,10 @@ public function listQueues()
*/
public function createQueue($queueName)
{
$this->channel->exchange_declare($this->exchange, 'direct', false, true, false);
$this->channel->queue_declare($queueName, false, true, false, false);
$this->channel->queue_bind($queueName, $this->exchange);
$channel = $this->getChannel();
$channel->exchange_declare($this->exchange, 'direct', false, true, false);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $this->exchange);
}

/**
Expand All @@ -81,7 +80,7 @@ public function countMessages($queueName)
public function pushMessage($queueName, $message)
{
$amqpMessage = new AMQPMessage($message, $this->defaultMessageParams);
$this->channel->basic_publish($amqpMessage, $this->exchange);
$this->getChannel()->basic_publish($amqpMessage, $this->exchange);
}

/**
Expand All @@ -95,7 +94,7 @@ public function pushMessage($queueName, $message)
*/
public function popMessage($queueName, $interval = 5)
{
$message = $this->channel->basic_get($queueName);
$message = $this->getChannel()->basic_get($queueName);
if (!$message) {
// sleep for 10 ms to prevent hammering CPU
usleep(10000);
Expand All @@ -115,7 +114,7 @@ public function popMessage($queueName, $interval = 5)
*/
public function acknowledgeMessage($queueName, $receipt)
{
$this->channel->basic_ack($receipt);
$this->getChannel()->basic_ack($receipt);
}

/**
Expand Down Expand Up @@ -148,6 +147,17 @@ public function info()

public function __destruct()
{
$this->channel->close();
if (null !== $this->channel) {
$this->channel->close();
}
}

private function getChannel()
{
if (null === $this->channel) {
$this->channel = $this->connection->channel();
}

return $this->channel;
}
}

0 comments on commit ec54820

Please sign in to comment.