beta4
raffis committed Sep 24, 2018
1 parent 6bc3eb7 commit 9ca2374
Showing 5 changed files with 147 additions and 61 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -1,8 +1,9 @@
## 3.0.0
## 3.0.0-beta7
**Maintainer**: Raffael Sahli <[email protected]>\
**Date**:
**Date**: Fri Sept 24 15:24:34 CEST 2018

* [FIX] Locally queued job gets rescheduled if it was previously overwritten by the capped collection size limit
* [FIX] Fixed retry_interval with timeout jobs


## 3.0.0-beta6
81 changes: 76 additions & 5 deletions README.md
@@ -51,6 +51,7 @@ The documentation for v2 is available [here](https://github.com/gyselroth/mongod
* [Get jobs](#get-jobs)
* [Cancel job](#cancel-job)
* [Modify jobs](#modify-jobs)
* [Handling of failed jobs](#handling-of-failed-jobs)
* [Asynchronous programming](#asynchronous-programming)
* [Listen for Events](#listen-for-events)
* [Advanced job options](#advanced-job-options)
@@ -59,7 +60,7 @@ The documentation for v2 is available [here](https://github.com/gyselroth/mongod
* [Advanced queue node options](#advanced-queue-node-options)
* [Using a DIC (dependeny injection container)](#using-a-dic-dependeny-injection-container)
* [Data Persistency](#data-persistency)
* [Terminate queue nodes](#terminate-queue-nodes)
* [Signal handling](#signal-handling)

## Why?
PHP isn't a multithreaded language, nor can it handle (most) tasks asynchronously. Sure, there are pthreads and pcntl, but those are only usable in CLI mode (or should only be used there). Using this library you can write your code asynchronously, schedule tasks and let them execute behind the scenes.
@@ -269,6 +270,73 @@ A canceled job will not get rescheduled. You will need to create a new job manua
#### Modify jobs
It is **not** possible to modify a scheduled job by design. You need to cancel the job and append a new one.

### Handling of failed jobs

A job is considered failed if it throws an exception of any kind.
Let us look at our mail job again, but this time it throws an exception:

```php
class MailJob extends TaskScheduler\AbstractJob
{
/**
* {@inheritdoc}
*/
public function start(): bool
{
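$transport = new Zend\Mail\Transport\Sendmail();
$mail = Message::fromString($this->data);
$transport->send($mail);
// Any uncaught exception marks the job as FAILED.
throw new \Exception('i am an exception');
// Never reached here; a job should normally end by returning true.
return true;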
}
}
```

This will lead to a FAILED job as soon as this job gets executed.

>**Note**: It does not matter whether you return `true` or `false`; only an uncaught exception results in a FAILED job. However, you should always return `true`.

The scheduler has integrated handling of failed jobs. You may specify that a job is automatically rescheduled if it failed.
The following will reschedule the job up to 5 times (if it ended with status FAILED) with an interval of 30s.

```php
$scheduler->addJob(MailJob::class, $mail->toString(), [
TaskScheduler\Scheduler::OPTION_RETRY => 5,
TaskScheduler\Scheduler::OPTION_RETRY_INTERVAL => 30,
]);
```

This will queue our mail job and reschedule it up to five times, with an interval of 30 seconds, if it fails.
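
The retry handling described above can be pictured with a small standalone sketch. It does not use the library: `nextRetry()` is a hypothetical helper, and it only assumes that the option names `retry`, `retry_interval` and `at` behave as documented for `addJob()`. Each failed attempt consumes one retry and schedules the next attempt `retry_interval` seconds later.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of the library): given the options of a
 * failed job, return the options for the next attempt, or null when no
 * retries are left.
 */
function nextRetry(array $options, int $now): ?array
{
    $retry = $options['retry'] ?? 0;
    if ($retry <= 0) {
        return null; // retries exhausted, the job stays FAILED
    }

    $options['retry'] = $retry - 1;
    // The next attempt runs retry_interval seconds from now.
    $options['at'] = $now + ($options['retry_interval'] ?? 0);

    return $options;
}

$options = ['retry' => 5, 'retry_interval' => 30];
$next = nextRetry($options, 1000);
// $next is ['retry' => 4, 'retry_interval' => 30, 'at' => 1030]
```

Passing the current time as an argument instead of calling `time()` inside the helper keeps the sketch deterministic and easy to test.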

### Add job if not exists
You can also add a job only if it has not been queued yet.
Instead of using `addJob()` you can use `addJobOnce()`; the scheduler then checks whether the same job is already queued. If not, the job gets added.
The scheduler compares the type of job (`MailJob` in this case) and the data submitted (`$mail->toString()` in this case).

>**Note**: The job gets rescheduled if options get changed.
```php
$scheduler->addJobOnce(MailJob::class, $mail->toString(), [
TaskScheduler\Scheduler::OPTION_AT => time()+3600,
TaskScheduler\Scheduler::OPTION_RETRY => 3,
]);
```
By default `TaskScheduler\Scheduler::addJobOnce()` compares the job class, the submitted data and the process status (either RUNNING, WAITING or POSTPONED).
If you do not want to check the data, you may set `TaskScheduler\Scheduler::OPTION_IGNORE_DATA` to `true`. This tells the scheduler not to compare the data when deciding
whether a job of the given class is already queued. This is quite useful if a job of the given class must only be queued once.
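
The comparison described above can be sketched without the library. `isSameJob()` is a hypothetical helper, and the status strings are placeholders taken from the prose rather than the library's real constants; the sketch only illustrates how ignoring the data changes the outcome.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of the library): decide whether an already
 * queued job counts as "the same job" for an addJobOnce()-style check.
 */
function isSameJob(array $queued, string $class, $data, bool $ignoreData = false): bool
{
    // Only jobs that are still pending or running are considered.
    $active = ['WAITING', 'POSTPONED', 'RUNNING'];
    if (!in_array($queued['status'], $active, true)) {
        return false;
    }

    if ($queued['class'] !== $class) {
        return false;
    }

    // With ignore_data the payload is not part of the comparison.
    return $ignoreData || $queued['data'] == $data;
}

$queued = ['class' => 'CleanTemp', 'data' => ['max_age' => 3600], 'status' => 'WAITING'];

$strict = isSameJob($queued, 'CleanTemp', ['max_age' => 7200]);       // false: data differs
$loose  = isSameJob($queued, 'CleanTemp', ['max_age' => 7200], true); // true: data ignored
```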


>**Note**: This option does not make sense in the mail example we're using here. A mail can have different content. But you may have a job that clears a temporary storage every 24h:
```php
$scheduler->addJobOnce(MyApp\CleanTemp::class, ['max_age' => 3600], [
TaskScheduler\Scheduler::OPTION_IGNORE_DATA => true,
]);
```

### Initialize scheduler

You need an instance of a MongoDB\Database and a Psr\Log\LoggerInterface compatible logger to initialize the scheduler.


### Asynchronous programming

Have a look at this example:
@@ -351,11 +419,11 @@ TaskScheduler\Scheduler::addJob()/TaskScheduler\Scheduler::addJobOnce() also acc
| `ignore_data` | `false` | bool | Only useful if set in a addJobOnce() call. If `true` the scheduler does not compare the jobs data to decide if a job needs to get rescheduled. |


>**Note**: Be careful with timeouts since it will kill your running job by force. You have been warned.
>**Note**: Be careful with timeouts since it will kill your running job by force. You have been warned. You shall always use a native timeout in a function if supported.
Let us add our mail job example again with some custom options:

>**Note:** We are using the OPTION_ constansts here, you may also just use the names documented above.
>**Note**: We are using the OPTION_ constansts here, you may also just use the names documented above.
```php
$mongodb = new MongoDB\Client('mongodb://localhost:27017');
@@ -381,7 +449,7 @@ What you also can do is adding the job only if it has not been queued yet.
Instead of using `addJob()` you can use `addJobOnce()`; the scheduler then checks whether the same job is already queued. If not, the job gets added.
The scheduler compares the type of job (`MailJob` in this case) and the data submitted (`$mail->toString()` in this case).

**Note:** The job gets rescheduled if options get changed.
>**Note**: The job gets rescheduled if options get changed.
```php
$scheduler->addJobOnce(MailJob::class, $mail->toString(), [
@@ -545,10 +613,13 @@ $queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger)
This library does not provide any data persistency! It is important to understand this fact. This library operates on a [MongoDB capped collection](https://docs.mongodb.com/manual/core/capped-collections) with
a fixed size limit by design. This means the newest job will overwrite the oldest job if the limit is reached. A side note regarding postponed jobs (TaskScheduler\Scheduler::OPTION_AT): those jobs get queued locally once received. If they fall out of the network queue, they will get executed anyway. If a worker dies they will get rescheduled if possible. **But** interval jobs are not meant to be persistent and there is no guarantee for that. It is best practice, while bootstrapping your queue node, to schedule jobs from a persistent data source if they are not already scheduled (for example using [TaskScheduler\Scheduler::addJobOnce](#add-job-if-not-exists)).

>**Note**: This is planned to change in v4. v4 will feature persistency for jobs.
### Terminate queue nodes
### Signal handling

Terminating queue nodes is possible of course. They even manage to reschedule running jobs. You just need to send a SIGTERM to the process. The queue node then will transmit this to all running workers and they
will save their state and nicely exit. A worker also saves its state if the worker process directly receives a SIGTERM.
If a SIGKILL was used to terminate the queue node (or worker) the state can not be saved and you might get zombie jobs (Jobs with the state PROCESSING but no worker will actually process those jobs).
No good sysadmin will terminate running jobs by using SIGKILL, it is not acceptable and may only be used if you know what you are doing.

You should also avoid never-ending blocking functions in your job; PHP cannot handle signals while such a call blocks.
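
To illustrate why bounded work matters for signal handling, here is a minimal sketch of the general pcntl pattern. It is not the library's implementation: `runUntilTerminated()` is a hypothetical function, and it assumes the pcntl extension is available (CLI only). Work is done in short slices, and pending signals are dispatched between slices so that a SIGTERM is noticed at a safe point.

```php
<?php
declare(strict_types=1);

/**
 * Sketch (hypothetical helper): run work in short slices so that a pending
 * SIGTERM can be handled between slices. Requires the pcntl extension.
 */
function runUntilTerminated(callable $slice, int $maxSlices = 1000): int
{
    $terminate = false;

    pcntl_signal(SIGTERM, function () use (&$terminate): void {
        $terminate = true; // only remember the request, stop at a safe point
    });

    $done = 0;
    while (!$terminate && $done < $maxSlices) {
        $slice($done);           // one short, bounded unit of work
        ++$done;
        pcntl_signal_dispatch(); // run handlers for signals received meanwhile
    }

    return $done;
}
```

A job could call something like `runUntilTerminated(function (int $i): void { /* process one item */ })` instead of blocking indefinitely in a single call. If one slice never returns, the handler never runs, which is exactly the situation the paragraph above warns about.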
77 changes: 46 additions & 31 deletions src/Queue.php
@@ -142,9 +142,6 @@ public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInt

/**
* Set options.
*
*
* @return Queue
*/
public function setOptions(array $config = []): self
{
@@ -214,8 +211,8 @@ public function exitChild(int $sig, array $pid): self
if ($pid === $pid['pi']) {
unset($this->forks[$id]);

if (isset($this->jobs[$id])) {
unset($this->jobs[$id]);
if (isset($this->job_map[$id])) {
unset($this->job_map[$id]);
}
}
}
@@ -302,8 +299,6 @@ protected function spawnWorker()

/**
* Get forks (array of pid's).
*
* @return int[]
*/
protected function getForks(): array
{
@@ -323,10 +318,6 @@ protected function main(): void
]);

$cursor_events = $this->events->getCursor([
'$or' => [
['status' => JobInterface::STATUS_CANCELED],
['status' => JobInterface::STATUS_PROCESSING],
],
'timestamp' => ['$gte' => new UTCDateTime()],
]);

@@ -386,36 +377,60 @@ protected function main(): void
}

/**
* Handle cancel event.
* Handle events.
*/
protected function handleEvent(array $event): self
{
if (JobInterface::STATUS_PROCESSING === $event['status']) {
$this->job_map[(string) $event['worker']] = $event['job'];
$this->logger->debug('handle event ['.$event['status'].'] for job ['.$event['job'].']', [
'category' => get_class($this),
]);

return $this;
}
switch ($event['status']) {
case JobInterface::STATUS_WAITING:
case JobInterface::STATUS_POSTPONED:
break;
case JobInterface::STATUS_PROCESSING:
$this->job_map[(string) $event['worker']] = $event['job'];

return $this;
case JobInterface::STATUS_DONE:
case JobInterface::STATUS_FAILED:
case JobInterface::STATUS_TIMEOUT:
$worker = array_search((string) $event['job'], $this->job_map, true);

if (false === $worker) {
return $this;
}

$worker = array_search((string) $event['job'], $this->job_map, true);
unset($this->job_map[$worker]);

if (false === $worker) {
return $this;
}
return $this;

$this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$worker.']', [
'category' => get_class($this),
]);
break;
case JobInterface::STATUS_CANCELED:
$worker = array_search((string) $event['job'], $this->job_map, true);

if (isset($this->forks[(string) $worker])) {
$this->logger->debug('found running worker ['.$worker.'] on this queue node, terminate it now', [
'category' => get_class($this),
]);
if (false === $worker) {
return $this;
}

unset($this->job_map[(string) $event['job']]);
posix_kill($this->forks[(string) $worker], SIGKILL);
}
$this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$worker.']', [
'category' => get_class($this),
]);

return $this;
if (isset($this->forks[(string) $worker])) {
$this->logger->debug('found running worker ['.$worker.'] on this queue node, terminate it now', [
'category' => get_class($this),
]);

unset($this->job_map[(string) $event['job']]);
posix_kill($this->forks[(string) $worker], SIGKILL);
}

return $this;
default:
return $this;
}
}

/**
43 changes: 21 additions & 22 deletions src/Worker.php
@@ -15,7 +15,6 @@
use MongoDB\BSON\ObjectId;
use MongoDB\BSON\UTCDateTime;
use MongoDB\Database;
use MongoDB\Driver\Exception\BulkWriteException;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use TaskScheduler\Exception\InvalidJobException;
@@ -136,7 +135,7 @@ public function timeout(): ?ObjectId
]);

--$job['options']['retry'];
$job['options']['at'] = time() + $job['options']['at'];
$job['options']['at'] = time() + $job['options']['retry_interval'];
$job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

return $job->getId();
@@ -209,6 +208,11 @@ public function start(): void
}

$job = $cursor->current();

$this->logger->debug('found job ['.$job['_id'].'] in queue with status ['.$job['status'].']', [
'category' => get_class($this),
]);

$this->jobs->next($cursor, function () {
$this->start();
});
@@ -225,7 +229,7 @@ public function cleanup()
$this->saveState();

if (null === $this->current_job) {
$this->logger->debug('received cleanup call, no job is currently processing, exit now', [
$this->logger->debug('received cleanup call on worker ['.$this->id.'], no job is currently processing, exit now', [
'category' => get_class($this),
'pm' => $this->process,
]);
@@ -235,7 +239,7 @@ public function cleanup()
return null;
}

$this->logger->debug('received cleanup call, reschedule current processing job ['.$this->current_job['_id'].']', [
$this->logger->debug('received cleanup call on worker ['.$this->id.'], reschedule current processing job ['.$this->current_job['_id'].']', [
'category' => get_class($this),
'pm' => $this->process,
]);
@@ -264,21 +268,11 @@ public function cleanup()
protected function saveState(): self
{
foreach ($this->queue as $key => $job) {
try {
$options = $job['options'];
$options[Scheduler::OPTION_ID] = $job['_id'];
$this->scheduler->addJob($job['class'], $job['data'], $options);
unset($this->queue[$key]);
} catch (\Exception $e) {
if ($e instanceof BulkWriteException && 11000 === $e->getCode()) {
continue;
}

$this->logger->error('failed reschedule locally queued job ['.$job['_id'].']', [
'exception' => $e,
'category' => get_class($this),
]);
}
$this->db->selectCollection($this->scheduler->getJobQueue())->updateOne(
['_id' => $job['_id'], '$isolated' => true],
['$setOnInsert' => $job],
['upsert' => true]
);
}

return $this;
@@ -342,8 +336,13 @@ protected function collectJob(array $job, int $status, $from_status = JobInterfa
'$set' => $set,
]);

$this->logger->debug('collect job ['.$job['_id'].'] with status ['.$from_status.']', [
'category' => get_class($this),
'pm' => $this->process,
]);

if (1 === $result->getModifiedCount()) {
$this->logger->debug('job ['.$job['_id'].'] updated to status ['.$status.']', [
$this->logger->debug('job ['.$job['_id'].'] collected; update status to ['.$status.']', [
'category' => get_class($this),
'pm' => $this->process,
]);
@@ -440,7 +439,7 @@ protected function processJob(array $job): ObjectId
return $job['_id'];
}

$this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
$this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].'] on worker ['.$this->id.']', [
'category' => get_class($this),
'pm' => $this->process,
'options' => $job['options'],
@@ -456,7 +455,7 @@ protected function processJob(array $job): ObjectId
} catch (\Exception $e) {
pcntl_alarm(0);

$this->logger->error('failed execute job ['.$job['_id'].'] of type ['.$job['class'].']', [
$this->logger->error('failed execute job ['.$job['_id'].'] of type ['.$job['class'].'] on worker ['.$this->id.']', [
'category' => get_class($this),
'pm' => $this->process,
'exception' => $e,
2 changes: 1 addition & 1 deletion tests/WorkerTest.php
@@ -197,7 +197,7 @@ public function testRescheduleLocalQueuedJobWithTheSameIdIfJobWasOverwrittenInQu
$this->worker->cleanup();
$new = $this->scheduler->getJob($job->getId());
$this->assertSame($new->getId(), $job->getId());
$this->assertNotSame($new->toArray()['created'], $job->toArray()['created']);
$this->assertSame($new->toArray()['created'], $job->toArray()['created']);
$this->assertSame(JobInterface::STATUS_WAITING, $job->getStatus());
$this->assertSame($new->getOptions()['at'], $job->getOptions()['at']);
}
