Skip to content

Commit

Permalink
feat: supported workerman 5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed Sep 4, 2024
1 parent 799ef4b commit 3346ed4
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 61 deletions.
131 changes: 70 additions & 61 deletions src/Builders/Traits/AdaptiveTimerMethod.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Workbunny\WebmanRqueue\Exceptions\WebmanRqueueException;
use Workerman\Events\EventInterface;
use Workerman\Worker;
use function Workbunny\WebmanRqueue\is_worker_version_5;

trait AdaptiveTimerMethod
{
Expand Down Expand Up @@ -184,67 +185,69 @@ public function adaptiveTimerCreate(Closure $func, mixed ...$args): string
$this->setTimerInitialInterval($this->getTimerInterval());
// 初始化时间
self::setLastMessageMilliTimestamp(self::getMilliTime());
// 创建定时器
self::$timerIdMap[$id] = Worker::$globalEvent->add($this->getTimerInitialInterval() / 1000, EventInterface::EV_TIMER,
$callback = function (...$args) use ($func, $id, &$callback)
{
// 获取毫秒时间戳
$nowMilliTimestamp = self::getMilliTime();
// 是否开启自适应
$enable = (
// 设置了退避指数
$this->getAvoidIndex() > 0 and
// 定时器间隔小于最大间隔
$this->getMaxTimerInterval() > $this->getTimerInitialInterval()
);
// 执行回调
try {
if ($result = \call_user_func($func, ...$args)) {
// 设置执行时间
self::setLastMessageMilliTimestamp($nowMilliTimestamp);
}
} catch (\Throwable){
// 异常为负反馈
$result = false;
$callback = function (...$args) use ($func, $id, &$callback)
{
// 获取毫秒时间戳
$nowMilliTimestamp = self::getMilliTime();
// 是否开启自适应
$enable = (
// 设置了退避指数
$this->getAvoidIndex() > 0 and
// 定时器间隔小于最大间隔
$this->getMaxTimerInterval() > $this->getTimerInitialInterval()
);
// 执行回调
try {
if ($result = \call_user_func($func, ...$args)) {
// 设置执行时间
self::setLastMessageMilliTimestamp($nowMilliTimestamp);
}
// 如果自适应开启
if ($enable) {
// 正反馈
if ($result) {
// 归零
self::$isMaxTimerInterval = false;
// 重新设置定时器
$setTimer = true;
// 定时器初始化
$this->setTimerInterval($this->getTimerInitialInterval());
}
// 负反馈
else {
$setTimer = false;
if (
$nowMilliTimestamp - self::getLastMessageMilliTimestamp() > $this->getIdleThreshold() and // 闲置超过闲置阈值
!self::isMaxTimerInterval() // 非最大间隔
) {
$interval = min($this->getAvoidIndex() * $this->getTimerInterval(), $this->getMaxTimerInterval());
// 如果到达最大值
if ($interval >= $this->getMaxTimerInterval()) {
self::$isMaxTimerInterval = true;
}
$setTimer = true;
$this->setTimerInterval($interval);
} catch (\Throwable){
// 异常为负反馈
$result = false;
}
// 如果自适应开启
if ($enable) {
// 正反馈
if ($result) {
// 归零
self::$isMaxTimerInterval = false;
// 重新设置定时器
$setTimer = true;
// 定时器初始化
$this->setTimerInterval($this->getTimerInitialInterval());
}
// 负反馈
else {
$setTimer = false;
if (
$nowMilliTimestamp - self::getLastMessageMilliTimestamp() > $this->getIdleThreshold() and // 闲置超过闲置阈值
!self::isMaxTimerInterval() // 非最大间隔
) {
$interval = min($this->getAvoidIndex() * $this->getTimerInterval(), $this->getMaxTimerInterval());
// 如果到达最大值
if ($interval >= $this->getMaxTimerInterval()) {
self::$isMaxTimerInterval = true;
}
$setTimer = true;
$this->setTimerInterval($interval);
}
// 重置定时器
if ($setTimer) {
// 移除旧定时器
self::adaptiveTimerDelete($id);
// 创建新定时器
self::$timerIdMap[$id] = Worker::$globalEvent->add($this->getTimerInterval() / 1000, EventInterface::EV_TIMER, $callback);
}
}
},
$args
);
// 重置定时器
if ($setTimer) {
// 移除旧定时器
self::adaptiveTimerDelete($id);
// 创建新定时器
self::$timerIdMap[$id] = is_worker_version_5() and method_exists(Worker::$globalEvent, 'delay')
? Worker::$globalEvent->delay(floatval($this->getTimerInitialInterval() / 1000), $callback, $args)
: Worker::$globalEvent->add(floatval($this->getTimerInterval() / 1000), EventInterface::EV_TIMER, $callback);
}
}
};
// 创建定时器
self::$timerIdMap[$id] = is_worker_version_5() and method_exists(Worker::$globalEvent, 'delay')
? Worker::$globalEvent->delay(floatval($this->getTimerInitialInterval() / 1000), $callback, $args)
: Worker::$globalEvent->add(floatval($this->getTimerInitialInterval() / 1000), EventInterface::EV_TIMER, $callback, $args);
return $id;
}

Expand All @@ -261,14 +264,20 @@ public function adaptiveTimerDelete(?string $id = null): void
}
if ($id === null) {
foreach(self::$timerIdMap as $id) {
Worker::$globalEvent->del(
$id, EventInterface::EV_TIMER);
if (is_worker_version_5() and method_exists(Worker::$globalEvent, 'offDelay')) {
Worker::$globalEvent->offDelay($id);
} else {
Worker::$globalEvent->del($id, EventInterface::EV_TIMER);
}
}
self::$timerIdMap = [];
} else {
if ($id = self::$timerIdMap[$id] ?? null) {
Worker::$globalEvent->del(
$id, EventInterface::EV_TIMER);
if (is_worker_version_5() and method_exists(Worker::$globalEvent, 'offDelay')) {
Worker::$globalEvent->offDelay($id);
} else {
Worker::$globalEvent->del($id, EventInterface::EV_TIMER);
}
unset(self::$timerIdMap[$id]);
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/helpers.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
use Workbunny\WebmanRqueue\Builders\AbstractBuilder;
use Workbunny\WebmanRqueue\Builders\QueueBuilder;
use Workbunny\WebmanRqueue\Exceptions\WebmanRqueueException;
use Workerman\Worker;

/**
* 同步生产
* @param QueueBuilder $builder
* @param string $body
* @param array $headers
* @param bool $temp
* @return int|false
* @throws WebmanRqueueException
*/
Expand All @@ -35,6 +37,15 @@ function sync_publish_get_ids(AbstractBuilder $builder, string $body, array $hea
return $builder->publishGetIds($body, $headers, temp: $temp);
}

/**
* Version 5.x uses a new event interface
* @return bool
*/
function is_worker_version_5(): bool
{
return version_compare(Worker::VERSION, '5.0.0', '>=');
}

/**
* @param string|null $key
* @param mixed|null $default
Expand Down

0 comments on commit 3346ed4

Please sign in to comment.