Skip to content

Commit

Permalink
feat: AdaptiveBuilder完善
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed Jun 11, 2024
1 parent 5d1ab68 commit ba53498
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/Builders/AdaptiveBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public function onWorkerStart(Worker $worker): void
}));
}
// main timer
$this->adaptiveTimerCreate($this->timerInterval / 1000, function () use($worker) {
$this->adaptiveTimerCreate(function () use($worker) {
try {
// consume
return $this->consume($worker);
Expand Down
72 changes: 49 additions & 23 deletions src/Builders/Traits/AdaptiveTimerMethod.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ trait AdaptiveTimerMethod
/** @var float 最大定时器间隔 ms */
protected float $maxTimerInterval = 0.0;

/** @var float|null 定时器最初间隔 */
private float|null $timerInitialInterval = null;

/** @var int|null 最后一次获取消息的毫秒时间戳 */
private static ?int $lastMessageMilliTimestamp = null;

Expand Down Expand Up @@ -120,12 +123,11 @@ public function setMaxTimerInterval(float $maxTimerInterval): void
/**
* 添加自适应退避定时器
*
* @param int $millisecond
* @param Closure $func
* @param mixed ...$args
* @return string
*/
public function adaptiveTimerCreate(int $millisecond, Closure $func, mixed ...$args): string
public function adaptiveTimerCreate(Closure $func, mixed ...$args): string
{
if (!Worker::$globalEvent) {
throw new WebmanRqueueException("Event driver error. ", -1);
Expand All @@ -135,33 +137,57 @@ public function adaptiveTimerCreate(int $millisecond, Closure $func, mixed ...$a
// 增加定时器
$id = spl_object_hash($func);
self::$timerIdMap[$id] = Worker::$globalEvent->add(
$millisecond / 1000,
$this->timerInitialInterval = $this->getTimerInterval(),
EventInterface::EV_TIMER,
$callback = function (...$args) use ($func, $millisecond, $id, &$callback)
$callback = function (...$args) use ($func, $id, &$callback)
{
// 获取毫秒时间戳
$nowMilliTimestamp = intval(microtime(true) * 1000);
if (\call_user_func($func, ...$args)) {
self::$lastMessageMilliTimestamp = $nowMilliTimestamp;
self::$isMaxTimerInterval = false;
// 是否开启
$enable = ($this->avoidIndex > 0 and $this->idleThreshold and $this->maxTimerInterval > $this->timerInitialInterval);
// 执行业务逻辑
try {
if ($res = \call_user_func($func, ...$args)) {
// 设置最后一条执行时间
self::$lastMessageMilliTimestamp = $nowMilliTimestamp;
}
} catch (\Throwable){
$res = false;
}
if (
// 设置了闲置阈值、退避指数、最大时间间隔大于定时器初始时间间隔
$this->avoidIndex > 0 and $this->idleThreshold and $this->maxTimerInterval > $millisecond and
// 闲置超过闲置阈值
$nowMilliTimestamp - self::$lastMessageMilliTimestamp > $this->avoidIndex and
// 非最大间隔
!self::$isMaxTimerInterval
) {
$interval = min($this->avoidIndex * $millisecond, $this->maxTimerInterval);
// 如果到达最大值
if ($interval >= $this->maxTimerInterval) {
self::$isMaxTimerInterval = true;
// 如果自适应开启
if ($enable) {
// 有消费
if ($res) {
// 归零
self::$isMaxTimerInterval = false;
// 初始化间隔与间隔不相同则需要重新设置定时时间
$setTimer = $this->getTimerInterval() !== $this->timerInitialInterval;
// 定时器初始化
$this->setTimerInterval($this->timerInitialInterval);
}
// 无消费
else {
$setTimer = false;
if (
$nowMilliTimestamp - self::$lastMessageMilliTimestamp > $this->avoidIndex and // 闲置超过闲置阈值
!self::$isMaxTimerInterval // 非最大间隔
) {
$interval = min($this->avoidIndex * $millisecond, $this->maxTimerInterval);
// 如果到达最大值
if ($interval >= $this->maxTimerInterval) {
self::$isMaxTimerInterval = true;
}
$setTimer = true;
$this->setTimerInterval($interval);
}
}
// 是否需要设置定时器
if ($setTimer) {
// 移除之前的定时器
Worker::$globalEvent->del(self::$timerIdMap[$id], EventInterface::EV_TIMER);
// 新建定时器
self::$timerIdMap[$id] = Worker::$globalEvent->add($this->getTimerInterval(), EventInterface::EV_TIMER, $callback);
}
// 移除之前的定时器
Worker::$globalEvent->del(self::$timerIdMap[$id], EventInterface::EV_TIMER);
// 新建定时器
self::$timerIdMap[$id] = Worker::$globalEvent->add($interval, EventInterface::EV_TIMER, $callback);
}
},
$args
Expand Down

0 comments on commit ba53498

Please sign in to comment.