-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add new retry and backoff timer feature
- Loading branch information
1 parent
987de78
commit 97cfe2c
Showing
5 changed files
with
389 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
<?php | ||
|
||
namespace SilverStripe\Forager\Extensions; | ||
|
||
use SilverStripe\Core\Config\Configurable; | ||
use SilverStripe\Core\Extension; | ||
use SilverStripe\Forager\Jobs\BatchJob; | ||
use SilverStripe\ORM\FieldType\DBDatetime; | ||
use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor; | ||
use Symbiote\QueuedJobs\Services\AbstractQueuedJob; | ||
use Symbiote\QueuedJobs\Services\QueuedJob; | ||
use Throwable; | ||
|
||
class JobRetryExtension extends Extension
{

    use Configurable;

    /**
     * By default, we're only going to apply this retry logic to Jobs that are part of this module. If you have some
     * bespoke Jobs that you'd like to apply this logic to, then you can add them to this configuration
     *
     * Using key/value because this allows you to remove existing values through configuration. EG (in yaml):
     * SilverStripe\Forager\Extensions\JobRetryExtension:
     *   job_class_allowlist:
     *     BatchJob: null
     *
     * The above will remove BatchJob from the allowlist (null entries are ignored when the allowlist is checked)
     */
    private static array $job_class_allowlist = [
        'BatchJob' => BatchJob::class,
    ];

    /**
     * Exception codes for which a failed job is allowed to be retried
     *
     * Using key/value because this allows you to remove existing values through configuration. EG (in yaml):
     * SilverStripe\Forager\Extensions\JobRetryExtension:
     *   status_code_allowlist:
     *     RequestTimeout: null
     *
     * The above will remove RequestTimeout from the allowlist
     *
     * Note: HTTP 429 is "Too Many Requests". The "RequestTimeout" key is kept as is because it is a configuration
     * key that projects may already reference
     */
    private static array $status_code_allowlist = [
        'RequestTimeout' => 429,
        'GatewayTimeout' => 504,
    ];

    /**
     * The maximum number of times that we'll attempt this job before we let it be marked as Broken
     */
    private static int $max_attempts = 4;

    /**
     * We calculate the overall backoff time (in seconds) based on backoff_time, the backoff_multiplier, and the
     * number of attempts that have been performed. The goal is for us to have an increasing backoff duration after
     * each failed attempt
     *
     * Using default values, you should expect the following backoff times
     * - 1 minute after the first attempt (12 * 5^1 = 60 seconds)
     * - 5 minutes after the second attempt (12 * 5^2 = 300 seconds)
     * - 25 minutes after the third attempt (12 * 5^3 = 1500 seconds)
     */
    private static int $backoff_time = 12;

    /**
     * The backoff_multiplier is raised to the power of the number of attempts that the job has had. EG:
     * - pow(5, 1) after the first attempt
     * - pow(5, 2) after the second attempt
     * - pow(5, 3) after the third attempt
     */
    private static int $backoff_multiplier = 5;

    /**
     * Extension point invoked when a job throws: if the job class and the exception code are both allowlisted, and
     * the job still has attempts left, reset the descriptor so that the job is picked up again after a backoff
     *
     * @param QueuedJobDescriptor $descriptor Descriptor of the failed job; updated in place when a retry is scheduled
     * @param QueuedJob $job The job that failed; its AttemptCount is incremented and a message is added
     * @param Throwable $exception The exception that caused the failure; its code decides whether we retry
     */
    public function updateJobDescriptorAndJobOnException(
        QueuedJobDescriptor $descriptor,
        QueuedJob $job,
        Throwable $exception
    ): void {
        // The $job argument is typed as the QueuedJob interface because this extension point is invoked by
        // QueuedJobService for *every* job it runs, so we need to remain compatible with that. Developers may have
        // implemented that interface on other Job classes. That said: This extension only works for
        // AbstractQueuedJobs
        if (!$job instanceof AbstractQueuedJob) {
            return;
        }

        if (!$this->jobClassAllowsRetry($job)) {
            // We don't want to enable retries for this job, so just leave it as is
            return;
        }

        if (!$this->statusCodeAllowsRetry($exception)) {
            // We don't want to enable retries for this status code, so just leave it as is
            return;
        }

        // AttemptCount might not exist in the JobData if this was the first attempt
        $attemptCount = $job->AttemptCount ?? 0;
        // Specifically not using += 1 because this property won't yet be set if this is the first failure. Setting
        // this to a magic property will result in AttemptCount being saved as JobData, meaning that it can be
        // retrieved with the current value the next time the job runs
        $job->AttemptCount = $attemptCount + 1;
        // Find out how many attempts (total) we allow
        $maxAttempts = $this->config()->get('max_attempts');

        // Track this failed attempt. The remaining count is clamped at zero so that we never report a negative
        // number (which could otherwise happen if max_attempts is lowered while a job already has attempts recorded)
        $job->addMessage(sprintf(
            'Attempt %s failed. %s attempts remaining',
            $job->AttemptCount,
            max(0, $maxAttempts - $job->AttemptCount)
        ));

        if ($job->AttemptCount >= $maxAttempts) {
            // This job has already gone through its allowed number of attempts
            return;
        }

        // "New" is the only status we have available to initiate another attempt
        $descriptor->JobStatus = QueuedJob::STATUS_NEW;

        // Set a new StartAfter time based on how many failed attempts we've already had
        $descriptor->StartAfter = $this->getBackoffTimer($job->AttemptCount);

        // Release the job lock, so it could be picked up again
        $descriptor->Worker = null;
        $descriptor->Expiry = null;
    }

    /**
     * Whether the job is an instance of any class listed in job_class_allowlist
     */
    private function jobClassAllowsRetry(QueuedJob $job): bool
    {
        foreach ($this->config()->get('job_class_allowlist') ?? [] as $class) {
            // Entries can be set to null through configuration in order to remove them. Using instanceof with
            // anything other than an object or a class name string throws an Error, so skip those entries
            if (!is_string($class) || $class === '') {
                continue;
            }

            if ($job instanceof $class) {
                return true;
            }
        }

        return false;
    }

    /**
     * Whether the exception's code is (strictly) equal to any value listed in status_code_allowlist
     */
    private function statusCodeAllowsRetry(Throwable $exception): bool
    {
        // Strict comparison means that entries set to null through configuration can never match
        return in_array(
            $exception->getCode(),
            $this->config()->get('status_code_allowlist') ?? [],
            true
        );
    }

    /**
     * Timestamp (seconds) before which the next attempt should not start: the current time plus
     * backoff_time * pow(backoff_multiplier, $attemptCount)
     *
     * @param int $attemptCount The number of attempts that the job has had so far
     */
    private function getBackoffTimer(int $attemptCount): int
    {
        $backoffTime = $this->config()->get('backoff_time') ?? 0;
        $backoffMultiplier = $this->config()->get('backoff_multiplier') ?? 0;
        // pow() returns a float for fractional multipliers (or on integer overflow), so we explicitly round and
        // cast to honour the int return type rather than relying on implicit float to int conversion
        $backoffDuration = (int) round($backoffTime * pow($backoffMultiplier, $attemptCount));

        return DBDatetime::now()->getTimestamp() + $backoffDuration;
    }

}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
<?php | ||
|
||
namespace SilverStripe\Forager\Tests\Extensions; | ||
|
||
use SilverStripe\Dev\SapphireTest; | ||
use SilverStripe\Forager\Tests\Fake\BatchJobFake; | ||
use SilverStripe\ORM\FieldType\DBDatetime; | ||
use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor; | ||
use Symbiote\QueuedJobs\Services\QueuedJob; | ||
use Symbiote\QueuedJobs\Services\QueuedJobService; | ||
|
||
class JobRetryExtensionTest extends SapphireTest
{

    protected $usesDatabase = true; // phpcs:ignore SlevomatCodingStandard.TypeHints

    /**
     * We'll run the job a total of 4 times. We expect 3 attempts to break, but be retried, and we expect the 4th try
     * to break and remain broken
     *
     * @dataProvider provideCodes
     */
    public function testRetryCodes(int $code): void
    {
        $job = new BatchJobFake($code);

        $jobId = QueuedJobService::singleton()->queueJob($job);

        // THE 1ST, 2ND, AND 3RD TRIES
        // Each of these should break (but be allowed to retry)
        // Format: [attempt number, attempts remaining, expected StartAfter delay in seconds]
        $retriedAttempts = [
            [1, 3, 60],
            [2, 2, 300],
            [3, 1, 1500],
        ];

        foreach ($retriedAttempts as [$attempt, $remaining, $expectedDelay]) {
            // Run the job through our service, so that the extension point is invoked
            QueuedJobService::singleton()->runJob($jobId);

            $jobDescriptor = $this->assertJobScheduledForRetry($jobId, $attempt, $remaining, $expectedDelay);

            // In order to try the job again immediately, we need to remove the StartAfter
            $jobDescriptor->StartAfter = null;
            $jobDescriptor->write();
        }

        // THE 4TH TRY
        // This should break (and remain broken)

        // Run the job again
        QueuedJobService::singleton()->runJob($jobId);

        // Fetch the Descriptor (after job processing) so that we can test that it was set with the correct data
        $jobDescriptor = QueuedJobDescriptor::get()->byID($jobId);

        $this->assertNotNull($jobDescriptor);

        // The job has used all of its attempts, so it should have been left as "Broken"
        $this->assertEquals(QueuedJob::STATUS_BROKEN, $jobDescriptor->JobStatus);
        // Check that our message was added
        $this->assertStringContainsString('Attempt 4 failed. 0 attempts remaining', $jobDescriptor->SavedJobMessages);
    }

    /**
     * Exception codes that are expected to trigger the retry logic
     */
    public static function provideCodes(): array
    {
        return [
            [429],
            [504],
        ];
    }

    public function testNoRetry(): void
    {
        $job = new BatchJobFake(404);

        $jobId = QueuedJobService::singleton()->queueJob($job);

        // Run the job through our service, so that the extension point is invoked
        QueuedJobService::singleton()->runJob($jobId);

        // Fetch the Descriptor (after job processing) so that we can test that it was set with the correct data
        $jobDescriptor = QueuedJobDescriptor::get()->byID($jobId);

        $this->assertNotNull($jobDescriptor);

        // 404 is not one of the codes that we retry for, so the job should have been left as "Broken"
        $this->assertEquals(QueuedJob::STATUS_BROKEN, $jobDescriptor->JobStatus);
    }

    /**
     * Assert that (after a failed run) the job was reset for another attempt with the expected message, attempt
     * count, and StartAfter delay
     *
     * @param int $jobId ID of the QueuedJobDescriptor to inspect
     * @param int $attempt The attempt number that just failed
     * @param int $remaining The number of attempts that the message should report as remaining
     * @param int $expectedDelay The expected StartAfter delay (in seconds) from now
     * @return QueuedJobDescriptor The freshly fetched descriptor, so that the caller can modify it for the next run
     */
    private function assertJobScheduledForRetry(
        int $jobId,
        int $attempt,
        int $remaining,
        int $expectedDelay
    ): QueuedJobDescriptor {
        // Fetch the Descriptor (after job processing) so that we can test that it was set with the correct data
        $jobDescriptor = QueuedJobDescriptor::get()->byID($jobId);

        $this->assertNotNull($jobDescriptor);

        // The job status should have been set back to "New"
        $this->assertEquals(QueuedJob::STATUS_NEW, $jobDescriptor->JobStatus);
        // Check that our message was added
        $this->assertStringContainsString(
            sprintf('Attempt %s failed. %s attempts remaining', $attempt, $remaining),
            $jobDescriptor->SavedJobMessages
        );

        // SavedJobData is written by the job service itself during this test run
        $jobData = unserialize($jobDescriptor->SavedJobData);
        $attemptCount = $jobData->AttemptCount ?? 0;

        $this->assertEquals($attempt, $attemptCount);
        $this->assertNotNull($jobDescriptor->StartAfter);

        $startAfter = strtotime($jobDescriptor->StartAfter);

        // We expect the StartAfter to be $expectedDelay seconds from now. Putting in a window of 10 seconds for the
        // comparison to reduce flakiness
        $expectedLow = DBDatetime::now()->getTimestamp() + $expectedDelay - 5;
        $expectedHigh = DBDatetime::now()->getTimestamp() + $expectedDelay + 5;

        $this->assertGreaterThan($expectedLow, $startAfter);
        $this->assertLessThan($expectedHigh, $startAfter);

        return $jobDescriptor;
    }

}
Oops, something went wrong.