From ba94a35da61ab88926a1e763039513e76fd78d88 Mon Sep 17 00:00:00 2001 From: Damian Mooyman Date: Wed, 13 May 2015 16:26:07 +1200 Subject: [PATCH] API Better restarting of stalled jobs (when StepsProcessed == 0) BUG Better memory detection BUG Correct usage of configurable options in QueuedJobService BUG Corrected "ResumeCount" to "ResumeCounts" --- README.md | 22 +++ code/dataobjects/QueuedJobDescriptor.php | 62 +++++- code/services/QueuedJobService.php | 229 ++++++++++++++++------- code/tasks/ProcessJobQueueTask.php | 5 +- tests/QueuedJobsTest.php | 160 ++++++++++++---- 5 files changed, 366 insertions(+), 112 deletions(-) diff --git a/README.md b/README.md index 1988336a..e9b56c43 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,28 @@ _must_ detect whether they're present or not before using them. See [this issue] and [this wiki page](https://github.com/silverstripe-australia/silverstripe-queuedjobs/wiki/Defining-queued-jobs) for more information +Ensure that notifications are configured so that you can get updates or stalled or broken jobs. You can +set the notification email address in your config as below: + + + :::yaml + Email: + queued_job_admin_email: support@mycompany.com + +## Memory limit configuration + +By default this task will run until either 128mb or the limit specified by php_ini('memory_limit') is reached. + +You can adjust this with the below config change + + + :::yaml + # Force memory limit to 256 megabytes + QueuedJobsService: + # Accepts b, k, m, or b suffixes + memory_limit: 256m + + ## Indexes ALTER TABLE `QueuedJobDescriptor` ADD INDEX ( `JobStatus` , `JobType` ) diff --git a/code/dataobjects/QueuedJobDescriptor.php b/code/dataobjects/QueuedJobDescriptor.php index a057edd2..2ea2189b 100644 --- a/code/dataobjects/QueuedJobDescriptor.php +++ b/code/dataobjects/QueuedJobDescriptor.php @@ -8,6 +8,23 @@ * this will determine which queue it is placed within so that some shorter jobs can execute immediately without needing * to wait for a potentially long running job. * + * @property string $JobTitle Name of job + * @property string $Signature Unique identifier for this job instance + * @property string $Implementation Classname of underlying job + * @property string $StartAfter Don't start until this date, if set + * @property string $JobStarted When this job was started + * @property string $JobFinished When this job was finished + * @property int $TotalSteps Number of steps + * @property int $StepsProcessed Number of completed steps + * @property int $LastProcessedCount Number at which StepsProcessed was last checked for stalled jobs + * @property int $ResumeCounts Number of times this job has been resumed + * @property string $SavedJobData serialised data for the job to use as storage + * @property string $SavedJobMessages List of messages saved for this job + * @property string $JobStatus Status of this job + * @property string $JobType Type of job + * + * @method Member RunAs() Member to run this job as + * * @author Marcus Nyeholt * @license BSD http://silverstripe.org/bsd-license/ */ @@ -22,7 +39,7 @@ class QueuedJobDescriptor extends DataObject { 'JobFinished' => 'SS_Datetime', 'TotalSteps' => 'Int', 'StepsProcessed' => 'Int', - 'LastProcessedCount' => 'Int', + 'LastProcessedCount' => 'Int(-1)', // -1 means never checked, 0 means checked but no work is done 'ResumeCounts' => 'Int', 'SavedJobData' => 'Text', 'SavedJobMessages' => 'Text', @@ -37,6 +54,7 @@ class QueuedJobDescriptor extends DataObject { private static $defaults = array( 'JobStatus' => 'New', 'ResumeCounts' => 0, + 'LastProcessedCount' => -1 // -1 means never checked, 0 means checked and none were processed ); private static $indexes = array( @@ -74,21 +92,47 @@ public function summaryFields() { ); return $columns; } - - public function pause() { - if ($this->JobStatus == QueuedJob::STATUS_WAIT || $this->JobStatus == QueuedJob::STATUS_RUN || $this->JobStatus == QueuedJob::STATUS_INIT) { + + /** + * Pause this job, but only if it is waiting, running, or init + * + * @param bool $force Pause this job even if it's not waiting, running, or init + * @return bool Return true if this job was paused + */ + public function pause($force = false) { + if($force || in_array( + $this->JobStatus, + array(QueuedJob::STATUS_WAIT, QueuedJob::STATUS_RUN, QueuedJob::STATUS_INIT) + )) { $this->JobStatus = QueuedJob::STATUS_PAUSED; $this->write(); + return true; } + return false; } - public function resume() { - if ($this->JobStatus == QueuedJob::STATUS_PAUSED || $this->JobStatus == QueuedJob::STATUS_BROKEN) { + /** + * Resume this job and schedules it for execution + * + * @param bool $force Resume this job even if it's not paused or broken + * @return bool Return true if this job was resumed + */ + public function resume($force = false) { + if($force || in_array($this->JobStatus, array(QueuedJob::STATUS_PAUSED, QueuedJob::STATUS_BROKEN))) { $this->JobStatus = QueuedJob::STATUS_WAIT; $this->ResumeCounts++; $this->write(); singleton('QueuedJobService')->startJob($this); + return true; } + return false; + } + + /** + * Restarts this job via a forced resume + */ + public function restart() { + $this->resume(true); } /** @@ -97,13 +141,17 @@ public function resume() { */ public function activateOnQueue() { // if it's an immediate job, lets cache it to disk to be picked up later - if ($this->JobType == QueuedJob::IMMEDIATE && !Config::inst()->get('QueuedJobService', 'use_shutdown_function')) { + if ($this->JobType == QueuedJob::IMMEDIATE + && !Config::inst()->get('QueuedJobService', 'use_shutdown_function') + ) { touch($this->getJobDir() . '/' . 'queuedjob-' . $this->ID); } } /** * Gets the path to the queuedjob cache directory + * + * @return string */ protected function getJobDir() { // make sure our temp dir is in place. This is what will be inotify watched diff --git a/code/services/QueuedJobService.php b/code/services/QueuedJobService.php index e0f1c45b..e9be6b65 100644 --- a/code/services/QueuedJobService.php +++ b/code/services/QueuedJobService.php @@ -27,12 +27,14 @@ class QueuedJobService { private static $stall_threshold = 3; /** - * how many meg of ram will we allow before pausing and releasing the memory? + * How much ram will we allow before pausing and releasing the memory? * - * This is set to a somewhat low default as some people may not be able to run - * on systems with a lot of ram (128MB by default) + * For instance, set to 134217728 (128MB) to pause this process if used memory exceeds + * this value. This needs to be set to a value lower than the php_ini max_memory as + * the system will otherwise crash before shutdown can be handled gracefully. * * @var int + * @config */ private static $memory_limit = 134217728; @@ -64,7 +66,7 @@ class QueuedJobService { */ public function __construct() { // bind a shutdown function to process all 'immediate' queued jobs if needed, but only in CLI mode - if (self::$use_shutdown_function && Director::is_cli()) { + if (Config::inst()->get(__CLASS__, 'use_shutdown_function') && Director::is_cli()) { if (class_exists('PHPUnit_Framework_TestCase') && SapphireTest::is_running_test()) { // do NOTHING } else { @@ -189,45 +191,42 @@ protected function copyDescriptorToJob($jobDescriptor, $job) { * Check the current job queues and see if any of the jobs currently in there should be started. If so, * return the next job that should be executed * + * @param string $type Job type * @return QueuedJobDescriptor */ public function getNextPendingJob($type=null) { - $type = $type ? (string) $type : QueuedJob::QUEUED; + // Filter jobs by type + $type = $type ?: QueuedJob::QUEUED; + $list = QueuedJobDescriptor::get() + ->filter('JobType', $type) + ->sort('ID', 'ASC'); // see if there's any blocked jobs that need to be resumed - $existingJob = DataList::create('QueuedJobDescriptor')->filter(array('JobStatus' => QueuedJob::STATUS_WAIT, 'JobType' => $type))->first(); - if ($existingJob && $existingJob->exists()) { - return $existingJob; + $waitingJob = $list + ->filter('JobStatus', QueuedJob::STATUS_WAIT) + ->first(); + if ($waitingJob) { + return $waitingJob; } - $list = QueuedJobDescriptor::get()->filter(array( - 'JobStatus' => array(QueuedJob::STATUS_INIT, QueuedJob::STATUS_RUN), - 'JobType' => $type - )); - // lets see if we have a currently running job - - $existingJob = $list->first(); - - // if there's an existing job either running or pending, the lets just return false to indicate + // If there's an existing job either running or pending, the lets just return false to indicate // that we're still executing - if ($existingJob && $existingJob->exists()) { + $runningJob = $list + ->filter('JobStatus', array(QueuedJob::STATUS_INIT, QueuedJob::STATUS_RUN)) + ->first(); + if ($runningJob) { return false; } - // otherwise, lets find any 'new' jobs that are waiting to execute - $filter = array( - 'JobStatus =' => 'New', - 'JobType =' => $type ? (string) $type : QueuedJob::QUEUED, - ); - - $where = '"StartAfter" < \'' . date('Y-m-d H:i:s').'\' OR "StartAfter" IS NULL'; - $list = QueuedJobDescriptor::get()->where($where); - $list = $list->filter(array('JobStatus' => 'New', 'JobType' => $type)); - $list = $list->sort('ID', 'ASC'); - - if ($list && $list->Count()) { - return $list->First(); - } + // Otherwise, lets find any 'new' jobs that are waiting to execute + $newJob = $list + ->filter('JobStatus', QueuedJob::STATUS_NEW) + ->where(sprintf( + '"StartAfter" < \'%s\' OR "StartAfter" IS NULL', + SS_DateTime::now()->getValue() + )) + ->first(); + return $newJob; } /** @@ -239,41 +238,30 @@ public function getNextPendingJob($type=null) { * fix them */ public function checkJobHealth() { - // first off, we want to find jobs that haven't changed since they were last checked (assuming they've actually - // processed a few steps...) - $stalledJobs = QueuedJobDescriptor::get()->filter(array( - 'JobStatus' => array(QueuedJob::STATUS_RUN, QueuedJob::STATUS_INIT), - 'StepsProcessed:GreaterThan' => 0, - )); - - $stalledJobs = $stalledJobs->where('"StepsProcessed"="LastProcessedCount"'); - - if ($stalledJobs) { - foreach ($stalledJobs as $stalledJob) { - if ($stalledJob->ResumeCount <= self::$stall_threshold) { - $stalledJob->ResumeCount++; - $stalledJob->pause(); - $stalledJob->resume(); - $msg = sprintf(_t('QueuedJobs.STALLED_JOB_MSG', 'A job named %s appears to have stalled. It will be stopped and restarted, please login to make sure it has continued'), $stalledJob->JobTitle); - } else { - $stalledJob->pause(); - $msg = sprintf(_t('QueuedJobs.STALLED_JOB_MSG', 'A job named %s appears to have stalled. It has been paused, please login to check it'), $stalledJob->JobTitle); - } - - singleton('QJUtils')->log($msg); - $mail = new Email(Config::inst()->get('Email', 'admin_email'), Config::inst()->get('Email', 'queued_job_admin_email'), _t('QueuedJobs.STALLED_JOB', 'Stalled job'), $msg); - $mail->send(); - } + // Select all jobs currently marked as running + $runningJobs = QueuedJobDescriptor::get() + ->filter( + 'JobStatus', + array( + QueuedJob::STATUS_RUN, + QueuedJob::STATUS_INIT + ) + ); + + // If no steps have been processed since the last run, consider it a broken job + // Only check jobs that have been viewed before. LastProcessedCount defaults to -1 on new jobs. + $stalledJobs = $runningJobs + ->filter('LastProcessedCount:GreaterThanOrEqual', 0) + ->where('"StepsProcessed" = "LastProcessedCount"'); + foreach ($stalledJobs as $stalledJob) { + $this->restartStalledJob($stalledJob); } // now, find those that need to be marked before the next check - $runningJobs = QueuedJobDescriptor::get()->filter('JobStatus', array(QueuedJob::STATUS_RUN, QueuedJob::STATUS_INIT)); - if ($runningJobs) { - // foreach job, mark it as having been incremented - foreach ($runningJobs as $job) { - $job->LastProcessedCount = $job->StepsProcessed; - $job->write(); - } + // foreach job, mark it as having been incremented + foreach ($runningJobs as $job) { + $job->LastProcessedCount = $job->StepsProcessed; + $job->write(); } // finally, find the list of broken jobs and send an email if there's some found @@ -289,6 +277,41 @@ public function checkJobHealth() { } } + /** + * Attempt to restart a stalled job + * + * @param QueuedJobDescriptor $stalledJob + * @return bool True if the job was successfully restarted + */ + protected function restartStalledJob($stalledJob) { + if ($stalledJob->ResumeCounts < Config::inst()->get(__CLASS__, 'stall_threshold')) { + $stalledJob->restart(); + $message = sprintf( + _t( + 'QueuedJobs.STALLED_JOB_MSG', + 'A job named %s appears to have stalled. It will be stopped and restarted, please login to make sure it has continued' + ), + $stalledJob->JobTitle + ); + } else { + $stalledJob->pause(); + $message = sprintf( + _t( + 'QueuedJobs.STALLED_JOB_MSG', + 'A job named %s appears to have stalled. It has been paused, please login to check it' + ), + $stalledJob->JobTitle + ); + } + + singleton('QJUtils')->log($message); + $from = Config::inst()->get('Email', 'admin_email'); + $to = Config::inst()->get('Email', 'queued_job_admin_email'); + $subject = _t('QueuedJobs.STALLED_JOB', 'Stalled job'); + $mail = new Email($from, $to, $subject, $message); + $mail->send(); + } + /** * Prepares the given jobDescriptor for execution. Returns the job that * will actually be run in a state ready for executing. @@ -490,7 +513,7 @@ public function runJob($jobId) { $stallCount++; } - if ($stallCount > self::$stall_threshold) { + if ($stallCount > Config::inst()->get(__CLASS__, 'stall_threshold')) { $broken = true; $job->addMessage(sprintf(_t('QueuedJobs.JOB_STALLED', "Job stalled after %s attempts - please check"), $stallCount), 'ERROR'); $jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN; @@ -499,7 +522,10 @@ public function runJob($jobId) { // now we'll be good and check our memory usage. If it is too high, we'll set the job to // a 'Waiting' state, and let the next processing run pick up the job. if ($this->isMemoryTooHigh()) { - $job->addMessage(sprintf(_t('QueuedJobs.MEMORY_RELEASE', 'Job releasing memory and waiting (%s used)'), $this->humanReadable(memory_get_usage()))); + $job->addMessage(sprintf( + _t('QueuedJobs.MEMORY_RELEASE', 'Job releasing memory and waiting (%s used)'), + $this->humanReadable($this->getMemoryUsage()) + )); $jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT; $broken = true; } @@ -554,12 +580,75 @@ public function runJob($jobId) { } /** - * Is memory usage too high? + * Is memory usage too high? + * + * @return bool */ protected function isMemoryTooHigh() { - if (function_exists('memory_get_usage')) { - $memory = memory_get_usage(); - return memory_get_usage() > self::$memory_limit; + $used = $this->getMemoryUsage(); + $limit = $this->getMemoryLimit(); + return $limit && ($used > $limit); + } + + /** + * Get peak memory usage of this application + * + * @return float + */ + protected function getMemoryUsage() { + // Note we use real_usage = false http://stackoverflow.com/questions/15745385/memory-get-peak-usage-with-real-usage + // Also we use the safer peak memory usage + return (float)memory_get_peak_usage(false); + } + + /** + * Determines the memory limit (in bytes) for this application + * Limits to the smaller of memory_limit configured via php.ini or silverstripe config + * + * @return float Memory limit in bytes + */ + protected function getMemoryLimit() { + // Limit to smaller of explicit limit or php memory limit + $limit = $this->parseMemory(Config::inst()->get(__CLASS__, 'memory_limit')); + if($limit) { + return $limit; + } + + // Fallback to php memory limit + $phpLimit = $this->getPHPMemoryLimit(); + if($phpLimit) { + return $phpLimit; + } + } + + /** + * Calculate the current memory limit of the server + * + * @return float + */ + protected function getPHPMemoryLimit() { + return $this->parseMemory(trim(ini_get("memory_limit"))); + } + + /** + * Convert memory limit string to bytes. + * Based on implementation in install.php5 + * + * @param string $memString + * @return float + */ + protected function parseMemory($memString) { + switch(strtolower(substr($memString, -1))) { + case "b": + return round(substr($memString, 0, -1)); + case "k": + return round(substr($memString, 0, -1) * 1024); + case "m": + return round(substr($memString, 0, -1) * 1024 * 1024); + case "g": + return round(substr($memString, 0, -1) * 1024 * 1024 * 1024); + default: + return round($memString); } } diff --git a/code/tasks/ProcessJobQueueTask.php b/code/tasks/ProcessJobQueueTask.php index 03eb9a56..591bc176 100644 --- a/code/tasks/ProcessJobQueueTask.php +++ b/code/tasks/ProcessJobQueueTask.php @@ -66,6 +66,9 @@ public function run($request) { } return; } + + // Cleanup or restart jobs before processing + $service->checkJobHealth(); /* @var $service QueuedJobService */ $nextJob = null; @@ -78,8 +81,6 @@ public function run($request) { $nextJob = $service->getNextPendingJob($queue); } - $service->checkJobHealth(); - if ($nextJob) { $this->writeLogLine("$datestamp Running $nextJob->JobTitle and others from $queue."); $service->processJobQueue($queue); diff --git a/tests/QueuedJobsTest.php b/tests/QueuedJobsTest.php index cbdbdccf..31ffe01f 100644 --- a/tests/QueuedJobsTest.php +++ b/tests/QueuedJobsTest.php @@ -5,10 +5,31 @@ * * @author Marcus Nyeholt */ -class QueuedJobsTest extends SapphireTest -{ +class QueuedJobsTest extends SapphireTest { + + public function setUp() { + parent::setUp(); + + Config::nest(); + // Two restarts are allowed per job + Config::inst()->update('QueuedJobService', 'stall_threshold', 2); + } + + public function tearDown() { + Config::unnest(); + parent::tearDown(); + } + + + /** + * @return QueuedJobService + */ + protected function getService() { + return singleton("TestQJService"); + } + public function testQueueJob() { - $svc = singleton("QueuedJobService"); + $svc = $this->getService(); // lets create a new job and add it tio the queue $job = new TestQueuedJob(); @@ -32,7 +53,7 @@ public function testQueueJob() { } public function testJobRunAs() { - $svc = singleton("QueuedJobService"); + $svc = $this->getService(); $list = $svc->getJobList(); foreach ($list as $job) { $job->delete(); @@ -52,7 +73,7 @@ public function testJobRunAs() { } public function testQueueSignature() { - $svc = singleton("QueuedJobService"); + $svc = $this->getService(); // lets create a new job and add it tio the queue $job = new TestQueuedJob(); @@ -111,7 +132,7 @@ public function testResumeJob() { public function testInitialiseJob() { // okay, lets test it out on the actual service - $svc = singleton("TestQJService"); + $svc = $this->getService(); // lets create a new job and add it to the queue $job = new TestQueuedJob(); $id = $svc->queueJob($job); @@ -128,7 +149,7 @@ public function testInitialiseJob() { public function testStartJob() { // okay, lets test it out on the actual service - $svc = singleton("QueuedJobService"); + $svc = $this->getService(); // lets create a new job and add it to the queue $this->logInWithPermission('DUMMYUSER'); @@ -149,7 +170,7 @@ public function testStartJob() { public function testImmediateQueuedJob() { // okay, lets test it out on the actual service - $svc = singleton("QueuedJobService"); + $svc = $this->getService(); // lets create a new job and add it to the queue $job = new TestQueuedJob(QueuedJob::IMMEDIATE); @@ -172,7 +193,7 @@ public function testImmediateQueuedJob() { } public function testNextJob() { - $svc = singleton("TestQJService"); + $svc = $this->getService(); $list = $svc->getJobList(); foreach ($list as $job) { @@ -211,49 +232,122 @@ public function testNextJob() { $this->assertFalse($next); } - - public function testJobHealthCheck() { - $svc = singleton("QueuedJobService"); - // lets create a new job and add it to the queue + /** + * Verify that broken jobs are correctly verified for health and restarted as necessary + * + * Order of checkJobHealth() and getNextPendingJob() is important + * + * Execution of this job is broken into several "loops", each of which represents one invocation + * of ProcessJobQueueTask + */ + public function testJobHealthCheck() { + // Create a job and add it to the queue + $svc = $this->getService(); $job = new TestQueuedJob(QueuedJob::IMMEDIATE); $job->firstJob = true; $id = $svc->queueJob($job); - $descriptor = QueuedJobDescriptor::get()->byID($id); - - $descriptor->JobStatus = 'Running'; - $descriptor->StepsProcessed = 1; -// $descriptor->LastProcessedCount = 1; + // Verify initial state is new and LastProcessedCount is not marked yet + $this->assertEquals(QueuedJob::STATUS_NEW, $descriptor->JobStatus); + $this->assertEquals(0, $descriptor->StepsProcessed); + $this->assertEquals(-1, $descriptor->LastProcessedCount); + $this->assertEquals(0, $descriptor->ResumeCounts); + + // Loop 1 - Pick up new job and attempt to run it + // Job health should not attempt to cleanup unstarted jobs + $svc->checkJobHealth(); + $nextJob = $svc->getNextPendingJob(QueuedJob::IMMEDIATE); + + // Ensure that this is the next job ready to go + $descriptor = QueuedJobDescriptor::get()->byID($id); + $this->assertEquals($nextJob->ID, $descriptor->ID); + $this->assertEquals(QueuedJob::STATUS_NEW, $descriptor->JobStatus); + $this->assertEquals(0, $descriptor->StepsProcessed); + $this->assertEquals(-1, $descriptor->LastProcessedCount); + $this->assertEquals(0, $descriptor->ResumeCounts); + + // Run 1 - Start the job (no work is done) + $descriptor->JobStatus = QueuedJob::STATUS_INIT; $descriptor->write(); - + + // Assume that something bad happens at this point, the process dies during execution, and + // the task is re-initiated somewhere down the track + + // Loop 2 - Detect broken job, and mark it for future checking. $svc->checkJobHealth(); - + $nextJob = $svc->getNextPendingJob(QueuedJob::IMMEDIATE); + + // Note that we don't immediately try to restart it until StepsProcessed = LastProcessedCount $descriptor = QueuedJobDescriptor::get()->byID($id); - $this->assertEquals(1, $descriptor->LastProcessedCount); - + $this->assertFalse($nextJob); // Don't run it this round please! + $this->assertEquals(QueuedJob::STATUS_INIT, $descriptor->JobStatus); + $this->assertEquals(0, $descriptor->StepsProcessed); + $this->assertEquals(0, $descriptor->LastProcessedCount); + $this->assertEquals(0, $descriptor->ResumeCounts); + + // Loop 3 - We've previously marked this job as broken, so restart it this round + // If no more work has been done on the job at this point, assume that we are able to + // restart it $svc->checkJobHealth(); - + $nextJob = $svc->getNextPendingJob(QueuedJob::IMMEDIATE); + + // This job is resumed and exeuction is attempted this round $descriptor = QueuedJobDescriptor::get()->byID($id); + $this->assertEquals($nextJob->ID, $descriptor->ID); $this->assertEquals(QueuedJob::STATUS_WAIT, $descriptor->JobStatus); - - // the same for init broken jobs - $descriptor->JobTitle = 'Test job broken in init'; - $descriptor->JobStatus = QueuedJob::STATUS_INIT; - $descriptor->LastProcessedCount = 0; + $this->assertEquals(0, $descriptor->StepsProcessed); + $this->assertEquals(0, $descriptor->LastProcessedCount); + $this->assertEquals(1, $descriptor->ResumeCounts); + + // Run 2 - First restart (work is done) + $descriptor->JobStatus = QueuedJob::STATUS_RUN; + $descriptor->StepsProcessed++; // Essentially delays the next restart by 1 loop $descriptor->write(); - + + // Once again, at this point, assume the job fails and crashes + + // Loop 4 - Assuming a job has LastProcessedCount < StepsProcessed we are in the same + // situation as step 2. + // Because the last time the loop ran, StepsProcessed was incremented, + // this indicates that it's likely that another task could be working on this job, so + // don't run this. $svc->checkJobHealth(); - + $nextJob = $svc->getNextPendingJob(QueuedJob::IMMEDIATE); + $descriptor = QueuedJobDescriptor::get()->byID($id); + $this->assertFalse($nextJob); // Don't run jobs we aren't sure should be restarted + $this->assertEquals(QueuedJob::STATUS_RUN, $descriptor->JobStatus); + $this->assertEquals(1, $descriptor->StepsProcessed); $this->assertEquals(1, $descriptor->LastProcessedCount); - + $this->assertEquals(1, $descriptor->ResumeCounts); + + // Loop 5 - Job is again found to not have been restarted since last iteration, so perform second + // restart. The job should be attempted to run this loop $svc->checkJobHealth(); - + $nextJob = $svc->getNextPendingJob(QueuedJob::IMMEDIATE); + + // This job is resumed and exeuction is attempted this round $descriptor = QueuedJobDescriptor::get()->byID($id); + $this->assertEquals($nextJob->ID, $descriptor->ID); $this->assertEquals(QueuedJob::STATUS_WAIT, $descriptor->JobStatus); - + $this->assertEquals(1, $descriptor->StepsProcessed); + $this->assertEquals(1, $descriptor->LastProcessedCount); + $this->assertEquals(2, $descriptor->ResumeCounts); + + // Run 3 - Second and last restart (no work is done) + $descriptor->JobStatus = QueuedJob::STATUS_RUN; + $descriptor->write(); + + // Loop 6 - As no progress has been made since loop 3, we can mark this as dead + $svc->checkJobHealth(); + $nextJob = $svc->getNextPendingJob(QueuedJob::IMMEDIATE); + + // Since no StepsProcessed has been done, don't wait another loop to mark this as dead + $descriptor = QueuedJobDescriptor::get()->byID($id); + $this->assertEquals(QueuedJob::STATUS_PAUSED, $descriptor->JobStatus); + $this->assertEmpty($nextJob); } }