Skip to content

Commit

Permalink
set parent job state to failed when orphaned child job is updated
Browse files Browse the repository at this point in the history
  • Loading branch information
s-aebischer committed Nov 22, 2022
1 parent 653f94b commit d1c2fcc
Showing 1 changed file with 31 additions and 17 deletions.
48 changes: 31 additions & 17 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use League\Event\Emitter;
use MongoDB\BSON\UTCDateTime;
use MongoDB\Database;
use MongoDB\UpdateResult;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use TaskScheduler\Exception\InvalidArgumentException;
Expand Down Expand Up @@ -350,25 +351,15 @@ protected function rescheduleOrphanedJobs(): self
]);

msg_send($this->queue, WorkerManager::TYPE_WORKER_ORPHANED_JOB, $orphaned_proc->toArray());
}
} else {
$result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
'_id' => $orphaned_proc->getId(),
], [
'$set' => ['status' => JobInterface::STATUS_FAILED],
]);

if ($this->container !== null) {
$instance = $this->container->get($orphaned_proc->getClass());
} else {
$this->failJobAndNotifyJobClass($orphaned_proc);

if (method_exists($instance, 'notification')) {
$instance->notification(JobInterface::STATUS_FAILED, $orphaned_proc->toArray());
} else {
$this->logger->info('method notification() does not exists on instance', [
'category' => get_class($this),
]);
}
$this->logger->warning('set state of parent job ['.$orphaned_proc->getId().'] to failed', [
'category' => get_class($this),
]);
}
} else {
$result = $this->failJobAndNotifyJobClass($orphaned_proc);

$this->logger->warning('found [{jobs}] orphaned parent job with jobId ['.$orphaned_proc->getId().'], reset state to failed', [
'category' => get_class($this),
Expand All @@ -380,6 +371,29 @@ protected function rescheduleOrphanedJobs(): self
return $this;
}

protected function failJobAndNotifyJobClass(Process $job): UpdateResult
{
$result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
'_id' => $job->getId(),
], [
'$set' => ['status' => JobInterface::STATUS_FAILED],
]);

if ($this->container !== null) {
$instance = $this->container->get($job->getClass());

if (method_exists($instance, 'notification')) {
$instance->notification(JobInterface::STATUS_FAILED, $job->toArray());
} else {
$this->logger->info('method notification() does not exists on instance', [
'category' => get_class($this),
]);
}
}

return $result;
}

protected function checkEndlessRunningWorkers(): self
{
$this->logger->debug('looking for endless running workers', [
Expand Down

0 comments on commit d1c2fcc

Please sign in to comment.