Skip to content

Commit

Permalink
Merge pull request #28 from workbunny/feature/issue-27
Browse files Browse the repository at this point in the history
Feature/issue 27
  • Loading branch information
chaz6chez authored Nov 9, 2024
2 parents 2caf9a8 + 7f6ad18 commit bce4f36
Show file tree
Hide file tree
Showing 40 changed files with 913 additions and 111 deletions.
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,17 @@ composer require workbunny/webman-coroutine

## 文档

| 目录 | 地址 |
|:---:|:----------------------------------------------------------------------------------------------:|
| API | [Fucntion-APIs](https://workbunny.github.io/webman-coroutine/) |
| 教程 | [PHP 协程入门](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/coroutine.md) |
| - | [安装及配置](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/install.md) |
| - | [助手函数](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/helpers.md) |
| - | [`workerman`环境](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/workerman.md) |
| - | [`webman`框架](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/webman.md) |
| - | [`Utils`说明](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/utils.md) |
| - | [自定义拓展](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/custom.md) |
| 目录 | 地址 |
|:---:|:------------------------------------------------------------------------------------------------------:|
| API | [Fucntion-APIs](https://workbunny.github.io/webman-coroutine/) |
| 教程 | [PHP 协程入门](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/coroutine.md) |
| - | [安装及配置](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/install.md) |
| - | [助手函数](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/helpers.md) |
| - | [`workerman`环境](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/workerman.md) |
| - | [`webman`框架](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/webman.md) |
| - | [`Utils`说明](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/utils.md) |
| - | [自定义拓展](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/custom.md) |
| - | [协程的观测和管理](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/suspension-manage.md) |

## 参与开发

Expand Down
80 changes: 80 additions & 0 deletions docs/doc/suspension-manage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# 协程的观测和管理

`webman-coroutine`中提供的关于协程的部分来自`Utils/Coroutine``Factory::sleep()`

## `Utils/Coroutine` 协程工具

> `Utils/Coroutine`会在每次构造的时候将注入一个WeakMap,并储存`id``startTime`
- 获取当前进程所有协程

```php
$weakMap = \Workbunny\WebmanCoroutine\Utils\Coroutine\Coroutine::getCoroutinesWeakMap();
```
> Tips: 方法返回一个储存所有通过`Utils/Coroutine`创建的协程的`WeakMap`
- 退出当前进程所有协程

```php
$weakMap = \Workbunny\WebmanCoroutine\Utils\Coroutine\Coroutine::getCoroutinesWeakMap();
/**
* @var \Workbunny\WebmanCoroutine\Utils\Coroutine\Handlers\CoroutineInterface $coroutine
* @var array<string, mixed> $info ['id' => 协程id, 'startTime' => 开始时间]
*/
foreach ($weakMap as $coroutine => $info) {
$coroutine->kill(new \Workbunny\WebmanCoroutine\Exceptions\KilledException());
}
```
> Tips:
> - `kill`方法并不会立即退出协程,而是在该协程下一次唤起的时候触发,抛出一个异常
> - 各协程驱动各略有不同,如:swoole驱动在协程遇到文件IO事件时并不会立即退出,也不会在下次唤起时抛出异常
## `Handler::$suspension`挂起事件

> 所有基于`Factory::sleep()`创建的挂起事件既是`Handler::$suspension`,包括`sleep()``wait_for()``Factory::waitFor()`
- 获取当前进程所有挂起事件

```php
$weakMap = \Workbunny\WebmanCoroutine\Factory::getSuspensionsWeakMap();
```
> Tips: 方法返回一个储存所有通过`Factory`创建的协程的`WeakMap`
- 添加/设置一个挂起事件至`WeakMap`
```php
$weakMap = \Workbunny\WebmanCoroutine\Factory::setSuspensionsWeakMap();
```

- 退出当前进程所有挂起事件

```php
$weakMap = \Workbunny\WebmanCoroutine\Factory::getSuspensionsWeakMap();
/**
* @var mixed $suspension
* @var array<string, mixed> $info ['id' => 协程id, 'startTime' => 开始时间, 'event' => 挂起事件|NULL]
*/
foreach ($weakMap as $suspension => $info) {
\Workbunny\WebmanCoroutine\Factory::kill($suspension);
}
```

> Tips:
> - `kill`方法并不会立即退出协程,而是在该挂起下一次被唤起的时候触发,抛出一个`KilledException`
> - 各协程驱动各略有不同,如:`swoole`驱动在协程遇到文件IO事件时并不会立即退出,挂起事件会抛出`KilledException`
## 观测/管理实践

### 采样监听方案

1. 在服务进程启动时创建定时器
2. 定时器实现`Coroutine::getCoroutinesWeakMap()``Factory::getSuspensionsWeakMap()`的采样,并以进程pid为区分输出至日志

> Tips: 可以自定义实现对协程或挂起事件的startTime比对,合理杀死过长挂起的协程/事件
### 遥控管理方案

1. 在进程中实现命令对应的控制逻辑,例如:`kill``dump``check`
2. 在服务进程启动时通过`redis`/`apcu`对通道进行监听,注册命令对应的控制监听
3. 遥控cli程序通过对通道的`pub`发送指定命令进行控制

> Tips: `webman`/`workerman` 环境基于`apcu`的共享缓存插件推荐:https://www.workerman.net/plugin/133
2 changes: 2 additions & 0 deletions src/Events/SwooleEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ public function destroy()
foreach (Coroutine::listCoroutines() as $coroutine) {
Coroutine::cancel($coroutine);
}
// Wait for coroutines to exit
usleep(100000);
// 退出event loop
Event::exit();
}
Expand Down
41 changes: 41 additions & 0 deletions src/Exceptions/KilledException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php
/**
* @author workbunny/Chaz6chez
* @email [email protected]
*/
declare(strict_types=1);

namespace Workbunny\WebmanCoroutine\Exceptions;

use Throwable;

/**
* 协程或挂起事件被杀死
*/
class KilledException extends RuntimeException
{
/**
* @var string|null
*/
protected null|string $event;

/**
* @param string $message
* @param int $code
* @param string|null $event
* @param Throwable|null $previous
*/
public function __construct(string $message = "", int $code = 0, ?string $event = null, ?Throwable $previous = null)
{
$this->event = $event;
parent::__construct($message, $code, $previous);
}

/**
* @return string|null
*/
public function getEvent(): ?string
{
return $this->event;
}
}
33 changes: 31 additions & 2 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace Workbunny\WebmanCoroutine;

use WeakMap;
use Workbunny\WebmanCoroutine\Events\SwooleEvent;
use Workbunny\WebmanCoroutine\Events\SwowEvent;
use Workbunny\WebmanCoroutine\Handlers\DefaultHandler;
Expand All @@ -20,7 +21,15 @@
use Workbunny\WebmanCoroutine\Handlers\SwowWorkerman5Handler;

/**
* 工厂化启动器
* 工厂化启动器
* @method static void isAvailable()
* @method static void initEnv()
* @method static void waitFor(?\Closure $action = null, int|float $timeout = -1, ?string $event = null)
* @method static void wakeup(string $event)
* @method static void sleep(int|float $timeout = 0, ?string $event = null)
* @method static void kill(object|int|string $suspensionOrSuspensionId, string $message = 'kill', int $exitCode = 0)
* @method static null|WeakMap getSuspensionsWeakMap()
* @method static void setSuspensionsWeakMap(object $object, string|int $id, ?string $event, float|int $startTime)
*/
class Factory
{
Expand All @@ -47,7 +56,7 @@ class Factory
self::WORKERMAN_SWOOLE => SwooleWorkerman5Handler::class,
self::WORKBUNNY_SWOOLE => SwooleHandler::class,
self::REVOLT_FIBER => RevoltHandler::class,
self::RIPPLE_FIBER => RippleHandler::class,
self::RIPPLE_FIBER_4 => RippleHandler::class,
self::RIPPLE_FIBER_5 => RippleWorkerman5Handler::class,
];

Expand Down Expand Up @@ -167,4 +176,24 @@ public static function init(?string $eventLoopClass): void
self::$_currentEventLoop = $eventLoopClass;
}
}

/**
* 代理调用HandlerInterface方法
*
* @param string $name
* @param array $arguments
* @return mixed
*/
public static function __callStatic(string $name, array $arguments)
{
if (($handler = Factory::getCurrentHandler()) === null) {
Factory::init(null);
/** @var HandlerInterface $handler */
$handler = Factory::getCurrentHandler();
}
if (method_exists($handler, $name)) {
return $handler::$name(...$arguments);
}
throw new \BadMethodCallException("Method $name not exists. ");
}
}
12 changes: 8 additions & 4 deletions src/Handlers/DefaultHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
class DefaultHandler implements HandlerInterface
{
use HandlerMethods;

/**
* 测试用,为保证覆盖生成时不会无限等待
*
Expand Down Expand Up @@ -73,12 +75,14 @@ public static function wakeup(string $event): void
{
}

/** @inheritDoc
* @param float|int $timeout
* @param string|null $event
*/
/** @inheritDoc */
public static function sleep(float|int $timeout = 0, ?string $event = null): void
{
usleep(max((int) $timeout * 1000 * 1000, 0));
}

/** @inheritdoc */
public static function kill(object|int|string $suspensionOrSuspensionId, string $message = 'kill', int $exitCode = 0): void
{
}
}
19 changes: 19 additions & 0 deletions src/Handlers/HandlerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
namespace Workbunny\WebmanCoroutine\Handlers;

use Throwable;
use WeakMap;
use Workbunny\WebmanCoroutine\Exceptions\TimeoutException;

/**
Expand Down Expand Up @@ -57,4 +58,22 @@ public static function wakeup(string $event): void;
* @return void
*/
public static function sleep(int|float $timeout = 0, ?string $event = null): void;

/**
* 协程强制终止
*
* @param object|int|string $suspensionOrSuspensionId
* @param string $message
* @param int $exitCode
* @return void
*/
public static function kill(object|int|string $suspensionOrSuspensionId, string $message = 'kill', int $exitCode = 0): void;

/**
* 获取所有挂起的对象
*
* @return WeakMap
* @link HandlerMethods::getSuspensionsWeakMap()
*/
public static function getSuspensionsWeakMap(): WeakMap;
}
34 changes: 34 additions & 0 deletions src/Handlers/HandlerMethods.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,44 @@

namespace Workbunny\WebmanCoroutine\Handlers;

use WeakMap;
use Workerman\Worker;

trait HandlerMethods
{
/**
* @var WeakMap<object, array>|null <挂起对象, ['id' => int|string, 'event' => string|null, 'startTime' => float|int]>
*/
protected static ?WeakMap $_suspensionsWeakMap = null;

/**
* 获取挂起对象
*
* @return WeakMap
*/
public static function getSuspensionsWeakMap(): WeakMap
{
return self::$_suspensionsWeakMap = static::$_suspensionsWeakMap ?: new WeakMap();
}

/**
* 添加挂起对象
*
* @param object $object
* @param string|int $id
* @param string|null $event
* @param float|int $startTime
* @return void
*/
public static function setSuspensionsWeakMap(object $object, string|int $id, ?string $event, float|int $startTime): void
{
static::getSuspensionsWeakMap()->offsetSet($object, [
'id' => $id,
'event' => $event,
'startTime' => $startTime
]);
}

/**
* @codeCoverageIgnore 为了测试可以mock
*
Expand Down
22 changes: 22 additions & 0 deletions src/Handlers/RevoltHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
namespace Workbunny\WebmanCoroutine\Handlers;

use Revolt\EventLoop;
use Workbunny\WebmanCoroutine\Exceptions\KilledException;
use Workbunny\WebmanCoroutine\Exceptions\TimeoutException;

use function Workbunny\WebmanCoroutine\package_installed;
Expand Down Expand Up @@ -77,6 +78,7 @@ public static function sleep(int|float $timeout = 0, ?string $event = null): voi
{
try {
$suspension = EventLoop::getSuspension();
static::setSuspensionsWeakMap($suspension, spl_object_hash($suspension), $event, microtime(true));
if ($event) {
static::$_suspensions[$event] = $suspension;
if ($timeout < 0) {
Expand Down Expand Up @@ -110,4 +112,24 @@ public static function sleep(int|float $timeout = 0, ?string $event = null): voi
}
}
}

/** @inheritdoc */
public static function kill(object|int|string $suspensionOrSuspensionId, string $message = 'kill', int $exitCode = 0): void
{
if ($suspensionOrSuspensionId instanceof EventLoop\Suspension) {
if ($info = static::getSuspensionsWeakMap()->offsetGet($suspensionOrSuspensionId)) {
$suspensionOrSuspensionId->throw(new KilledException($message, $exitCode, $info['event'] ?? null));
}
} else {
/**
* @var EventLoop\Suspension $object
* @var array $info
*/
foreach (static::getSuspensionsWeakMap() as $object => $info) {
if ($info['id'] === $suspensionOrSuspensionId) {
$object->throw(new KilledException($message, $exitCode, $info['event'] ?? null));
}
}
}
}
}
Loading

0 comments on commit bce4f36

Please sign in to comment.