From 74a37b21517792bb0cf1f3fe30707d6ee25fea27 Mon Sep 17 00:00:00 2001 From: s-aebischer Date: Wed, 6 Apr 2022 10:45:03 +0200 Subject: [PATCH] #41 get correct worker for process --- src/Queue.php | 97 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 88 insertions(+), 9 deletions(-) diff --git a/src/Queue.php b/src/Queue.php index fda780e..14aee6c 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -25,9 +25,24 @@ class Queue use EventsTrait; /** - * Queue options. + * Orphaned job timeout. */ - public const OPTION_ORPHANED_TIMEOUT = 'orphaned_timeout'; + public const OPTION_ORPHANED_JOB_TIMEOUT = 'orphaned_job_timeout'; + + /** + * Orphaned worker timeout. + */ + public const OPTION_ORPHANED_WORKER_TIMEOUT = 'orphaned_worker_timeout'; + + /** + * Minimum waiting jobs for worker timout. + */ + public const OPTION_WAITING_JOBS_FOR_WORKER_TIMEOUT = 'waiting_jobs_for_worker_timeout'; + + /** + * Amount of processing jobs for worker timout. + */ + public const OPTION_PROCESSING_JOBS_FOR_WORKER_TIMEOUT = 'processing_for_jobs_worker_timeout'; /** * Database. @@ -72,11 +87,32 @@ class Queue protected $scheduler; /** - * Orphaned timeout. + * Orphaned job timeout. + * + * @var int + */ + protected $orphaned_job_timeout = 30; + + /** + * Orphaned worker timeout. + * + * @var int + */ + protected $orphaned_worker_timeout = 600; + + /** + * Minimum waiting jobs for worker timeout. + * + * @var int + */ + protected $waiting_jobs_for_worker_timeout = 5; + + /** + * Amount of processing jobs for worker timeout. * * @var int */ - protected $orphaned_timeout = 30; + protected $processing_for_jobs_worker_timeout = 0; /** * Init queue. @@ -98,7 +134,10 @@ public function setOptions(array $config = []): self { foreach ($config as $option => $value) { switch ($option) { - case self::OPTION_ORPHANED_TIMEOUT: + case self::OPTION_ORPHANED_JOB_TIMEOUT: + case self::OPTION_ORPHANED_WORKER_TIMEOUT: + case self::OPTION_WAITING_JOBS_FOR_WORKER_TIMEOUT: + case self::OPTION_PROCESSING_JOBS_FOR_WORKER_TIMEOUT: if (!is_int($value)) { throw new InvalidArgumentException($option.' needs to be an integer'); } @@ -240,14 +279,19 @@ protected function main(): void $this->handleJob((array) $job); } - $start = time(); + $start_job = $start_worker = time(); $cursor_watch->rewind(); while ($this->loop()) { if (!$cursor_watch->valid()) { - if (time() - $start >= $this->orphaned_timeout) { + if (time() - $start_job >= $this->orphaned_job_timeout) { $this->rescheduleOrphanedJobs(); - $start = time(); + $start_job = time(); + } + + if (time() - $start_worker >= $this->orphaned_worker_timeout) { + $this->checkForOrphanedWorkers(); + $start_worker = time(); } $cursor_watch->next(); @@ -274,7 +318,7 @@ protected function rescheduleOrphanedJobs(): self $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([ 'status' => JobInterface::STATUS_PROCESSING, - 'alive' => ['$lt' => new UTCDateTime((time() - $this->orphaned_timeout) * 1000)], + 'alive' => ['$lt' => new UTCDateTime((time() - $this->orphaned_job_timeout) * 1000)], ], [ '$set' => ['status' => JobInterface::STATUS_WAITING, 'worker' => null], ]); @@ -287,6 +331,41 @@ protected function rescheduleOrphanedJobs(): self return $this; } + protected function checkForOrphanedWorkers(): self + { + $this->logger->debug('looking for orphaned workers', [ + 'category' => get_class($this), + ]); + + $waiting = $this->db->{$this->scheduler->getJobQueue()}->find([ + 'status' => JobInterface::STATUS_WAITING, + ])->toArray(); + + $waiting = count($waiting); + + $this->logger->debug('found [{jobs}] waiting jobs', [ + 'category' => get_class($this), + 'jobs' => $waiting, + ]); + + $processing = $this->db->{$this->scheduler->getJobQueue()}->find([ + 'status' => JobInterface::STATUS_PROCESSING, + ])->toArray(); + + $processing = count($processing); + + $this->logger->debug('found [{jobs}] processing jobs', [ + 'category' => get_class($this), + 'jobs' => $processing, + ]); + + if ($waiting > $this->waiting_jobs_for_worker_timeout && $processing === $this->processing_for_jobs_worker_timeout) { + $this->exitWorkerManager(SIGCHLD, ['pid' => $this->manager_pid]); + } + + return $this; + } + /** * Handle job. */