From a484e9f62c9c4390523e03d414002161d85cc14e Mon Sep 17 00:00:00 2001 From: Pader Date: Fri, 9 Oct 2020 19:55:47 +0800 Subject: [PATCH 1/3] Basic queue ability. --- src/Client.php | 124 +++++++++++++++++++++++++++++++++++------------- src/Queue.php | 50 +++++++++++++++++++ src/Server.php | 46 +++++++++++++++++- test/server.php | 14 ++++-- 4 files changed, 195 insertions(+), 39 deletions(-) create mode 100644 src/Queue.php diff --git a/src/Client.php b/src/Client.php index 215929d..72d2e4a 100644 --- a/src/Client.php +++ b/src/Client.php @@ -63,6 +63,12 @@ class Client */ protected static $_events = array(); + /** + * All queue callback. + * @var callable + */ + protected static $_queues = array(); + /** * @var bool */ @@ -121,16 +127,11 @@ public static function onRemoteMessage($connection, $data) $data = unserialize($data); $event = $data['channel']; $event_data = $data['data']; - if(!empty(self::$_events[$event])) - { + if (!empty(self::$_events[$event])) { call_user_func(self::$_events[$event], $event_data); - } - elseif(!empty(Client::$onMessage)) - { + } elseif (!empty(Client::$onMessage)) { call_user_func(Client::$onMessage, $event, $event_data); - } - else - { + } else { throw new \Exception("event:$event have not callback"); } } @@ -204,15 +205,11 @@ public static function clearTimer() */ public static function on($event, $callback) { - if (!self::$_isWorkermanEnv) { - throw new \Exception('Channel\\Client not support on method when it is not in the workerman environment.'); - } - if(!is_callable($callback)) - { - throw new \Exception('callback is not callable'); + if (!is_callable($callback)) { + throw new \Exception('callback is not callable for event.'); } self::$_events[$event] = $callback; - self::subscribe(array($event)); + self::subscribe($event); } /** @@ -222,19 +219,13 @@ public static function on($event, $callback) */ public static function subscribe($events) { - if (!self::$_isWorkermanEnv) { - throw new \Exception('Channel\\Client not support subscribe method when it is not in the workerman environment.'); - } - self::connect(self::$_remoteIp, self::$_remotePort); $events = (array)$events; - foreach($events as $event) - { - if(!isset(self::$_events[$event])) - { + self::send(array('type' => 'subscribe', 'channels'=>$events)); + foreach ($events as $event) { + if(!isset(self::$_events[$event])) { self::$_events[$event] = null; } } - self::$_remoteConnection->send(serialize(array('type' => 'subscribe', 'channels'=>(array)$events))); } /** @@ -244,16 +235,11 @@ public static function subscribe($events) */ public static function unsubscribe($events) { - if (!self::$_isWorkermanEnv) { - throw new \Exception('Channel\\Client not support unsubscribe method when it is not in the workerman environment.'); - } - self::connect(self::$_remoteIp, self::$_remotePort); $events = (array)$events; - foreach($events as $event) - { + self::send(array('type' => 'unsubscribe', 'channels'=>$events)); + foreach($events as $event) { unset(self::$_events[$event]); } - self::$_remoteConnection->send(serialize(array('type' => 'unsubscribe', 'channels'=>$events))); } /** @@ -262,14 +248,86 @@ public static function unsubscribe($events) * @param mixed $data */ public static function publish($events, $data) + { + self::sendAnyway(array('type' => 'publish', 'channels' => (array)$events, 'data' => $data)); + } + + /** + * Watch a channel of queue + * @param $channel + * @param $callback + * @throws \Exception + */ + public static function watch($channel, $callback) + { + if (!is_callable($callback)) { + throw new \Exception('callback is not callable for watch.'); + } + + self::send(array('type' => 'watch', 'channels'=>$channel)); + + self::$_events[$channel] = static function($data) use ($callback) { + try { + call_user_func($callback, $data); + } catch (\Exception $e) { + throw $e; + } catch (\Error $e) { + throw $e; + } finally { + self::send(array('type' => 'pull')); + } + }; + + self::send(array('type' => 'pull')); + } + + /** + * Unwatch a channel of queue + * @param string $channel + * @throws \Exception + */ + public static function unwatch($channel) + { + self::send(array('type' => 'unwatch', 'channels'=>$channel)); + if (isset(self::$_events[$channel])) { + unset(self::$_events[$channel]); + } + } + + public static function push($channel, $data) + { + self::sendAnyway(array('type' => 'push', 'channels' => $channel, 'data' => $data)); + } + + /** + * Send through workerman environment + * @param $data + * @throws \Exception + */ + protected static function send($data) + { + if (!self::$_isWorkermanEnv) { + throw new \Exception("Channel\\Client not support {$data['type']} method when it is not in the workerman environment."); + } + self::connect(self::$_remoteIp, self::$_remotePort); + self::$_remoteConnection->send(serialize($data)); + } + + /** + * Send from any environment + * @param $data + * @throws \Exception + */ + protected static function sendAnyway($data) { self::connect(self::$_remoteIp, self::$_remotePort); + $body = serialize($data); if (self::$_isWorkermanEnv) { - self::$_remoteConnection->send(serialize(array('type' => 'publish', 'channels' => (array)$events, 'data' => $data))); + self::$_remoteConnection->send($body); } else { - $body = serialize(array('type' => 'publish', 'channels'=>(array)$events, 'data' => $data)); $buffer = pack('N', 4+strlen($body)) . $body; fwrite(self::$_remoteConnection, $buffer); } } + } diff --git a/src/Queue.php b/src/Queue.php new file mode 100644 index 0000000..6df92c0 --- /dev/null +++ b/src/Queue.php @@ -0,0 +1,50 @@ +name = $name; + $this->queue = new \SplQueue(); + } + + /** + * @param TcpConnection $connection + */ + public function addWatch($connection) + { + $this->watcher[$connection->id] = $connection; + } + + /** + * @param TcpConnection $connection + */ + public function removeWatch($connection) + { + if (isset($connection->watchs[$this->name])) { + unset($connection->watchs[$this->name]); + } + if (isset($this->watcher[$connection->id])) { + unset($this->watcher[$connection->id]); + } + if (isset($this->pulls[$connection->id])) { + unset($this->pulls[$connection->id]); + } + } + + public function isEmpty() + { + return empty($this->watcher) && $this->queue->isEmpty(); + } + +} \ No newline at end of file diff --git a/src/Server.php b/src/Server.php index 3ad2ee5..3ed16e1 100644 --- a/src/Server.php +++ b/src/Server.php @@ -12,6 +12,18 @@ class Server */ protected $_worker = null; + /** + * Queues + * + * Fields of array element: + * watcher: TcpConnections[] + * pulls: TcpConnections[] + * queue: SplQueue + * + * @var array + */ + protected $_queues = array(); + /** * Construct. * @param string $ip @@ -50,7 +62,7 @@ public function onClose($connection) /** * onMessage. - * @param TcpConnection $connection + * @param \Workerman\Connection\TcpConnection $connection * @param string $data */ public function onMessage($connection, $data) @@ -96,13 +108,43 @@ public function onMessage($connection, $data) { continue; } - $buffer = serialize(array('channel'=>$channel, 'data' => $data['data']))."\n"; + $buffer = serialize(array('type'=>'event', 'channel'=>$channel, 'data' => $data['data']))."\n"; foreach($worker->channels[$channel] as $connection) { $connection->send($buffer); } } break; + case 'watch': + $queue = $this->getQueue(); + $connection->watchs[$channels] = $channels; + $queue->addWatch($connection); + break; + case 'unwatch': + if (isset($this->_queues[$channels])) { + if (isset($connection->watchs[$channels])) { + unset($connection->watchs[$channels]); + } + if (isset($worker->queues[$channels]['watcher'][$connection->id])) { + + } + } + $this->initQueue($channels); + break; + case 'push': + break; + case 'pull': + $connection->pulling = true; + break; + } + } + + public function getQueue($channel) + { + if (isset($this->queues[$channel])) { + return $this->_queues[$channel]; } + return $this->_queues[$channel] = new Queue($channel); } + } diff --git a/test/server.php b/test/server.php index 451750c..6e652f0 100644 --- a/test/server.php +++ b/test/server.php @@ -1,21 +1,27 @@ onWorkerStart = function() { - Channel\Client::on('test event', function($event_data){ + Client::connect(); + + Client::on('test event', function($event_data){ echo 'test event triggered event_data :'; var_dump($event_data); }); - Timer::add(5, function(){ - Channel\Client::publish('test event', 'some data'); + + Timer::add(2, function(){ + Client::publish('test event', 'some data'); }); }; From 421004210b2d55d36cc7e8778a19e7488ef16cd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E5=9F=B9?= Date: Sat, 10 Oct 2020 00:41:44 +0800 Subject: [PATCH 2/3] Finish simple queue message. --- README.md | 42 +++++++++++++++++++ src/Client.php | 101 ++++++++++++++++++++++++++++++++------------- src/Queue.php | 51 ++++++++++++++++++++--- src/Server.php | 109 +++++++++++++++++++++++++------------------------ test/queue.php | 53 ++++++++++++++++++++++++ 5 files changed, 268 insertions(+), 88 deletions(-) create mode 100644 test/queue.php diff --git a/README.md b/README.md index 70edbda..83815ec 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,11 @@ # Channel 基于订阅的多进程通讯组件,用于workerman进程间通讯或者服务器集群通讯,类似redis订阅发布机制。基于workerman开发。 +Channel 提供两种通讯形式,分别是发布订阅的事件机制和消息队列机制。 +它们的主要区别是: +事件机制是消息发出后,所有订阅该事件的客户端都能收到消息。 +消息队列机制是消息发出后,所有订阅该消息的客户端只有一个会收到消息,如果客户端忙消息会进行排队直到有客户端闲置后重新取到消息。 + # 手册地址 [Channel手册](http://doc.workerman.net/components/channel.html) @@ -47,3 +52,40 @@ if(!defined('GLOBAL_START')) Worker::runAll(); } ```` + +## 消息队列示例 +```php +use Workerman\Worker; +use Workerman\Timer; + +$worker = new Worker(); +$worker->name = 'Producer'; +$worker->onWorkerStart = function() +{ + Client::connect(); + + $count = 0; + Timer::add(1, function() { + Client::enqueue('queue', 'Hello World '.time()); + }); +}; + +$mq = new Worker(); +$mq->name = 'Consumer'; +$mq->count = 4; +$mq->onWorkerStart = function($worker) { + Client::connect(); + + //订阅消息 queue + Client::watch('queue', function($data) use ($worker) { + echo "Worker {$worker->id} get queue: $data\n"; + }); + + //10 秒后取消订阅该消息 + Timer::add(10, function() { + Client::unwatch('queue'); + }, [], false); +}; + +Worker::runAll(); +``` \ No newline at end of file diff --git a/src/Client.php b/src/Client.php index 72d2e4a..20c4e1a 100644 --- a/src/Client.php +++ b/src/Client.php @@ -118,21 +118,33 @@ public static function connect($ip = '127.0.0.1', $port = 2206) /** * onRemoteMessage. - * @param TcpConnection $connection + * @param \Workerman\Connection\TcpConnection $connection * @param string $data * @throws \Exception */ public static function onRemoteMessage($connection, $data) { $data = unserialize($data); + $type = $data['type']; $event = $data['channel']; $event_data = $data['data']; - if (!empty(self::$_events[$event])) { - call_user_func(self::$_events[$event], $event_data); - } elseif (!empty(Client::$onMessage)) { - call_user_func(Client::$onMessage, $event, $event_data); + + $callback = null; + + if ($type == 'event') { + if (!empty(self::$_events[$event])) { + call_user_func(self::$_events[$event], $event_data); + } elseif (!empty(Client::$onMessage)) { + call_user_func(Client::$onMessage, $event, $event_data); + } else { + throw new \Exception("event:$event have not callback"); + } } else { - throw new \Exception("event:$event have not callback"); + if (isset(self::$_queues[$event])) { + call_user_func(self::$_queues[$event], $event_data); + } else { + throw new \Exception("queue:$event have not callback"); + } } } @@ -254,31 +266,44 @@ public static function publish($events, $data) /** * Watch a channel of queue - * @param $channel - * @param $callback + * @param string|array $channels + * @param callable $callback + * @param boolean $autoReserve Auto reserve after callback finished. + * But sometime you may don't want reserve immediately, or in some asynchronous job, + * you want reserve in finished callback, so you should set $autoReserve to false + * and call Client::reserve() after watch() and in finish callback manually. * @throws \Exception */ - public static function watch($channel, $callback) + public static function watch($channels, $callback, $autoReserve=true) { if (!is_callable($callback)) { throw new \Exception('callback is not callable for watch.'); } - self::send(array('type' => 'watch', 'channels'=>$channel)); - - self::$_events[$channel] = static function($data) use ($callback) { - try { - call_user_func($callback, $data); - } catch (\Exception $e) { - throw $e; - } catch (\Error $e) { - throw $e; - } finally { - self::send(array('type' => 'pull')); - } - }; + if ($autoReserve) { + $callback = static function($data) use ($callback) { + try { + call_user_func($callback, $data); + } catch (\Exception $e) { + throw $e; + } catch (\Error $e) { + throw $e; + } finally { + self::reserve(); + } + }; + } + + $channels = (array)$channels; + self::send(array('type' => 'watch', 'channels'=>$channels)); - self::send(array('type' => 'pull')); + foreach ($channels as $channel) { + self::$_queues[$channel] = $callback; + } + + if ($autoReserve) { + self::reserve(); + } } /** @@ -286,17 +311,35 @@ public static function watch($channel, $callback) * @param string $channel * @throws \Exception */ - public static function unwatch($channel) + public static function unwatch($channels) { - self::send(array('type' => 'unwatch', 'channels'=>$channel)); - if (isset(self::$_events[$channel])) { - unset(self::$_events[$channel]); + $channels = (array)$channels; + self::send(array('type' => 'unwatch', 'channels'=>$channels)); + foreach ($channels as $channel) { + if (isset(self::$_queues[$channel])) { + unset(self::$_queues[$channel]); + } } } - public static function push($channel, $data) + /** + * Put data to queue + * @param string|array $channels + * @param mixed $data + * @throws \Exception + */ + public static function enqueue($channels, $data) + { + self::sendAnyway(array('type' => 'enqueue', 'channels' => (array)$channels, 'data' => $data)); + } + + /** + * Start reserve queue manual + * @throws \Exception + */ + public static function reserve() { - self::sendAnyway(array('type' => 'push', 'channels' => $channel, 'data' => $data)); + self::send(array('type' => 'reserve')); } /** diff --git a/src/Queue.php b/src/Queue.php index 6df92c0..3b1b56e 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -9,7 +9,7 @@ class Queue public $name = 'default'; public $watcher = array(); - public $pulls = array(); + public $consumer = array(); protected $queue = null; public function __construct($name) @@ -23,7 +23,10 @@ public function __construct($name) */ public function addWatch($connection) { - $this->watcher[$connection->id] = $connection; + if (!isset($this->watcher[$connection->id])) { + $this->watcher[$connection->id] = $connection; + $connection->watchs[] = $this->name; + } } /** @@ -31,17 +34,53 @@ public function addWatch($connection) */ public function removeWatch($connection) { - if (isset($connection->watchs[$this->name])) { - unset($connection->watchs[$this->name]); + if (isset($connection->watchs) && in_array($this->name, $connection->watchs)) { + $idx = array_search($this->name, $connection->watchs); + unset($connection->watchs[$idx]); } if (isset($this->watcher[$connection->id])) { unset($this->watcher[$connection->id]); } - if (isset($this->pulls[$connection->id])) { - unset($this->pulls[$connection->id]); + if (isset($this->consumer[$connection->id])) { + unset($this->consumer[$connection->id]); } } + /** + * @param TcpConnection $connection + */ + public function addConsumer($connection) + { + if (isset($this->watcher[$connection->id]) && !isset($this->consumer[$connection->id])) { + $this->consumer[$connection->id] = $connection; + } + $this->dispatch(); + } + + public function enqueue($data) + { + $this->queue->enqueue($data); + $this->dispatch(); + } + + private function dispatch() + { + if ($this->queue->isEmpty() || count($this->consumer) == 0) { + return; + } + + while (!$this->queue->isEmpty()) { + $data = $this->queue->dequeue(); + $idx = key($this->consumer); + $connection = $this->consumer[$idx]; + unset($this->consumer[$idx]); + $connection->send(serialize(array('type'=>'queue', 'channel'=>$this->name, 'data' => $data))); + if (count($this->consumer) == 0) { + break; + } + } + } + public function isEmpty() { return empty($this->watcher) && $this->queue->isEmpty(); diff --git a/src/Server.php b/src/Server.php index 3ed16e1..46c889d 100644 --- a/src/Server.php +++ b/src/Server.php @@ -1,6 +1,8 @@ name = 'ChannelServer'; $worker->channels = array(); $worker->onMessage = array($this, 'onMessage') ; - $worker->onClose = array($this, 'onClose'); + $worker->onClose = array($this, 'onClose'); $this->_worker = $worker; } @@ -46,17 +42,24 @@ public function __construct($ip = '0.0.0.0', $port = 2206) */ public function onClose($connection) { - if(empty($connection->channels)) - { - return; + if (!empty($connection->channels)) { + foreach ($connection->channels as $channel) { + unset($this->_worker->channels[$channel][$connection->id]); + if (empty($this->_worker->channels[$channel])) { + unset($this->_worker->channels[$channel]); + } + } } - foreach($connection->channels as $channel) - { - unset($this->_worker->channels[$channel][$connection->id]); - if(empty($this->_worker->channels[$channel])) - { - unset($this->_worker->channels[$channel]); - } + + if (!empty($connection->watchs)) { + foreach ($connection->watchs as $channel) { + if (isset($this->_queues[$channel])) { + $this->_queues[$channel]->removeWatch($connection); + if ($this->_queues[$channel]->isEmpty()) { + unset($this->_queues[$channel]); + } + } + } } } @@ -74,77 +77,77 @@ public function onMessage($connection, $data) $worker = $this->_worker; $data = unserialize($data); $type = $data['type']; - $channels = $data['channels']; switch($type) { case 'subscribe': - foreach($channels as $channel) + foreach($data['channels'] as $channel) { $connection->channels[$channel] = $channel; $worker->channels[$channel][$connection->id] = $connection; } break; case 'unsubscribe': - foreach($channels as $channel) - { - if(isset($connection->channels[$channel])) - { + foreach($data['channels'] as $channel) { + if (isset($connection->channels[$channel])) { unset($connection->channels[$channel]); } - if(isset($worker->channels[$channel][$connection->id])) - { + if (isset($worker->channels[$channel][$connection->id])) { unset($worker->channels[$channel][$connection->id]); - if(empty($worker->channels[$channel])) - { + if (empty($worker->channels[$channel])) { unset($worker->channels[$channel]); } } } break; case 'publish': - foreach($channels as $channel) - { - if(empty($worker->channels[$channel])) - { + foreach ($data['channels'] as $channel) { + if (empty($worker->channels[$channel])) { continue; } - $buffer = serialize(array('type'=>'event', 'channel'=>$channel, 'data' => $data['data']))."\n"; - foreach($worker->channels[$channel] as $connection) - { + $buffer = serialize(array('type' => 'event', 'channel' => $channel, 'data' => $data['data']))."\n"; + foreach ($worker->channels[$channel] as $connection) { $connection->send($buffer); } } break; case 'watch': - $queue = $this->getQueue(); - $connection->watchs[$channels] = $channels; - $queue->addWatch($connection); + foreach ($data['channels'] as $channel) { + $this->getQueue($channel)->addWatch($connection); + } break; case 'unwatch': - if (isset($this->_queues[$channels])) { - if (isset($connection->watchs[$channels])) { - unset($connection->watchs[$channels]); - } - if (isset($worker->queues[$channels]['watcher'][$connection->id])) { - - } - } - $this->initQueue($channels); + foreach ($data['channels'] as $channel) { + if (isset($this->_queues[$channel])) { + $this->_queues[$channel]->removeWatch($connection); + if ($this->_queues[$channel]->isEmpty()) { + unset($this->_queues[$channel]); + } + } + } break; - case 'push': + case 'enqueue': + foreach ($data['channels'] as $channel) { + $this->getQueue($channel)->enqueue($data['data']); + } break; - case 'pull': - $connection->pulling = true; + case 'reserve': + if (isset($connection->watchs)) { + foreach ($connection->watchs as $channel) { + if (isset($this->_queues[$channel])) { + $this->_queues[$channel]->addConsumer($connection); + } + } + } break; } } - public function getQueue($channel) + private function getQueue($channel) { - if (isset($this->queues[$channel])) { + if (isset($this->_queues[$channel])) { return $this->_queues[$channel]; } - return $this->_queues[$channel] = new Queue($channel); + return ($this->_queues[$channel] = new Queue($channel)); } } diff --git a/test/queue.php b/test/queue.php new file mode 100644 index 0000000..cce6ae3 --- /dev/null +++ b/test/queue.php @@ -0,0 +1,53 @@ +name = 'Event'; +$worker->onWorkerStart = function() +{ + Client::connect(); + + $count = 0; + $timerId = Timer::add(0.01, function() use (&$timerId, &$count) { + Client::publish('test event', 'some data'); + $count++; + Client::enqueue('task-queue', time()); + if ($count == 1000) { + Timer::del($timerId); + } + }); + + Timer::add(10, function() { + Client::enqueue('task-queue', 'hello every 10 seconds'); + }); +}; + +$mq = new Worker(); +$mq->name = 'Queue'; +$mq->count = 4; +$mq->onWorkerStart = function($worker) { + Client::connect(); + $countDown = 20; + $id = 1; + Client::watch('task-queue', function($data) use ($worker, &$countDown, &$id) { + echo "[$id] Worker {$worker->id} get queue: $data\n"; + sleep(0.2); + $countDown--; + $id++; + if ($worker->id > 1 && $countDown == 0) { + Client::unwatch('task-queue'); + } + Timer::add(1, [Client::class, 'reserve'], [], false); + }); +}; + +Worker::runAll(); From 4ca112464c98ce7cb8640b4025e3d837f409983d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E5=9F=B9?= Date: Sat, 10 Oct 2020 00:47:50 +0800 Subject: [PATCH 3/3] Update readme. --- README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 83815ec..8b5b472 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,11 @@ 基于订阅的多进程通讯组件,用于workerman进程间通讯或者服务器集群通讯,类似redis订阅发布机制。基于workerman开发。 Channel 提供两种通讯形式,分别是发布订阅的事件机制和消息队列机制。 + 它们的主要区别是: -事件机制是消息发出后,所有订阅该事件的客户端都能收到消息。 -消息队列机制是消息发出后,所有订阅该消息的客户端只有一个会收到消息,如果客户端忙消息会进行排队直到有客户端闲置后重新取到消息。 +- 事件机制是消息发出后,所有订阅该事件的客户端都能收到消息。 +- 消息队列机制是消息发出后,所有订阅该消息的客户端只有一个会收到消息,如果客户端忙消息会进行排队直到有客户端闲置后重新取到消息。 +- 需要注意的是 Channel 只是提供一种通讯方式,本身并不提供消息确认、重试、延迟、持久化等功能,请根据实际情况合理使用。 # 手册地址 [Channel手册](http://doc.workerman.net/components/channel.html)