diff --git a/src/Dispatcher.php b/src/Dispatcher.php index a56f8e6..4c73214 100644 --- a/src/Dispatcher.php +++ b/src/Dispatcher.php @@ -136,6 +136,11 @@ class Dispatcher extends AbstractMaster implements ConsumerHandlerInterface */ private $shutdownError; + /** + * @var int 关闭的超时时间,超过后dispatcher强制自行退出. + */ + private $shutdownTimeoutSec = 30; + public function __construct(AMQPConnectionInterface $connection, WorkerFactoryInterface $factory) { parent::__construct(); @@ -382,6 +387,20 @@ public function setPatrolPeriod(int $seconds): self return $this; } + /** + * 设置退出超时时间. + * + * @param int $sec + * + * @return Dispatcher + */ + public function setShutdownTimeoutSec(int $sec): self + { + $this->shutdownTimeoutSec = $sec; + + return $this; + } + /** * @return array */ @@ -565,16 +584,22 @@ private function flushCached() */ private function informWorkersQuit() { - foreach ($this->workersInfo as $workerID => $each) { - try { - $this->sendLastMessage($workerID); - } catch (\Throwable $e) { - // TODO 如果没有成功向所有进程发送关闭,考虑是否需要做重试机制 - $this->errorlessEmit('error', ['shuttingDown', $e]); + $startInformTime = time(); + $informCount = 0; + do { + // 每10秒进行一次通知 + if ((time() - $startInformTime) / 10 >= $informCount) { + $informCount++; + foreach ($this->workersInfo as $workerID => $each) { + try { + $this->sendLastMessage($workerID); + } catch (\Throwable $e) { + // TODO 如果没有成功向所有进程发送关闭,考虑是否需要做重试机制 + $this->errorlessEmit('error', ['shuttingDown', $e]); + } + } } - } - do { if ($this->countWorkers() === 0) { $this->state = self::STATE_SHUTDOWN; break; @@ -582,7 +607,8 @@ private function informWorkersQuit() $this->process(0.5); $this->waitChildren(); - } while (true); + // 防止子进程无响应,这里循环一定时间后直接退出 + } while ((time() - $startInformTime) < $this->shutdownTimeoutSec); } private function waitChildren()