Skip to content

Commit

Permalink
3.0.0-beta2
Browse files Browse the repository at this point in the history
  • Loading branch information
raffis committed Sep 5, 2018
1 parent 1d6f673 commit b1b11b0
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 18 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## 3.0.0-beta2
**Maintainer**: Raffael Sahli <[email protected]>\
**Date**: Wed Sep 05 16:47:34 CEST 2018

* [FIX] Fixed naming of the collections
* [CHANGE] Using awaitData cursors now instead just tailable which will have a big impact on cpu usage
* [FIX] Cancelling a job which is out of the queue does now work as well


* [FEATURE] Possibility to timeout jobs
## 3.0.0-beta1
**Maintainer**: Raffael Sahli <[email protected]>\
**Date**: Thu Aug 30 11:47:34 CEST 2018
Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -404,15 +404,19 @@ $scheduler->setOptions([
| Name | Default | Type | Description |
| --- | --- | --- | --- |
| `job_queue` | `taskscheduler.jobs` | string | The MongoDB collection which acts as job message queue. |
| `job_queue_size` | `100000` | int | The maximum size of jobs, if reached the first jobs get overwritten by new ones. |
| `job_queue_size` | `100000` | int | The maximum size in bytes of the job collection, if reached the first jobs get overwritten by new ones. |
| `event_queue` | `taskscheduler.events` | string | The MongoDB collection which acts as event message queue. |
| `event_queue_size` | `500000` | int | The maximum size of events, if reached the first events get overwritten by new ones. This value should usually be a multiplicated value of `job_queue_size` since a job can have more events. |
| `event_queue_size` | `500000` | int | The maximum size in bytes of the event collection, if reached the first events get overwritten by new ones. This value should usually be 5 times bigger than the value of `job_queue_size` since a job can have more events. |
| `default_at` | `null` | ?int | Define a default execution time for **all** jobs. This relates only for newly added jobs. The default is immediatly or better saying as soon as there is a free slot. |
| `default_interval` | `-1` | int | Define a default interval for **all** jobs. This relates only for newly added jobs. The default is `-1` which means no interval at all. |
| `default_retry` | `0` | int | Define a default retry interval for **all** jobs. This relates only for newly added jobs. There are now retries by default for failed jobs. |
| `default_retry_interval` | `300` | int | This options specifies the time (in secconds) between job retries. This relates only for newly added jobs. The default is `300` which is 5 minutes. |
| `default_timeout` | `0` | int | Specify a default timeout for all jobs. This relates only for newly added jobs. Per default there is no timeout at all. |


>**Note**: It is important to choose a queue size (job_queue_size and event_queue_size) which fits into your setup.

### Advanced queue node options

You may change process management related options for queue nodes.
Expand Down
4 changes: 2 additions & 2 deletions src/MessageQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public function next(IteratorIterator $cursor, callable $callback): void
{
try {
$cursor->next();
} catch (RuntimeException $e) {
} catch (RuntimeException | ServerException $e) {
$this->logger->error('message queue cursor for ['.$this->name.'] failed to retrieve next message', [
'category' => get_class($this),
'exception' => $e,
Expand All @@ -94,7 +94,7 @@ public function getCursor(array $query = []): IteratorIterator
{
$options = [
'typeMap' => Scheduler::TYPE_MAP,
'cursorType' => Find::TAILABLE,
'cursorType' => Find::TAILABLE_AWAIT,
'noCursorTimeout' => true,
];

Expand Down
45 changes: 36 additions & 9 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ class Queue
*/
protected $forks = [];

/**
* Worker/Job mapping.
*
* @var array
*/
protected $job_map = [];

/**
* Worker factory.
*
Expand Down Expand Up @@ -117,6 +124,8 @@ class Queue
*/
protected $process;

protected $started;

/**
* Init queue.
*/
Expand All @@ -131,6 +140,7 @@ public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInt

$this->jobs = new MessageQueue($db, $scheduler->getJobQueue(), $scheduler->getJobQueueSize(), $logger);
$this->events = new MessageQueue($db, $scheduler->getEventQueue(), $scheduler->getEventQueueSize(), $logger);
$this->started = new UTCDateTime();
}

/**
Expand Down Expand Up @@ -215,6 +225,10 @@ public function exitChild(int $sig, array $pid): self
foreach ($this->forks as $id => $pid) {
if ($pid === $pid['pi']) {
unset($this->forks[$id]);

if (isset($this->jobs[$id])) {
unset($this->jobs[$id]);
}
}
}

Expand Down Expand Up @@ -262,6 +276,7 @@ protected function spawnWorker()
}

$this->forks[(string) $id] = $pid;

if (!$pid) {
$this->factory->build($id)->start();
exit();
Expand Down Expand Up @@ -306,8 +321,11 @@ protected function main(): void
]);

$cursor_events = $this->events->getCursor([
'status' => JobInterface::STATUS_CANCELED,
'timestamp' => ['$gte' => new UTCDateTime()],
'$or' => [
['status' => JobInterface::STATUS_CANCELED],
['status' => JobInterface::STATUS_PROCESSING],
],
'timestamp' => ['$gte' => $this->started],
]);

$this->catchSignal();
Expand All @@ -332,7 +350,7 @@ protected function main(): void
break;
}
} else {
$this->handleCancel($event);
$this->handleEvent($event);
}

if (null === $cursor_jobs->current()) {
Expand Down Expand Up @@ -368,21 +386,30 @@ protected function main(): void
/**
* Handle cancel event.
*/
protected function handleCancel(array $event): self
protected function handleEvent(array $event): self
{
$process = $this->scheduler->getJob($event['job']);
if (JobInterface::STATUS_PROCESSING === $event['status']) {
$this->job_map[(string) $event['worker']] = $event['job'];

return $this;
}

$this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$process->getWorker().']', [
$worker = array_search((string) $event['job'], $this->job_map, true);

if (false === $worker) {
return $this;
}

$this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$worker.']', [
'category' => get_class($this),
]);

$worker = $process->getWorker();

if (isset($this->forks[(string) $worker])) {
$this->logger->debug('found running worker ['.$process->getWorker().'] on this queue node, terminate it now', [
$this->logger->debug('found running worker ['.$worker.'] on this queue node, terminate it now', [
'category' => get_class($this),
]);

unset($this->job_map[(string) $event['job']]);
posix_kill($this->forks[(string) $worker], SIGKILL);
}

Expand Down
4 changes: 2 additions & 2 deletions src/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ class Scheduler
*
* @var string
*/
protected $job_queue = 'takscheduler.jobs';
protected $job_queue = 'taskscheduler.jobs';

/**
* Event Collection name.
*
* @var string
*/
protected $event_queue = 'takscheduler.events';
protected $event_queue = 'taskscheduler.events';

/**
* Default at (Secconds from now).
Expand Down
5 changes: 5 additions & 0 deletions src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public function timeout(): ?ObjectId

$this->db->{$this->scheduler->getEventQueue()}->insertOne([
'job' => $this->current_job['_id'],
'worker' => $this->id,
'status' => JobInterface::STATUS_TIMEOUT,
'timestamp' => new UTCDateTime(),
]);
Expand Down Expand Up @@ -258,6 +259,7 @@ protected function terminate(int $sig): ?ObjectId

$this->db->{$this->scheduler->getEventQueue()}->insertOne([
'job' => $this->current_job['_id'],
'worker' => $this->id,
'status' => JobInterface::STATUS_CANCELED,
'timestamp' => new UTCDateTime(),
]);
Expand Down Expand Up @@ -320,6 +322,7 @@ protected function collectJob(array $job, int $status, $from_status = JobInterfa

$this->db->{$this->scheduler->getEventQueue()}->insertOne([
'job' => $job['_id'],
'worker' => $this->id,
'status' => $status,
'timestamp' => new UTCDateTime(),
]);
Expand Down Expand Up @@ -429,6 +432,7 @@ protected function processJob(array $job): ObjectId

$this->db->{$this->scheduler->getEventQueue()}->insertOne([
'job' => $job['_id'],
'worker' => $this->id,
'status' => JobInterface::STATUS_FAILED,
'timestamp' => new UTCDateTime(),
'exception' => [
Expand Down Expand Up @@ -500,6 +504,7 @@ protected function executeJob(array $job): bool

$this->db->{$this->scheduler->getEventQueue()}->insertOne([
'job' => $job['_id'],
'worker' => $this->id,
'status' => JobInterface::STATUS_DONE,
'timestamp' => new UTCDateTime(),
]);
Expand Down
35 changes: 32 additions & 3 deletions tests/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,35 @@
namespace TaskScheduler\Testsuite;

use Helmich\MongoMock\MockDatabase;
use MongoDB\BSON\UTCDateTime;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use ReflectionClass;
use ReflectionMethod;
use ReflectionProperty;
use TaskScheduler\Exception\InvalidArgumentException;
use TaskScheduler\JobInterface;
use TaskScheduler\Queue;
use TaskScheduler\Scheduler;
use TaskScheduler\Testsuite\Mock\EndlessJobMock;
use TaskScheduler\Worker;
use TaskScheduler\WorkerFactoryInterface;

class QueueTest extends TestCase
{
protected $queue;
protected $scheduler;
protected $called = 0;
protected $mongodb;

public function setUp()
{
$mongodb = new MockDatabase();
$this->scheduler = new Scheduler($mongodb, $this->createMock(LoggerInterface::class));
$this->mongodb = new MockDatabase();
$this->scheduler = new Scheduler($this->mongodb, $this->createMock(LoggerInterface::class));

$called = &$this->called;
$this->queue = $this->getMockBuilder(Queue::class)
->setConstructorArgs([$this->scheduler, $mongodb, $this->createMock(WorkerFactoryInterface::class), $this->createMock(LoggerInterface::class)])
->setConstructorArgs([$this->scheduler, $this->mongodb, $this->createMock(WorkerFactoryInterface::class), $this->createMock(LoggerInterface::class)])
->setMethods(['loop'])
->getMock();
$this->queue->method('loop')
Expand Down Expand Up @@ -157,6 +162,30 @@ public function testDynamicForceStartWorkerIfIgnoreMaxChildren()
$this->assertSame(3, $this->queue->count());
}

/*public function testCancelRunningJob()
{
$mongodb = $this->mongodb;
$job = $this->scheduler->addJob(EndlessJobMock::class, 'foo');
$this->queue->process();
$method = self::getMethod('getForks');
$forks = $method->invokeArgs($this->queue, []);
reset($forks);
$worker = key($forks);
$mongodb->{'taskscheduler.events'}->insertOne([
'job' => $job->getId(),
'worker' => $worker,
'status' => JobInterface::STATUS_PROCESSING,
'timestamp' => new UTCDateTime(),
]);
$this->scheduler->cancelJob($job->getId());
$this->called = 0;
$this->queue->process();
}*/

public function testMinChildrenGreaterThanMaxChildren()
{
$this->expectException(InvalidArgumentException::class);
Expand Down
2 changes: 2 additions & 0 deletions tests/SchedulerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public function testNewJob()
$job = $this->scheduler->addJob('test', ['foo' => 'bar']);
$this->assertSame($job->getStatus(), JobInterface::STATUS_WAITING);
$this->assertSame($job->getClass(), 'test');
$this->assertSame(['foo' => 'bar'], $job->getData());
$this->assertInstanceOf(ObjectId::class, $job->getId());
$this->assertInstanceOf(ObjectId::class, $job->getWorker());
}

public function testNewJobTimestamps()
Expand Down

0 comments on commit b1b11b0

Please sign in to comment.