From 1bc0c8d7d87fb7a996b383916bc2a0cf5e95ea05 Mon Sep 17 00:00:00 2001 From: MohammadReza Honarkhah Date: Sat, 26 Nov 2016 17:58:25 +0330 Subject: [PATCH] change structure for my project --- README.md | 2 +- composer.json | 8 +++- config/amqp.php | 69 ++++++++++++++++---------------- src/Amqp.php | 45 +++++++++++++++------ src/AmqpServiceProvider.php | 11 +++--- src/Context.php | 8 ++-- src/Request.php | 78 +++++++++++++++++++------------------ 7 files changed, 124 insertions(+), 97 deletions(-) diff --git a/README.md b/README.md index 373ee6a..106d2bf 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ $ php composer update or ``` -$ php composer require bschmitt/laravel-amqp +$ php composer require bschmitt/laravel-amqp dev-master ``` ## Integration diff --git a/composer.json b/composer.json index 5a8dc0c..de387d9 100644 --- a/composer.json +++ b/composer.json @@ -1,5 +1,5 @@ { - "name": "bschmitt/laravel-amqp", + "name": "mammutgroup/laravel-amqp", "description": "AMQP wrapper for Laravel and Lumen to publish and consume messages", "keywords": [ "laravel", @@ -16,12 +16,16 @@ "license": "MIT", "support": { "issues": "https://github.com/bschmitt/laravel-amqp/issues", - "source": "https://github.com/bschmitt/laravel-amqp" + "source": "https://github.com/mammutgroup/laravel-amqp" }, "authors": [ { "name": "Björn Schmitt", "email": "code@bjoern.io" + }, + { + "name": "MohammadReza Honarkhah", + "email": "m.honar@gmail.com" } ], "require": { diff --git a/config/amqp.php b/config/amqp.php index 3933f89..799c172 100644 --- a/config/amqp.php +++ b/config/amqp.php @@ -15,44 +15,41 @@ | AMQP properties separated by key |-------------------------------------------------------------------------- */ - 'properties' => [ - 'production' => [ - 'host' => 'localhost', - 'port' => 5672, - 'username' => '', - 'password' => '', - 'vhost' => '/', - 'connect_options' => [], - 'ssl_options' => [], - - 'exchange' => 'amq.topic', - 'exchange_type' => 'topic', - 'exchange_passive' => false, - 'exchange_durable' => true, - 'exchange_auto_delete' => false, - 'exchange_internal' => false, - 'exchange_nowait' => false, - 'exchange_properties' => [], - - 'queue_force_declare' => false, - 'queue_passive' => false, - 'queue_durable' => true, - 'queue_exclusive' => false, - 'queue_auto_delete' => false, - 'queue_nowait' => false, - 'queue_properties' => ['x-ha-policy' => ['S', 'all']], - - 'consumer_tag' => '', - 'consumer_no_local' => false, - 'consumer_no_ack' => false, - 'consumer_exclusive' => false, - 'consumer_nowait' => false, - 'timeout' => 0, - 'persistent' => false, + 'host' => env('RABBITMQ_HOST', 'localhost'), + 'port' => env('RABBITMQ_PORT', 5672), + 'username' => env('RABBITMQ_LOGIN', 'guest'), + 'password' => env('RABBITMQ_PASSWORD', 'guest'), + 'vhost' => env('RABBITMQ_VHOST', '/'), + 'connect_options' => [], + 'ssl_options' => [], + + 'exchange' => 'amq.'.env('RABBITMQ_EXCHANGE_TYPE', 'direct'), + 'exchange_type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'), + 'exchange_passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false), + 'exchange_durable' => env('RABBITMQ_EXCHANGE_DURABLE', true), + 'exchange_auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false), + 'exchange_internal' => false, + 'exchange_nowait' => false, + 'exchange_properties' => [], + + + 'queue_force_declare' => env('RABBITMQ_QUEUE_DECLARE_BIND', false), + 'queue_passive' => env('RABBITMQ_QUEUE_PASSIVE', false), + 'queue_durable' => env('RABBITMQ_QUEUE_DURABLE', true), + 'queue_exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false), + 'queue_auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false), + 'queue_nowait' => false, + 'queue_properties' => ['x-ha-policy' => ['S', 'all']], + + 'consumer_tag' => '', + 'consumer_no_local' => false, + 'consumer_no_ack' => false, + 'consumer_exclusive' => false, + 'consumer_nowait' => false, + 'timeout' => 0, + 'persistent' => false, ], - ], - ]; diff --git a/src/Amqp.php b/src/Amqp.php index f6a9257..3470787 100644 --- a/src/Amqp.php +++ b/src/Amqp.php @@ -1,9 +1,6 @@ @@ -13,21 +10,24 @@ class Amqp /** * @param string $routing - * @param mixed $message - * @param array $properties + * @param mixed $message + * @param array $properties */ public function publish($routing, $message, array $properties = []) { $properties['routing'] = $routing; /* @var Publisher $publisher */ - $publisher = App::make('Bschmitt\Amqp\Publisher'); + $publisher = \App::make('Bschmitt\Amqp\Publisher'); $publisher ->mergeProperties($properties) ->setup(); + $messageProperties = ['content_type' => 'text/plain', 'delivery_mode' => 2]; + $messageProperties = array_merge($messageProperties, $publisher->getProperty('message_properties')); + if (is_string($message)) { - $message = new Message($message, ['content_type' => 'text/plain', 'delivery_mode' => 2]); + $message = new Message($message, $messageProperties); } $publisher->publish($routing, $message); @@ -35,9 +35,9 @@ public function publish($routing, $message, array $properties = []) } /** - * @param string $queue + * @param string $queue * @param Closure $callback - * @param array $properties + * @param array $properties * @throws Exception\Configuration */ public function consume($queue, Closure $callback, $properties = []) @@ -45,7 +45,7 @@ public function consume($queue, Closure $callback, $properties = []) $properties['queue'] = $queue; /* @var Consumer $consumer */ - $consumer = App::make('Bschmitt\Amqp\Consumer'); + $consumer = \App::make('Bschmitt\Amqp\Consumer'); $consumer ->mergeProperties($properties) ->setup(); @@ -54,9 +54,32 @@ public function consume($queue, Closure $callback, $properties = []) Request::shutdown($consumer->getChannel(), $consumer->getConnection()); } + public function declareExchange($exchange, $exchangeType) + { + + $properties['exchange'] = $exchange; + $properties['exchange_type'] = $exchangeType; + + $request = \App::make('Bschmitt\Amqp\Request'); + $request->mergeProperties($properties); + $request->exchangeDeclare($exchange, $exchangeType); + return $exchange; + } + + public function declareQueue($queue, $exchange, $routingKey) + { + $properties['queue'] = $queue; + $properties['routing_key'] = $routingKey; + + $request = \App::make('Bschmitt\Amqp\Request'); + $request->mergeProperties($properties); + + $request->queueDeclare($queue, $exchange, $routingKey); + } + /** * @param string $body - * @param array $properties + * @param array $properties * @return \Bschmitt\Amqp\Message */ public function message($body, $properties = []) diff --git a/src/AmqpServiceProvider.php b/src/AmqpServiceProvider.php index 8c86535..08925db 100644 --- a/src/AmqpServiceProvider.php +++ b/src/AmqpServiceProvider.php @@ -6,7 +6,6 @@ class AmqpServiceProvider extends ServiceProvider { - /** * Indicates if loading of the provider is deferred. * @@ -25,8 +24,10 @@ public function boot() if (!class_exists('Amqp')) { class_alias('Bschmitt\Amqp\Facades\Amqp', 'Amqp'); } + $this->publishes([ + __DIR__.'/../config/amqp.php' => config_path('amqp.php'), + ]); } - /** * Register the application services. * @@ -34,13 +35,14 @@ class_alias('Bschmitt\Amqp\Facades\Amqp', 'Amqp'); */ public function register() { + $this->app->singleton('Bschmitt\Amqp\Publisher', function ($app) { return new Publisher(config()); }); $this->app->singleton('Bschmitt\Amqp\Consumer', function ($app) { return new Consumer(config()); }); - + } /** @@ -50,7 +52,6 @@ public function register() */ public function provides() { - return ['Amqp', 'Bschmitt\Amqp\Publisher', 'Bschmitt\Amqp\Consumer']; + return ['Amqp','Bschmitt\Amqp\Publisher' , 'Bschmitt\Amqp\Consumer']; } - } diff --git a/src/Context.php b/src/Context.php index ab3d523..4638dee 100644 --- a/src/Context.php +++ b/src/Context.php @@ -1,7 +1,7 @@ @@ -32,7 +32,7 @@ public function __construct(Repository $config) protected function extractProperties(Repository $config) { if ($config->has(self::REPOSITORY_KEY)) { - $data = $config->get(self::REPOSITORY_KEY); + $data = $config->get(self::REPOSITORY_KEY); $this->properties = $data['properties'][$data['use']]; } } @@ -59,9 +59,9 @@ public function getProperties() * @param string $key * @return mixed */ - public function getProperty($key) + public function getProperty($key,$default = null) { - return array_key_exists($key, $this->properties) ? $this->properties[$key] : NULL; + return Arr::get($this->properties, $key, $default); } /** diff --git a/src/Request.php b/src/Request.php index 5faf523..5c74b8e 100644 --- a/src/Request.php +++ b/src/Request.php @@ -52,7 +52,7 @@ public function setup() $this->connect(); $exchange = $this->getProperty('exchange'); - + if (empty($exchange)) { throw new Exception\Configuration('Please check your settings, exchange is not defined.'); } @@ -65,46 +65,15 @@ public function setup() auto_delete: false //the exchange won't be deleted once the channel is closed. */ - $this->channel->exchange_declare( - $exchange, - $this->getProperty('exchange_type'), - $this->getProperty('exchange_passive'), - $this->getProperty('exchange_durable'), - $this->getProperty('exchange_auto_delete'), - $this->getProperty('exchange_internal'), - $this->getProperty('exchange_nowait'), - $this->getProperty('exchange_properties') - ); - + if ($this->getProperty('exchange_declare')) { + $this->exchangeDeclare($exchange); + } + $routing = $this->getProperty('routing'); $queue = $this->getProperty('queue'); - if (!empty($queue) || $this->getProperty('queue_force_declare')) { - - /* - name: $queue - passive: false - durable: true // the queue will survive server restarts - exclusive: false // queue is deleted when connection closes - auto_delete: false //the queue won't be deleted once the channel is closed. - nowait: false // Doesn't wait on replies for certain things. - parameters: array // Extra data, like high availability params - */ - - /** @var ['queue name', 'message count',] queueInfo */ - $this->queueInfo = $this->channel->queue_declare( - $queue, - $this->getProperty('queue_passive'), - $this->getProperty('queue_durable'), - $this->getProperty('queue_exclusive'), - $this->getProperty('queue_auto_delete'), - $this->getProperty('queue_nowait'), - $this->getProperty('queue_properties') - ); - - $this->channel->queue_bind($queue ?: $this->queueInfo[0], $exchange, $this->getProperty('routing')); - + if (!empty($queue) && $this->getProperty('queue_force_declare')) { + $this->queueDeclare($queue, $exchange, $routing); } - // clear at shutdown register_shutdown_function([get_class(), 'shutdown'], $this->channel, $this->connection); } @@ -136,6 +105,39 @@ public function getQueueMessageCount() return 0; } + public function queueDeclare($queue, $exchange, $routing){ + if(empty($this->channel)){ + $this->connect(); + } + + $this->channel->queue_declare( + $queue, + $this->getProperty('queue_passive'), + $this->getProperty('queue_durable'), + $this->getProperty('queue_exclusive'), + $this->getProperty('queue_auto_delete'), + $this->getProperty('queue_nowait'), + $this->getProperty('queue_properties') + ); + $this->channel->queue_bind($queue, $exchange, $routing); + + } + public function exchangeDeclare($exchange) + { + if(empty($this->channel)){ + $this->connect(); + } + $this->channel->exchange_declare( + $exchange, + $this->getProperty('exchange_type'), + $this->getProperty('exchange_passive'), + $this->getProperty('exchange_durable'), + $this->getProperty('exchange_auto_delete'), + $this->getProperty('exchange_internal'), + $this->getProperty('exchange_nowait'), + $this->getProperty('exchange_properties') + ); + } /** * @param AMQPChannel $channel * @param AMQPStreamConnection $connection