Skip to content

Commit

Permalink
#41 get correct worker for process
Browse files Browse the repository at this point in the history
  • Loading branch information
s-aebischer committed Apr 6, 2022
1 parent 661db58 commit 74a37b2
Showing 1 changed file with 88 additions and 9 deletions.
97 changes: 88 additions & 9 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,24 @@ class Queue
use EventsTrait;

/**
* Queue options.
* Orphaned job timeout.
*/
public const OPTION_ORPHANED_TIMEOUT = 'orphaned_timeout';
public const OPTION_ORPHANED_JOB_TIMEOUT = 'orphaned_job_timeout';

/**
* Orphaned worker timeout.
*/
public const OPTION_ORPHANED_WORKER_TIMEOUT = 'orphaned_worker_timeout';

/**
* Minimum waiting jobs for worker timout.
*/
public const OPTION_WAITING_JOBS_FOR_WORKER_TIMEOUT = 'waiting_jobs_for_worker_timeout';

/**
* Amount of processing jobs for worker timout.
*/
public const OPTION_PROCESSING_JOBS_FOR_WORKER_TIMEOUT = 'processing_for_jobs_worker_timeout';

/**
* Database.
Expand Down Expand Up @@ -72,11 +87,32 @@ class Queue
protected $scheduler;

/**
* Orphaned timeout.
* Orphaned job timeout.
*
* @var int
*/
protected $orphaned_job_timeout = 30;

/**
* Orphaned worker timeout.
*
* @var int
*/
protected $orphaned_worker_timeout = 600;

/**
* Minimum waiting jobs for worker timeout.
*
* @var int
*/
protected $waiting_jobs_for_worker_timeout = 5;

/**
* Amount of processing jobs for worker timeout.
*
* @var int
*/
protected $orphaned_timeout = 30;
protected $processing_for_jobs_worker_timeout = 0;

/**
* Init queue.
Expand All @@ -98,7 +134,10 @@ public function setOptions(array $config = []): self
{
foreach ($config as $option => $value) {
switch ($option) {
case self::OPTION_ORPHANED_TIMEOUT:
case self::OPTION_ORPHANED_JOB_TIMEOUT:
case self::OPTION_ORPHANED_WORKER_TIMEOUT:
case self::OPTION_WAITING_JOBS_FOR_WORKER_TIMEOUT:
case self::OPTION_PROCESSING_JOBS_FOR_WORKER_TIMEOUT:
if (!is_int($value)) {
throw new InvalidArgumentException($option.' needs to be an integer');
}
Expand Down Expand Up @@ -240,14 +279,19 @@ protected function main(): void
$this->handleJob((array) $job);
}

$start = time();
$start_job = $start_worker = time();

$cursor_watch->rewind();
while ($this->loop()) {
if (!$cursor_watch->valid()) {
if (time() - $start >= $this->orphaned_timeout) {
if (time() - $start_job >= $this->orphaned_job_timeout) {
$this->rescheduleOrphanedJobs();
$start = time();
$start_job = time();
}

if (time() - $start_worker >= $this->orphaned_worker_timeout) {
$this->checkForOrphanedWorkers();
$start_worker = time();
}

$cursor_watch->next();
Expand All @@ -274,7 +318,7 @@ protected function rescheduleOrphanedJobs(): self

$result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
'status' => JobInterface::STATUS_PROCESSING,
'alive' => ['$lt' => new UTCDateTime((time() - $this->orphaned_timeout) * 1000)],
'alive' => ['$lt' => new UTCDateTime((time() - $this->orphaned_job_timeout) * 1000)],
], [
'$set' => ['status' => JobInterface::STATUS_WAITING, 'worker' => null],
]);
Expand All @@ -287,6 +331,41 @@ protected function rescheduleOrphanedJobs(): self
return $this;
}

protected function checkForOrphanedWorkers(): self
{
$this->logger->debug('looking for orphaned workers', [
'category' => get_class($this),
]);

$waiting = $this->db->{$this->scheduler->getJobQueue()}->find([
'status' => JobInterface::STATUS_WAITING,
])->toArray();

$waiting = count($waiting);

$this->logger->debug('found [{jobs}] waiting jobs', [
'category' => get_class($this),
'jobs' => $waiting,
]);

$processing = $this->db->{$this->scheduler->getJobQueue()}->find([
'status' => JobInterface::STATUS_PROCESSING,
])->toArray();

$processing = count($processing);

$this->logger->debug('found [{jobs}] processing jobs', [
'category' => get_class($this),
'jobs' => $processing,
]);

if ($waiting > $this->waiting_jobs_for_worker_timeout && $processing === $this->processing_for_jobs_worker_timeout) {
$this->exitWorkerManager(SIGCHLD, ['pid' => $this->manager_pid]);
}

return $this;
}

/**
* Handle job.
*/
Expand Down

0 comments on commit 74a37b2

Please sign in to comment.