From d1c2fccee274ab03687af96f54ffe454972e807f Mon Sep 17 00:00:00 2001 From: s-aebischer Date: Tue, 22 Nov 2022 12:20:51 +0100 Subject: [PATCH] set parent job state to failed when orphaned child job is updated --- src/Queue.php | 48 +++++++++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/src/Queue.php b/src/Queue.php index 93bd0ab..671edd5 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -15,6 +15,7 @@ use League\Event\Emitter; use MongoDB\BSON\UTCDateTime; use MongoDB\Database; +use MongoDB\UpdateResult; use Psr\Container\ContainerInterface; use Psr\Log\LoggerInterface; use TaskScheduler\Exception\InvalidArgumentException; @@ -350,25 +351,15 @@ protected function rescheduleOrphanedJobs(): self ]); msg_send($this->queue, WorkerManager::TYPE_WORKER_ORPHANED_JOB, $orphaned_proc->toArray()); - } - } else { - $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([ - '_id' => $orphaned_proc->getId(), - ], [ - '$set' => ['status' => JobInterface::STATUS_FAILED], - ]); - - if ($this->container !== null) { - $instance = $this->container->get($orphaned_proc->getClass()); + } else { + $this->failJobAndNotifyJobClass($orphaned_proc); - if (method_exists($instance, 'notification')) { - $instance->notification(JobInterface::STATUS_FAILED, $orphaned_proc->toArray()); - } else { - $this->logger->info('method notification() does not exists on instance', [ - 'category' => get_class($this), - ]); - } + $this->logger->warning('set state of parent job ['.$orphaned_proc->getId().'] to failed', [ + 'category' => get_class($this), + ]); } + } else { + $result = $this->failJobAndNotifyJobClass($orphaned_proc); $this->logger->warning('found [{jobs}] orphaned parent job with jobId ['.$orphaned_proc->getId().'], reset state to failed', [ 'category' => get_class($this), @@ -380,6 +371,29 @@ protected function rescheduleOrphanedJobs(): self return $this; } + protected function failJobAndNotifyJobClass(Process $job): UpdateResult + { + $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([ + '_id' => $job->getId(), + ], [ + '$set' => ['status' => JobInterface::STATUS_FAILED], + ]); + + if ($this->container !== null) { + $instance = $this->container->get($job->getClass()); + + if (method_exists($instance, 'notification')) { + $instance->notification(JobInterface::STATUS_FAILED, $job->toArray()); + } else { + $this->logger->info('method notification() does not exists on instance', [ + 'category' => get_class($this), + ]); + } + } + + return $result; + } + protected function checkEndlessRunningWorkers(): self { $this->logger->debug('looking for endless running workers', [