Skip to content

Commit

Permalink
Merge pull request #7 from xpader/queue
Browse files Browse the repository at this point in the history
Make Channel message queue ability.
  • Loading branch information
walkor authored Oct 19, 2020
2 parents 7f9f6a4 + 4ca1124 commit bf6d837
Show file tree
Hide file tree
Showing 6 changed files with 408 additions and 70 deletions.
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# Channel
基于订阅的多进程通讯组件,用于workerman进程间通讯或者服务器集群通讯,类似redis订阅发布机制。基于workerman开发。

Channel 提供两种通讯形式,分别是发布订阅的事件机制和消息队列机制。

它们的主要区别是:
- 事件机制是消息发出后,所有订阅该事件的客户端都能收到消息。
- 消息队列机制是消息发出后,所有订阅该消息的客户端只有一个会收到消息,如果客户端忙消息会进行排队直到有客户端闲置后重新取到消息。
- 需要注意的是 Channel 只是提供一种通讯方式,本身并不提供消息确认、重试、延迟、持久化等功能,请根据实际情况合理使用。

# 手册地址
[Channel手册](http://doc.workerman.net/components/channel.html)

Expand Down Expand Up @@ -47,3 +54,40 @@ if(!defined('GLOBAL_START'))
Worker::runAll();
}
````

## 消息队列示例
```php
use Workerman\Worker;
use Workerman\Timer;

$worker = new Worker();
$worker->name = 'Producer';
$worker->onWorkerStart = function()
{
Client::connect();

$count = 0;
Timer::add(1, function() {
Client::enqueue('queue', 'Hello World '.time());
});
};

$mq = new Worker();
$mq->name = 'Consumer';
$mq->count = 4;
$mq->onWorkerStart = function($worker) {
Client::connect();

//订阅消息 queue
Client::watch('queue', function($data) use ($worker) {
echo "Worker {$worker->id} get queue: $data\n";
});

//10 秒后取消订阅该消息
Timer::add(10, function() {
Client::unwatch('queue');
}, [], false);
};

Worker::runAll();
```
175 changes: 138 additions & 37 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ class Client
*/
protected static $_events = array();

/**
* All queue callback.
* @var callable
*/
protected static $_queues = array();

/**
* @var bool
*/
Expand Down Expand Up @@ -112,26 +118,33 @@ public static function connect($ip = '127.0.0.1', $port = 2206)

/**
* onRemoteMessage.
* @param TcpConnection $connection
* @param \Workerman\Connection\TcpConnection $connection
* @param string $data
* @throws \Exception
*/
public static function onRemoteMessage($connection, $data)
{
$data = unserialize($data);
$type = $data['type'];
$event = $data['channel'];
$event_data = $data['data'];
if(!empty(self::$_events[$event]))
{
call_user_func(self::$_events[$event], $event_data);
}
elseif(!empty(Client::$onMessage))
{
call_user_func(Client::$onMessage, $event, $event_data);
}
else
{
throw new \Exception("event:$event have not callback");

$callback = null;

if ($type == 'event') {
if (!empty(self::$_events[$event])) {
call_user_func(self::$_events[$event], $event_data);
} elseif (!empty(Client::$onMessage)) {
call_user_func(Client::$onMessage, $event, $event_data);
} else {
throw new \Exception("event:$event have not callback");
}
} else {
if (isset(self::$_queues[$event])) {
call_user_func(self::$_queues[$event], $event_data);
} else {
throw new \Exception("queue:$event have not callback");
}
}
}

Expand Down Expand Up @@ -204,15 +217,11 @@ public static function clearTimer()
*/
public static function on($event, $callback)
{
if (!self::$_isWorkermanEnv) {
throw new \Exception('Channel\\Client not support on method when it is not in the workerman environment.');
}
if(!is_callable($callback))
{
throw new \Exception('callback is not callable');
if (!is_callable($callback)) {
throw new \Exception('callback is not callable for event.');
}
self::$_events[$event] = $callback;
self::subscribe(array($event));
self::subscribe($event);
}

/**
Expand All @@ -222,19 +231,13 @@ public static function on($event, $callback)
*/
public static function subscribe($events)
{
if (!self::$_isWorkermanEnv) {
throw new \Exception('Channel\\Client not support subscribe method when it is not in the workerman environment.');
}
self::connect(self::$_remoteIp, self::$_remotePort);
$events = (array)$events;
foreach($events as $event)
{
if(!isset(self::$_events[$event]))
{
self::send(array('type' => 'subscribe', 'channels'=>$events));
foreach ($events as $event) {
if(!isset(self::$_events[$event])) {
self::$_events[$event] = null;
}
}
self::$_remoteConnection->send(serialize(array('type' => 'subscribe', 'channels'=>(array)$events)));
}

/**
Expand All @@ -244,16 +247,11 @@ public static function subscribe($events)
*/
public static function unsubscribe($events)
{
if (!self::$_isWorkermanEnv) {
throw new \Exception('Channel\\Client not support unsubscribe method when it is not in the workerman environment.');
}
self::connect(self::$_remoteIp, self::$_remotePort);
$events = (array)$events;
foreach($events as $event)
{
self::send(array('type' => 'unsubscribe', 'channels'=>$events));
foreach($events as $event) {
unset(self::$_events[$event]);
}
self::$_remoteConnection->send(serialize(array('type' => 'unsubscribe', 'channels'=>$events)));
}

/**
Expand All @@ -263,13 +261,116 @@ public static function unsubscribe($events)
*/
public static function publish($events, $data)
{
self::sendAnyway(array('type' => 'publish', 'channels' => (array)$events, 'data' => $data));
}

/**
* Watch a channel of queue
* @param string|array $channels
* @param callable $callback
* @param boolean $autoReserve Auto reserve after callback finished.
* But sometime you may don't want reserve immediately, or in some asynchronous job,
* you want reserve in finished callback, so you should set $autoReserve to false
* and call Client::reserve() after watch() and in finish callback manually.
* @throws \Exception
*/
public static function watch($channels, $callback, $autoReserve=true)
{
if (!is_callable($callback)) {
throw new \Exception('callback is not callable for watch.');
}

if ($autoReserve) {
$callback = static function($data) use ($callback) {
try {
call_user_func($callback, $data);
} catch (\Exception $e) {
throw $e;
} catch (\Error $e) {
throw $e;
} finally {
self::reserve();
}
};
}

$channels = (array)$channels;
self::send(array('type' => 'watch', 'channels'=>$channels));

foreach ($channels as $channel) {
self::$_queues[$channel] = $callback;
}

if ($autoReserve) {
self::reserve();
}
}

/**
* Unwatch a channel of queue
* @param string $channel
* @throws \Exception
*/
public static function unwatch($channels)
{
$channels = (array)$channels;
self::send(array('type' => 'unwatch', 'channels'=>$channels));
foreach ($channels as $channel) {
if (isset(self::$_queues[$channel])) {
unset(self::$_queues[$channel]);
}
}
}

/**
* Put data to queue
* @param string|array $channels
* @param mixed $data
* @throws \Exception
*/
public static function enqueue($channels, $data)
{
self::sendAnyway(array('type' => 'enqueue', 'channels' => (array)$channels, 'data' => $data));
}

/**
* Start reserve queue manual
* @throws \Exception
*/
public static function reserve()
{
self::send(array('type' => 'reserve'));
}

/**
* Send through workerman environment
* @param $data
* @throws \Exception
*/
protected static function send($data)
{
if (!self::$_isWorkermanEnv) {
throw new \Exception("Channel\\Client not support {$data['type']} method when it is not in the workerman environment.");
}
self::connect(self::$_remoteIp, self::$_remotePort);
self::$_remoteConnection->send(serialize($data));
}

/**
* Send from any environment
* @param $data
* @throws \Exception
*/
protected static function sendAnyway($data)
{
self::connect(self::$_remoteIp, self::$_remotePort);
$body = serialize($data);
if (self::$_isWorkermanEnv) {
self::$_remoteConnection->send(serialize(array('type' => 'publish', 'channels' => (array)$events, 'data' => $data)));
self::$_remoteConnection->send($body);
} else {
$body = serialize(array('type' => 'publish', 'channels'=>(array)$events, 'data' => $data));
$buffer = pack('N', 4+strlen($body)) . $body;
fwrite(self::$_remoteConnection, $buffer);
}
}

}
89 changes: 89 additions & 0 deletions src/Queue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php

namespace Channel;

use Workerman\Connection\TcpConnection;

class Queue
{

public $name = 'default';
public $watcher = array();
public $consumer = array();
protected $queue = null;

public function __construct($name)
{
$this->name = $name;
$this->queue = new \SplQueue();
}

/**
* @param TcpConnection $connection
*/
public function addWatch($connection)
{
if (!isset($this->watcher[$connection->id])) {
$this->watcher[$connection->id] = $connection;
$connection->watchs[] = $this->name;
}
}

/**
* @param TcpConnection $connection
*/
public function removeWatch($connection)
{
if (isset($connection->watchs) && in_array($this->name, $connection->watchs)) {
$idx = array_search($this->name, $connection->watchs);
unset($connection->watchs[$idx]);
}
if (isset($this->watcher[$connection->id])) {
unset($this->watcher[$connection->id]);
}
if (isset($this->consumer[$connection->id])) {
unset($this->consumer[$connection->id]);
}
}

/**
* @param TcpConnection $connection
*/
public function addConsumer($connection)
{
if (isset($this->watcher[$connection->id]) && !isset($this->consumer[$connection->id])) {
$this->consumer[$connection->id] = $connection;
}
$this->dispatch();
}

public function enqueue($data)
{
$this->queue->enqueue($data);
$this->dispatch();
}

private function dispatch()
{
if ($this->queue->isEmpty() || count($this->consumer) == 0) {
return;
}

while (!$this->queue->isEmpty()) {
$data = $this->queue->dequeue();
$idx = key($this->consumer);
$connection = $this->consumer[$idx];
unset($this->consumer[$idx]);
$connection->send(serialize(array('type'=>'queue', 'channel'=>$this->name, 'data' => $data)));
if (count($this->consumer) == 0) {
break;
}
}
}

public function isEmpty()
{
return empty($this->watcher) && $this->queue->isEmpty();
}

}
Loading

0 comments on commit bf6d837

Please sign in to comment.