Skip to content

Commit

Permalink
#41 set job status failed when worker is running endless
Browse files Browse the repository at this point in the history
  • Loading branch information
s-aebischer committed Apr 7, 2022
1 parent 74a37b2 commit 49cd147
Showing 1 changed file with 32 additions and 44 deletions.
76 changes: 32 additions & 44 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,19 @@ class Queue
use EventsTrait;

/**
* Orphaned job timeout.
* Orphaned timeout.
*/
public const OPTION_ORPHANED_JOB_TIMEOUT = 'orphaned_job_timeout';
public const OPTION_ORPHANED_TIMEOUT = 'orphaned_timeout';

/**
* Orphaned worker timeout.
* Endless worker timeout.
*/
public const OPTION_ORPHANED_WORKER_TIMEOUT = 'orphaned_worker_timeout';
public const OPTION_ENDLESS_WORKER_TIMEOUT = 'endless_worker_timeout';

/**
* Minimum waiting jobs for worker timout.
* Minimum waiting jobs for endless worker timout.
*/
public const OPTION_WAITING_JOBS_FOR_WORKER_TIMEOUT = 'waiting_jobs_for_worker_timeout';

/**
* Amount of processing jobs for worker timout.
*/
public const OPTION_PROCESSING_JOBS_FOR_WORKER_TIMEOUT = 'processing_for_jobs_worker_timeout';
public const OPTION_WAITING_JOBS_FOR_ENDLESS_WORKER = 'waiting_jobs_for_endless_worker';

/**
* Database.
Expand Down Expand Up @@ -87,32 +82,25 @@ class Queue
protected $scheduler;

/**
* Orphaned job timeout.
*
* @var int
*/
protected $orphaned_job_timeout = 30;

/**
* Orphaned worker timeout.
* Orphaned timeout.
*
* @var int
*/
protected $orphaned_worker_timeout = 600;
protected $orphaned_timeout = 30;

/**
* Minimum waiting jobs for worker timeout.
* Endless worker timeout.
*
* @var int
*/
protected $waiting_jobs_for_worker_timeout = 5;
protected $endless_worker_timeout = 600;

/**
* Amount of processing jobs for worker timeout.
* Minimum waiting jobs endless worker restart.
*
* @var int
*/
protected $processing_for_jobs_worker_timeout = 0;
protected $waiting_jobs_for_endless_worker = 5;

/**
* Init queue.
Expand All @@ -134,10 +122,9 @@ public function setOptions(array $config = []): self
{
foreach ($config as $option => $value) {
switch ($option) {
case self::OPTION_ORPHANED_JOB_TIMEOUT:
case self::OPTION_ORPHANED_WORKER_TIMEOUT:
case self::OPTION_WAITING_JOBS_FOR_WORKER_TIMEOUT:
case self::OPTION_PROCESSING_JOBS_FOR_WORKER_TIMEOUT:
case self::OPTION_ORPHANED_TIMEOUT:
case self::OPTION_ENDLESS_WORKER_TIMEOUT:
case self::OPTION_WAITING_JOBS_FOR_ENDLESS_WORKER:
if (!is_int($value)) {
throw new InvalidArgumentException($option.' needs to be an integer');
}
Expand Down Expand Up @@ -284,13 +271,13 @@ protected function main(): void
$cursor_watch->rewind();
while ($this->loop()) {
if (!$cursor_watch->valid()) {
if (time() - $start_job >= $this->orphaned_job_timeout) {
if (time() - $start_job >= $this->orphaned_timeout) {
$this->rescheduleOrphanedJobs();
$start_job = time();
}

if (time() - $start_worker >= $this->orphaned_worker_timeout) {
$this->checkForOrphanedWorkers();
if (time() - $start_worker >= $this->endless_worker_timeout) {
$this->checkEndlessRunningWorkers();
$start_worker = time();
}

Expand Down Expand Up @@ -318,7 +305,7 @@ protected function rescheduleOrphanedJobs(): self

$result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
'status' => JobInterface::STATUS_PROCESSING,
'alive' => ['$lt' => new UTCDateTime((time() - $this->orphaned_job_timeout) * 1000)],
'alive' => ['$lt' => new UTCDateTime((time() - $this->orphaned_timeout) * 1000)],
], [
'$set' => ['status' => JobInterface::STATUS_WAITING, 'worker' => null],
]);
Expand All @@ -331,35 +318,36 @@ protected function rescheduleOrphanedJobs(): self
return $this;
}

protected function checkForOrphanedWorkers(): self
protected function checkEndlessRunningWorkers(): self
{
$this->logger->debug('looking for orphaned workers', [
$this->logger->debug('looking for endless running workers', [
'category' => get_class($this),
]);

$waiting = $this->db->{$this->scheduler->getJobQueue()}->find([
'status' => JobInterface::STATUS_WAITING,
])->toArray();

$waiting = count($waiting);

$this->logger->debug('found [{jobs}] waiting jobs', [
'category' => get_class($this),
'jobs' => $waiting,
]);

$processing = $this->db->{$this->scheduler->getJobQueue()}->find([
'status' => JobInterface::STATUS_PROCESSING,
])->toArray();

$waiting = count($waiting);
$processing = count($processing);

$this->logger->debug('found [{jobs}] processing jobs', [
$this->logger->debug('found [{jobs_waiting}] waiting jobs and [{jobs_processing}] processing jobs', [
'category' => get_class($this),
'jobs' => $processing,
'jobs_waiting' => $waiting,
'jobs_processing' => $processing,
]);

if ($waiting > $this->waiting_jobs_for_worker_timeout && $processing === $this->processing_for_jobs_worker_timeout) {
if ($waiting > $this->waiting_jobs_for_endless_worker && 0 === $processing) {
$this->db->{$this->scheduler->getJobQueue()}->updateMany([
'status' => JobInterface::STATUS_WAITING,
], [
'$set' => ['status' => JobInterface::STATUS_FAILED, 'worker' => null],
]);

$this->exitWorkerManager(SIGCHLD, ['pid' => $this->manager_pid]);
}

Expand Down

0 comments on commit 49cd147

Please sign in to comment.