From 49eeb4824b66a4736b198097110ae248043e3639 Mon Sep 17 00:00:00 2001 From: Christopher Pitt Date: Mon, 20 Jul 2015 12:17:03 +1200 Subject: [PATCH] Added additional core engine --- code/dataobjects/QueuedJobRule.php | 159 +++++++++++++++ code/jobs/DoormanProcessManager.php | 15 ++ code/jobs/DoormanQueuedJobTask.php | 152 +++++++++++++++ code/services/QueuedJobService.php | 2 +- code/tasks/ProcessJobQueueChildTask.php | 31 +++ code/tasks/ProcessJobQueueTask.php | 246 +++++++++++++++++++----- composer.json | 11 +- 7 files changed, 567 insertions(+), 49 deletions(-) create mode 100644 code/dataobjects/QueuedJobRule.php create mode 100644 code/jobs/DoormanProcessManager.php create mode 100644 code/jobs/DoormanQueuedJobTask.php create mode 100644 code/tasks/ProcessJobQueueChildTask.php diff --git a/code/dataobjects/QueuedJobRule.php b/code/dataobjects/QueuedJobRule.php new file mode 100644 index 00000000..a92de4e8 --- /dev/null +++ b/code/dataobjects/QueuedJobRule.php @@ -0,0 +1,159 @@ + 'Int', + 'Handler' => 'Varchar', + 'MinimumProcessorUsage' => 'Decimal', + 'MaximumProcessorUsage' => 'Decimal', + 'MinimumMemoryUsage' => 'Decimal', + 'MaximumMemoryUsage' => 'Decimal', + 'MinimumSiblingProcessorUsage' => 'Decimal', + 'MaximumSiblingProcessorUsage' => 'Decimal', + 'MinimumSiblingMemoryUsage' => 'Decimal', + 'MaximumSiblingMemoryUsage' => 'Decimal', + ); + + /** + * @inheritdoc + * + * @return int + */ + public function getProcesses() { + if($this->getField('Processes')) { + return $this->getField('Processes'); + } + + return 1; + } + + /** + * @inheritdoc + * + * @return null|string + */ + public function getHandler() { + if($this->getField('Handler')) { + return $this->getField('Handler'); + } + + return null; + } + + /** + * @return null|float + */ + public function getMinimumProcessorUsage() { + if($this->getField('MinimumProcessorUsage')) { + return $this->getField('MinimumProcessorUsage'); + } + + return null; + } + + /** + * @inheritdoc + * + * @return null|float + */ + public function getMaximumProcessorUsage() { + if($this->getField('MaximumProcessorUsage')) { + return $this->getField('MaximumProcessorUsage'); + } + + return null; + } + + /** + * @inheritdoc + * + * @return null|float + */ + public function getMinimumMemoryUsage() { + if($this->getField('MinimumMemoryUsage')) { + return $this->getField('MinimumMemoryUsage'); + } + + return null; + } + + /** + * @return null|float + */ + public function getMaximumMemoryUsage() { + if($this->getField('MaximumMemoryUsage')) { + return $this->getField('MaximumMemoryUsage'); + } + + return null; + } + + /** + * @inheritdoc + * + * @return null|float + */ + public function getMinimumSiblingProcessorUsage() { + if($this->getField('MinimumSiblingProcessorUsage')) { + return $this->getField('MinimumSiblingProcessorUsage'); + } + + return null; + } + + /** + * @inheritdoc + * + * @return null|float + */ + public function getMaximumSiblingProcessorUsage() { + if($this->getField('MaximumSiblingProcessorUsage')) { + return $this->getField('MaximumSiblingProcessorUsage'); + } + + return null; + } + + /** + * @inheritdoc + * + * @return null|float + */ + public function getMinimumSiblingMemoryUsage() { + if($this->getField('MinimumSiblingMemoryUsage')) { + return $this->getField('MinimumSiblingMemoryUsage'); + } + + return null; + } + + /** + * @inheritdoc + * + * @return null|float + */ + public function getMaximumSiblingMemoryUsage() { + if($this->getField('MaximumSiblingMemoryUsage')) { + return $this->getField('MaximumSiblingMemoryUsage'); + } + + return null; + } +} diff --git a/code/jobs/DoormanProcessManager.php b/code/jobs/DoormanProcessManager.php new file mode 100644 index 00000000..97446bdc --- /dev/null +++ b/code/jobs/DoormanProcessManager.php @@ -0,0 +1,15 @@ +id; + } + + /** + * @inheritdoc + * + * @param int $id + * + * @return $this + */ + public function setId($id) { + $this->id = $id; + + return $this; + } + + /** + * @return QueuedJobDescriptor + */ + public function getDescriptor() + { + return $this->descriptor; + } + + /** + * @param QueuedJobDescriptor $descriptor + */ + public function __construct(QueuedJobDescriptor $descriptor) { + $this->descriptor = $descriptor; + } + + /** + * @inheritdoc + * + * @return string + */ + public function serialize() { + return serialize(array( + 'descriptor' => $this->descriptor->ID, + )); + } + + /** + * @inheritdoc + */ + public function unserialize($serialized) { + $data = unserialize($serialized); + + if(!isset($data['descriptor'])) { + throw new InvalidArgumentException('Malformed data'); + } + + $descriptor = QueuedJobDescriptor::get() + ->filter('ID', $data['descriptor']) + ->first(); + + if(!$descriptor) { + throw new InvalidArgumentException('Descriptor not found'); + } + + $this->descriptor = $descriptor; + } + + /** + * @return string + */ + public function getHandler() { + return 'DoormanQueuedJobHandler'; + } + + /** + * @return array + */ + public function getData() { + return array( + 'descriptor' => $this->descriptor, + ); + } + + /** + * @return bool + */ + public function ignoresRules() { + if (method_exists($this->descriptor, 'ignoreRules')) { + return $this->descriptor->ignoreRules(); + } + + return false; + } + + /** + * @return bool + */ + public function stopsSiblings() { + if (method_exists($this->descriptor, 'stopsSiblings')) { + return $this->descriptor->stopsSiblings(); + } + + return false; + } + + /** + * @inheritdoc + * + * @return int + */ + public function getExpiresIn() { + if (method_exists($this->descriptor, 'getExpiresIn')) { + return $this->descriptor->getExpiresIn(); + } + + return -1; + } + + /** + * @inheritdoc + * + * @param int $startedAt + * + * @return bool + */ + public function shouldExpire($startedAt) { + if (method_exists($this->descriptor, 'shouldExpire')) { + return $this->descriptor->shouldExpire($startedAt); + } + + return true; + } +} diff --git a/code/services/QueuedJobService.php b/code/services/QueuedJobService.php index a6399a4f..58437974 100644 --- a/code/services/QueuedJobService.php +++ b/code/services/QueuedJobService.php @@ -397,7 +397,7 @@ protected function grabMutex(QueuedJobDescriptor $jobDescriptor) { return false; } - if(DB::getConn()->affectedRows() === 0) { + if(DB::getConn()->affectedRows() === 0 && $jobDescriptor->JobStatus !== QueuedJob::STATUS_INIT) { return false; } diff --git a/code/tasks/ProcessJobQueueChildTask.php b/code/tasks/ProcessJobQueueChildTask.php new file mode 100644 index 00000000..630d1639 --- /dev/null +++ b/code/tasks/ProcessJobQueueChildTask.php @@ -0,0 +1,31 @@ +getService()->runJob($task->getDescriptor()->ID); + } + } + + /** + * Returns an instance of the QueuedJobService. + * + * @return QueuedJobService + */ + protected function getService() { + return singleton('QueuedJobService'); + } +} diff --git a/code/tasks/ProcessJobQueueTask.php b/code/tasks/ProcessJobQueueTask.php index 591bc176..71eb9ac5 100644 --- a/code/tasks/ProcessJobQueueTask.php +++ b/code/tasks/ProcessJobQueueTask.php @@ -1,4 +1,5 @@ config()->engine === 'default') { + $this->runWithDefaultEngine($request); + } elseif($this->config()->engine === 'doorman') { + $this->runWithDoormanEngine($request); } else { - echo Convert::raw2xml($line) . "
"; + throw new InvalidArgumentException('ProcessJobQueueTask engine unrecognised'); } } - public function run($request) { - $service = singleton('QueuedJobService'); - /* @var $service QueuedJobService */ + /** + * @param SS_HttpRequest $request + */ + protected function runWithDefaultEngine($request) { + $service = $this->getService(); + + $queue = $this->getQueue($request); + + if($request->getVar('list')) { + for($i = 1; $i <= 3; $i++) { + $jobs = $service->getJobList($i); + $num = $jobs ? $jobs->Count() : 0; + $this->writeLogLine('Found ' . $num . ' jobs for mode ' . $i . '.'); + } + + return; + } + + $service->checkJobHealth(); + + $nextJob = null; - $datestamp = '['.date('Y-m-d H:i:s').']'; + // see if we've got an explicit job ID, otherwise we'll just check the queue directly + $job = $request->getVar('job'); + + if($job && strpos($job, '-')) { + $parts = explode('-', $job); + + $nextJob = DataObject::get_by_id('QueuedJobDescriptor', $parts[1]); + } else { + $nextJob = $service->getNextPendingJob($queue); + } + + $this->logDescriptorStatus($nextJob, $queue); + + if($nextJob instanceof QueuedJobDescriptor) { + $service->processJobQueue($queue); + } + } + + /** + * Returns an instance of the QueuedJobService. + * + * @return QueuedJobService + */ + protected function getService() { + return singleton('QueuedJobService'); + } + + /** + * Resolves the queue name to one of a few aliases. + * + * @todo Solve the "Queued"/"queued" mystery! + * + * @param SS_HTTPRequest $request + * + * @return string + */ + protected function getQueue($request) { $queue = $request->getVar('queue'); - if (!$queue) { + + if(!$queue) { $queue = 'Queued'; } - switch (strtolower($queue)) { + switch(strtolower($queue)) { case 'immediate': { $queue = QueuedJob::IMMEDIATE; break; @@ -51,46 +123,134 @@ public function run($request) { $queue = QueuedJob::LARGE; break; } - default: { - // leave it as whatever this queue name is - } } - $this->writeLogLine("$datestamp Processing queue $queue"); + return $queue; + } - if ($request->getVar('list')) { - for ($i = 1; $i <= 3; $i++) { - $jobs = $service->getJobList($i); - $num = $jobs ? $jobs->Count() : 0; - $this->writeLogLine("$datestamp Found $num jobs for mode $i"); - } - return; + /** + * Write in a format expected by the output medium (CLI/HTML). + * + * @param string $line Line to be written out, without the newline character. + * @param null|string $prefix + */ + private function writeLogLine($line, $prefix = null) { + if(!$prefix) { + $prefix = '[' . date('Y-m-d H:i:s') . '] '; } - // Cleanup or restart jobs before processing - $service->checkJobHealth(); - - /* @var $service QueuedJobService */ - $nextJob = null; - - // see if we've got an explicit job ID, otherwise we'll just check the queue directly - if ($request->getVar('job') && strpos($request->getVar('job'), '-')) { - list($junk, $jobId) = split('-', $request->getVar('job')); - $nextJob = DataObject::get_by_id('QueuedJobDescriptor', $jobId); + if(Director::is_cli()) { + echo $prefix . $line . "\n"; } else { - $nextJob = $service->getNextPendingJob($queue); + echo Convert::raw2xml($prefix . $line) . "
"; } + } - if ($nextJob) { - $this->writeLogLine("$datestamp Running $nextJob->JobTitle and others from $queue."); - $service->processJobQueue($queue); + /** + * Logs the status of the queued job descriptor. + * + * @param bool|null|QueuedJobDescriptor $descriptor + * @param string $queue + */ + protected function logDescriptorStatus($descriptor, $queue) { + if(is_null($descriptor)) { + $this->writeLogLine('No new jobs'); + } + + if($descriptor === false) { + $this->writeLogLine('Job is still running on ' . $queue); } - if (is_null($nextJob)) { - $this->writeLogLine("$datestamp No new jobs"); + if($descriptor instanceof QueuedJobDescriptor) { + $this->writeLogLine('Running ' . $descriptor->JobTitle . ' and others from ' . $queue . '.'); } - if ($nextJob === false) { - $this->writeLogLine("$datestamp Job is still running on $queue"); + } + + /** + * @param SS_HttpRequest $request + */ + protected function runWithDoormanEngine($request) { + // fix/prep any strange jobs! + + $service = $this->getService(); + $service->checkJobHealth(); + + // split jobs out into multiple tasks... + + $manager = new DoormanProcessManager(); + // $manager->setLogPath(__DIR__); + + if ($this->config()->processes > 0) { + $rule = new InMemoryRule(); + $rule->setProcesses($this->config()->processes); + + $manager->addRule($rule); } + + $descriptor = $this->getNextJobDescriptorWithoutMutex($request); + + while($manager->tick() || $descriptor) { + $this->logDescriptorStatus($descriptor, $this->getQueue($request)); + + if($descriptor instanceof QueuedJobDescriptor) { + $descriptor->JobStatus = QueuedJob::STATUS_INIT; + $descriptor->write(); + + $manager->addTask(new DoormanQueuedJobTask($descriptor)); + } + + $descriptor = $this->getNextJobDescriptorWithoutMutex($request); + + sleep(1); + }; + } + + /** + * @param SS_HTTPRequest $request + * + * @return null|QueuedJobDescriptor + */ + protected function getNextJobDescriptorWithoutMutex($request) { + $list = QueuedJobDescriptor::get() + ->filter('JobType', $this->getQueue($request)) + ->sort('ID', 'ASC'); + + $descriptor = $list + ->filter('JobStatus', QueuedJob::STATUS_WAIT) + ->first(); + + if($descriptor) { + return $descriptor; + } + + return $list + ->filter('JobStatus', QueuedJob::STATUS_NEW) + ->where(sprintf( + '"StartAfter" < \'%s\' OR "StartAfter" IS NULL', + SS_DateTime::now()->getValue() + )) + ->first(); + } + + /** + * Fetches the next queued job descriptor to be processed, or false for mutex lock + * or null for no outstanding jobs. + * + * @param SS_HTTPRequest $request + * @param QueuedJobService $service + * @param string $queue + * + * @return null|bool|DataObject + */ + protected function getNextJobDescriptor($request, $service, $queue) { + $job = $request->getVar('job'); + + if($job && strpos($job, '-')) { + $parts = explode('-', $job); + + return DataObject::get_by_id('QueuedJobDescriptor', $parts[1]); + } + + return $service->getNextPendingJob($queue); } } diff --git a/composer.json b/composer.json index a4beb39f..7c371109 100644 --- a/composer.json +++ b/composer.json @@ -5,16 +5,17 @@ "keywords": ["silverstripe", "jobs"], "license": "BSD-3-Clause", "authors": [ - { - "name": "Marcus Nyeholt", - "email": "marcus@silverstripe.com.au" - } + { + "name": "Marcus Nyeholt", + "email": "marcus@silverstripe.com.au" + } ], "require": { "silverstripe/framework": "~3.1", "silverstripe/cms": "~3.1", - "silverstripe/multivaluefield": "~2.0" + "silverstripe/multivaluefield": "~2.0", + "asyncphp/doorman": "~1.1" }, "extra": { "branch-alias": {