diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b850a6..fb3d384 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 3.0.0-beta2 +**Maintainer**: Raffael Sahli \ +**Date**: Wed Sep 05 16:47:34 CEST 2018 + +* [FIX] Fixed naming of the collections +* [CHANGE] Using awaitData cursors now instead of just tailable ones, which will have a big impact on cpu usage +* [FIX] Cancelling a job which is out of the queue does now work as well + + +* [FEATURE] Possibility to timeout jobs ## 3.0.0-beta1 **Maintainer**: Raffael Sahli \ **Date**: Thu Aug 30 11:47:34 CEST 2018 diff --git a/README.md b/README.md index c35d93c..ac25ec2 100644 --- a/README.md +++ b/README.md @@ -404,15 +404,19 @@ $scheduler->setOptions([ | Name | Default | Type | Description | | --- | --- | --- | --- | | `job_queue` | `taskscheduler.jobs` | string | The MongoDB collection which acts as job message queue. | -| `job_queue_size` | `100000` | int | The maximum size of jobs, if reached the first jobs get overwritten by new ones. | +| `job_queue_size` | `100000` | int | The maximum size in bytes of the job collection, if reached the first jobs get overwritten by new ones. | | `event_queue` | `taskscheduler.events` | string | The MongoDB collection which acts as event message queue. | -| `event_queue_size` | `500000` | int | The maximum size of events, if reached the first events get overwritten by new ones. This value should usually be a multiplicated value of `job_queue_size` since a job can have more events. | +| `event_queue_size` | `500000` | int | The maximum size in bytes of the event collection, if reached the first events get overwritten by new ones. This value should usually be 5 times bigger than the value of `job_queue_size` since a job can have more events. | | `default_at` | `null` | ?int | Define a default execution time for **all** jobs. This relates only for newly added jobs. The default is immediatly or better saying as soon as there is a free slot. 
| | `default_interval` | `-1` | int | Define a default interval for **all** jobs. This relates only for newly added jobs. The default is `-1` which means no interval at all. | | `default_retry` | `0` | int | Define a default retry interval for **all** jobs. This relates only for newly added jobs. There are now retries by default for failed jobs. | | `default_retry_interval` | `300` | int | This options specifies the time (in secconds) between job retries. This relates only for newly added jobs. The default is `300` which is 5 minutes. | | `default_timeout` | `0` | int | Specify a default timeout for all jobs. This relates only for newly added jobs. Per default there is no timeout at all. | + +>**Note**: It is important to choose a queue size (job_queue_size and event_queue_size) which fits into your setup. + + ### Advanced queue node options You may change process management related options for queue nodes. diff --git a/src/MessageQueue.php b/src/MessageQueue.php index 84c8017..4fcca3b 100755 --- a/src/MessageQueue.php +++ b/src/MessageQueue.php @@ -77,7 +77,7 @@ public function next(IteratorIterator $cursor, callable $callback): void { try { $cursor->next(); - } catch (RuntimeException $e) { + } catch (RuntimeException | ServerException $e) { $this->logger->error('message queue cursor for ['.$this->name.'] failed to retrieve next message', [ 'category' => get_class($this), 'exception' => $e, @@ -94,7 +94,7 @@ public function getCursor(array $query = []): IteratorIterator { $options = [ 'typeMap' => Scheduler::TYPE_MAP, - 'cursorType' => Find::TAILABLE, + 'cursorType' => Find::TAILABLE_AWAIT, 'noCursorTimeout' => true, ]; diff --git a/src/Queue.php b/src/Queue.php index 60cfe93..81b7201 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -89,6 +89,13 @@ class Queue */ protected $forks = []; + /** + * Worker/Job mapping. + * + * @var array + */ + protected $job_map = []; + /** * Worker factory. 
* @@ -117,6 +124,8 @@ class Queue */ protected $process; + protected $started; + /** * Init queue. */ @@ -131,6 +140,7 @@ public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInt $this->jobs = new MessageQueue($db, $scheduler->getJobQueue(), $scheduler->getJobQueueSize(), $logger); $this->events = new MessageQueue($db, $scheduler->getEventQueue(), $scheduler->getEventQueueSize(), $logger); + $this->started = new UTCDateTime(); } /** @@ -215,6 +225,10 @@ public function exitChild(int $sig, array $pid): self - foreach ($this->forks as $id => $pid) { - if ($pid === $pid['pi']) { + foreach ($this->forks as $id => $fork_pid) { + if ($fork_pid === $pid['pid']) { unset($this->forks[$id]); + + if (isset($this->job_map[$id])) { + unset($this->job_map[$id]); + } } } @@ -262,6 +276,7 @@ protected function spawnWorker() } $this->forks[(string) $id] = $pid; + if (!$pid) { $this->factory->build($id)->start(); exit(); @@ -306,8 +321,11 @@ protected function main(): void ]); $cursor_events = $this->events->getCursor([ - 'status' => JobInterface::STATUS_CANCELED, - 'timestamp' => ['$gte' => new UTCDateTime()], + '$or' => [ + ['status' => JobInterface::STATUS_CANCELED], + ['status' => JobInterface::STATUS_PROCESSING], + ], + 'timestamp' => ['$gte' => $this->started], ]); $this->catchSignal(); @@ -332,7 +350,7 @@ protected function main(): void break; } } else { - $this->handleCancel($event); + $this->handleEvent($event); } if (null === $cursor_jobs->current()) { @@ -368,21 +386,30 @@ protected function main(): void /** * Handle cancel event. 
*/ - protected function handleCancel(array $event): self + protected function handleEvent(array $event): self { - $process = $this->scheduler->getJob($event['job']); + if (JobInterface::STATUS_PROCESSING === $event['status']) { + $this->job_map[(string) $event['worker']] = (string) $event['job']; + + return $this; + } - $this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$process->getWorker().']', [ + $worker = array_search((string) $event['job'], $this->job_map, true); + + if (false === $worker) { + return $this; + } + + $this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$worker.']', [ 'category' => get_class($this), ]); - $worker = $process->getWorker(); - if (isset($this->forks[(string) $worker])) { - $this->logger->debug('found running worker ['.$process->getWorker().'] on this queue node, terminate it now', [ + $this->logger->debug('found running worker ['.$worker.'] on this queue node, terminate it now', [ 'category' => get_class($this), ]); + unset($this->job_map[(string) $worker]); posix_kill($this->forks[(string) $worker], SIGKILL); } diff --git a/src/Scheduler.php b/src/Scheduler.php index b85a2c0..ae5dda5 100755 --- a/src/Scheduler.php +++ b/src/Scheduler.php @@ -71,14 +71,14 @@ class Scheduler * * @var string */ - protected $job_queue = 'takscheduler.jobs'; + protected $job_queue = 'taskscheduler.jobs'; /** * Event Collection name. * * @var string */ - protected $event_queue = 'takscheduler.events'; + protected $event_queue = 'taskscheduler.events'; /** * Default at (Secconds from now). 
diff --git a/src/Worker.php b/src/Worker.php index e537699..f9c9394 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -130,6 +130,7 @@ public function timeout(): ?ObjectId $this->db->{$this->scheduler->getEventQueue()}->insertOne([ 'job' => $this->current_job['_id'], + 'worker' => $this->id, 'status' => JobInterface::STATUS_TIMEOUT, 'timestamp' => new UTCDateTime(), ]); @@ -258,6 +259,7 @@ protected function terminate(int $sig): ?ObjectId $this->db->{$this->scheduler->getEventQueue()}->insertOne([ 'job' => $this->current_job['_id'], + 'worker' => $this->id, 'status' => JobInterface::STATUS_CANCELED, 'timestamp' => new UTCDateTime(), ]); @@ -320,6 +322,7 @@ protected function collectJob(array $job, int $status, $from_status = JobInterfa $this->db->{$this->scheduler->getEventQueue()}->insertOne([ 'job' => $job['_id'], + 'worker' => $this->id, 'status' => $status, 'timestamp' => new UTCDateTime(), ]); @@ -429,6 +432,7 @@ protected function processJob(array $job): ObjectId $this->db->{$this->scheduler->getEventQueue()}->insertOne([ 'job' => $job['_id'], + 'worker' => $this->id, 'status' => JobInterface::STATUS_FAILED, 'timestamp' => new UTCDateTime(), 'exception' => [ @@ -500,6 +504,7 @@ protected function executeJob(array $job): bool $this->db->{$this->scheduler->getEventQueue()}->insertOne([ 'job' => $job['_id'], + 'worker' => $this->id, 'status' => JobInterface::STATUS_DONE, 'timestamp' => new UTCDateTime(), ]); diff --git a/tests/QueueTest.php b/tests/QueueTest.php index 7c11628..659691e 100644 --- a/tests/QueueTest.php +++ b/tests/QueueTest.php @@ -13,14 +13,18 @@ namespace TaskScheduler\Testsuite; use Helmich\MongoMock\MockDatabase; +use MongoDB\BSON\UTCDateTime; use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; use ReflectionClass; use ReflectionMethod; use ReflectionProperty; use TaskScheduler\Exception\InvalidArgumentException; +use TaskScheduler\JobInterface; use TaskScheduler\Queue; use TaskScheduler\Scheduler; +use 
TaskScheduler\Testsuite\Mock\EndlessJobMock; +use TaskScheduler\Worker; use TaskScheduler\WorkerFactoryInterface; class QueueTest extends TestCase @@ -28,15 +32,16 @@ class QueueTest extends TestCase protected $queue; protected $scheduler; protected $called = 0; + protected $mongodb; public function setUp() { - $mongodb = new MockDatabase(); - $this->scheduler = new Scheduler($mongodb, $this->createMock(LoggerInterface::class)); + $this->mongodb = new MockDatabase(); + $this->scheduler = new Scheduler($this->mongodb, $this->createMock(LoggerInterface::class)); $called = &$this->called; $this->queue = $this->getMockBuilder(Queue::class) - ->setConstructorArgs([$this->scheduler, $mongodb, $this->createMock(WorkerFactoryInterface::class), $this->createMock(LoggerInterface::class)]) + ->setConstructorArgs([$this->scheduler, $this->mongodb, $this->createMock(WorkerFactoryInterface::class), $this->createMock(LoggerInterface::class)]) ->setMethods(['loop']) ->getMock(); $this->queue->method('loop') @@ -157,6 +162,30 @@ public function testDynamicForceStartWorkerIfIgnoreMaxChildren() $this->assertSame(3, $this->queue->count()); } + /*public function testCancelRunningJob() + { + $mongodb = $this->mongodb; + $job = $this->scheduler->addJob(EndlessJobMock::class, 'foo'); + + $this->queue->process(); + + $method = self::getMethod('getForks'); + $forks = $method->invokeArgs($this->queue, []); + reset($forks); + $worker = key($forks); + + $mongodb->{'taskscheduler.events'}->insertOne([ + 'job' => $job->getId(), + 'worker' => $worker, + 'status' => JobInterface::STATUS_PROCESSING, + 'timestamp' => new UTCDateTime(), + ]); + + $this->scheduler->cancelJob($job->getId()); + $this->called = 0; + $this->queue->process(); + }*/ + public function testMinChildrenGreaterThanMaxChildren() { $this->expectException(InvalidArgumentException::class); diff --git a/tests/SchedulerTest.php b/tests/SchedulerTest.php index 048df0f..b8e45aa 100644 --- a/tests/SchedulerTest.php +++ 
b/tests/SchedulerTest.php @@ -43,7 +43,9 @@ public function testNewJob() $job = $this->scheduler->addJob('test', ['foo' => 'bar']); $this->assertSame($job->getStatus(), JobInterface::STATUS_WAITING); $this->assertSame($job->getClass(), 'test'); + $this->assertSame(['foo' => 'bar'], $job->getData()); $this->assertInstanceOf(ObjectId::class, $job->getId()); + $this->assertInstanceOf(ObjectId::class, $job->getWorker()); } public function testNewJobTimestamps()