Skip to content

Commit

Permalink
change structure for my project
Browse files Browse the repository at this point in the history
  • Loading branch information
honarkhah committed Nov 26, 2016
1 parent 11083c8 commit 1bc0c8d
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 97 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ $ php composer update
or

```
$ php composer require bschmitt/laravel-amqp
$ php composer require bschmitt/laravel-amqp dev-master
```

## Integration
Expand Down
8 changes: 6 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bschmitt/laravel-amqp",
"name": "mammutgroup/laravel-amqp",
"description": "AMQP wrapper for Laravel and Lumen to publish and consume messages",
"keywords": [
"laravel",
Expand All @@ -16,12 +16,16 @@
"license": "MIT",
"support": {
"issues": "https://github.com/bschmitt/laravel-amqp/issues",
"source": "https://github.com/bschmitt/laravel-amqp"
"source": "https://github.com/mammutgroup/laravel-amqp"
},
"authors": [
{
"name": "Björn Schmitt",
"email": "[email protected]"
},
{
"name": "MohammadReza Honarkhah",
"email": "[email protected]"
}
],
"require": {
Expand Down
69 changes: 33 additions & 36 deletions config/amqp.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,41 @@
| AMQP properties separated by key
|--------------------------------------------------------------------------
*/

'properties' => [

'production' => [
'host' => 'localhost',
'port' => 5672,
'username' => '',
'password' => '',
'vhost' => '/',
'connect_options' => [],
'ssl_options' => [],

'exchange' => 'amq.topic',
'exchange_type' => 'topic',
'exchange_passive' => false,
'exchange_durable' => true,
'exchange_auto_delete' => false,
'exchange_internal' => false,
'exchange_nowait' => false,
'exchange_properties' => [],

'queue_force_declare' => false,
'queue_passive' => false,
'queue_durable' => true,
'queue_exclusive' => false,
'queue_auto_delete' => false,
'queue_nowait' => false,
'queue_properties' => ['x-ha-policy' => ['S', 'all']],

'consumer_tag' => '',
'consumer_no_local' => false,
'consumer_no_ack' => false,
'consumer_exclusive' => false,
'consumer_nowait' => false,
'timeout' => 0,
'persistent' => false,
'host' => env('RABBITMQ_HOST', 'localhost'),
'port' => env('RABBITMQ_PORT', 5672),
'username' => env('RABBITMQ_LOGIN', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'vhost' => env('RABBITMQ_VHOST', '/'),
'connect_options' => [],
'ssl_options' => [],

'exchange' => 'amq.'.env('RABBITMQ_EXCHANGE_TYPE', 'direct'),
'exchange_type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'),
'exchange_passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
'exchange_durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
'exchange_auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
'exchange_internal' => false,
'exchange_nowait' => false,
'exchange_properties' => [],


'queue_force_declare' => env('RABBITMQ_QUEUE_DECLARE_BIND', false),
'queue_passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
'queue_durable' => env('RABBITMQ_QUEUE_DURABLE', true),
'queue_exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
'queue_auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
'queue_nowait' => false,
'queue_properties' => ['x-ha-policy' => ['S', 'all']],

'consumer_tag' => '',
'consumer_no_local' => false,
'consumer_no_ack' => false,
'consumer_exclusive' => false,
'consumer_nowait' => false,
'timeout' => 0,
'persistent' => false,
],

],

];
45 changes: 34 additions & 11 deletions src/Amqp.php
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
<?php namespace Bschmitt\Amqp;

use App;
use Closure;
use Bschmitt\Amqp\Request;
use Bschmitt\Amqp\Message;

/**
* @author Björn Schmitt <[email protected]>
Expand All @@ -13,39 +10,42 @@ class Amqp

/**
* @param string $routing
* @param mixed $message
* @param array $properties
* @param mixed $message
* @param array $properties
*/
public function publish($routing, $message, array $properties = [])
{
$properties['routing'] = $routing;

/* @var Publisher $publisher */
$publisher = App::make('Bschmitt\Amqp\Publisher');
$publisher = \App::make('Bschmitt\Amqp\Publisher');
$publisher
->mergeProperties($properties)
->setup();

$messageProperties = ['content_type' => 'text/plain', 'delivery_mode' => 2];
$messageProperties = array_merge($messageProperties, $publisher->getProperty('message_properties'));

if (is_string($message)) {
$message = new Message($message, ['content_type' => 'text/plain', 'delivery_mode' => 2]);
$message = new Message($message, $messageProperties);
}

$publisher->publish($routing, $message);
Request::shutdown($publisher->getChannel(), $publisher->getConnection());
}

/**
* @param string $queue
* @param string $queue
* @param Closure $callback
* @param array $properties
* @param array $properties
* @throws Exception\Configuration
*/
public function consume($queue, Closure $callback, $properties = [])
{
$properties['queue'] = $queue;

/* @var Consumer $consumer */
$consumer = App::make('Bschmitt\Amqp\Consumer');
$consumer = \App::make('Bschmitt\Amqp\Consumer');
$consumer
->mergeProperties($properties)
->setup();
Expand All @@ -54,9 +54,32 @@ public function consume($queue, Closure $callback, $properties = [])
Request::shutdown($consumer->getChannel(), $consumer->getConnection());
}

public function declareExchange($exchange, $exchangeType)
{

$properties['exchange'] = $exchange;
$properties['exchange_type'] = $exchangeType;

$request = \App::make('Bschmitt\Amqp\Request');
$request->mergeProperties($properties);
$request->exchangeDeclare($exchange, $exchangeType);
return $exchange;
}

public function declareQueue($queue, $exchange, $routingKey)
{
$properties['queue'] = $queue;
$properties['routing_key'] = $routingKey;

$request = \App::make('Bschmitt\Amqp\Request');
$request->mergeProperties($properties);

$request->queueDeclare($queue, $exchange, $routingKey);
}

/**
* @param string $body
* @param array $properties
* @param array $properties
* @return \Bschmitt\Amqp\Message
*/
public function message($body, $properties = [])
Expand Down
11 changes: 6 additions & 5 deletions src/AmqpServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

class AmqpServiceProvider extends ServiceProvider
{

/**
* Indicates if loading of the provider is deferred.
*
Expand All @@ -25,22 +24,25 @@ public function boot()
if (!class_exists('Amqp')) {
class_alias('Bschmitt\Amqp\Facades\Amqp', 'Amqp');
}
$this->publishes([
__DIR__.'/../config/amqp.php' => config_path('amqp.php'),
]);
}

/**
* Register the application services.
*
* @return void
*/
public function register()
{

$this->app->singleton('Bschmitt\Amqp\Publisher', function ($app) {
return new Publisher(config());
});
$this->app->singleton('Bschmitt\Amqp\Consumer', function ($app) {
return new Consumer(config());
});

}

/**
Expand All @@ -50,7 +52,6 @@ public function register()
*/
public function provides()
{
return ['Amqp', 'Bschmitt\Amqp\Publisher', 'Bschmitt\Amqp\Consumer'];
return ['Amqp','Bschmitt\Amqp\Publisher' , 'Bschmitt\Amqp\Consumer'];
}

}
8 changes: 4 additions & 4 deletions src/Context.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php namespace Bschmitt\Amqp;

use Illuminate\Config\Repository;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Illuminate\Support\Arr;

/**
* @author Björn Schmitt <[email protected]>
Expand Down Expand Up @@ -32,7 +32,7 @@ public function __construct(Repository $config)
protected function extractProperties(Repository $config)
{
if ($config->has(self::REPOSITORY_KEY)) {
$data = $config->get(self::REPOSITORY_KEY);
$data = $config->get(self::REPOSITORY_KEY);
$this->properties = $data['properties'][$data['use']];
}
}
Expand All @@ -59,9 +59,9 @@ public function getProperties()
* @param string $key
* @return mixed
*/
public function getProperty($key)
public function getProperty($key,$default = null)
{
return array_key_exists($key, $this->properties) ? $this->properties[$key] : NULL;
return Arr::get($this->properties, $key, $default);
}

/**
Expand Down
78 changes: 40 additions & 38 deletions src/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public function setup()
$this->connect();

$exchange = $this->getProperty('exchange');

if (empty($exchange)) {
throw new Exception\Configuration('Please check your settings, exchange is not defined.');
}
Expand All @@ -65,46 +65,15 @@ public function setup()
auto_delete: false //the exchange won't be deleted once the channel is closed.
*/

$this->channel->exchange_declare(
$exchange,
$this->getProperty('exchange_type'),
$this->getProperty('exchange_passive'),
$this->getProperty('exchange_durable'),
$this->getProperty('exchange_auto_delete'),
$this->getProperty('exchange_internal'),
$this->getProperty('exchange_nowait'),
$this->getProperty('exchange_properties')
);

if ($this->getProperty('exchange_declare')) {
$this->exchangeDeclare($exchange);
}
$routing = $this->getProperty('routing');
$queue = $this->getProperty('queue');

if (!empty($queue) || $this->getProperty('queue_force_declare')) {

/*
name: $queue
passive: false
durable: true // the queue will survive server restarts
exclusive: false // queue is deleted when connection closes
auto_delete: false //the queue won't be deleted once the channel is closed.
nowait: false // Doesn't wait on replies for certain things.
parameters: array // Extra data, like high availability params
*/

/** @var ['queue name', 'message count',] queueInfo */
$this->queueInfo = $this->channel->queue_declare(
$queue,
$this->getProperty('queue_passive'),
$this->getProperty('queue_durable'),
$this->getProperty('queue_exclusive'),
$this->getProperty('queue_auto_delete'),
$this->getProperty('queue_nowait'),
$this->getProperty('queue_properties')
);

$this->channel->queue_bind($queue ?: $this->queueInfo[0], $exchange, $this->getProperty('routing'));

if (!empty($queue) && $this->getProperty('queue_force_declare')) {
$this->queueDeclare($queue, $exchange, $routing);
}

// clear at shutdown
register_shutdown_function([get_class(), 'shutdown'], $this->channel, $this->connection);
}
Expand Down Expand Up @@ -136,6 +105,39 @@ public function getQueueMessageCount()
return 0;
}

public function queueDeclare($queue, $exchange, $routing){
if(empty($this->channel)){
$this->connect();
}

$this->channel->queue_declare(
$queue,
$this->getProperty('queue_passive'),
$this->getProperty('queue_durable'),
$this->getProperty('queue_exclusive'),
$this->getProperty('queue_auto_delete'),
$this->getProperty('queue_nowait'),
$this->getProperty('queue_properties')
);
$this->channel->queue_bind($queue, $exchange, $routing);

}
public function exchangeDeclare($exchange)
{
if(empty($this->channel)){
$this->connect();
}
$this->channel->exchange_declare(
$exchange,
$this->getProperty('exchange_type'),
$this->getProperty('exchange_passive'),
$this->getProperty('exchange_durable'),
$this->getProperty('exchange_auto_delete'),
$this->getProperty('exchange_internal'),
$this->getProperty('exchange_nowait'),
$this->getProperty('exchange_properties')
);
}
/**
* @param AMQPChannel $channel
* @param AMQPStreamConnection $connection
Expand Down

0 comments on commit 1bc0c8d

Please sign in to comment.