Skip to content

Commit

Permalink
Add new retry and backoff timer feature
Browse files Browse the repository at this point in the history
  • Loading branch information
chrispenny committed Dec 15, 2024
1 parent c5f6ee3 commit 062eaaa
Show file tree
Hide file tree
Showing 5 changed files with 389 additions and 1 deletion.
13 changes: 13 additions & 0 deletions _config/extensions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,31 @@ Name: silverstripe-forager-extensions
SilverStripe\ORM\FieldType\DBField:
extensions:
- SilverStripe\Forager\Extensions\DBFieldExtension

SilverStripe\ORM\FieldType\DBDate:
extensions:
- SilverStripe\Forager\Extensions\DBDateExtension

SilverStripe\ORM\FieldType\DBBoolean:
extensions:
- SilverStripe\Forager\Extensions\DBBooleanExtension

SilverStripe\ORM\FieldType\DBHTMLText:
extensions:
- SilverStripe\Forager\Extensions\DBHTMLFieldExtension

SilverStripe\ORM\FieldType\DBHTMLVarchar:
extensions:
- SilverStripe\Forager\Extensions\DBHTMLFieldExtension

Symbiote\QueuedJobs\Controllers\QueuedJobsAdmin:
extensions:
- SilverStripe\Forager\Extensions\QueuedJobsAdminExtension

Symbiote\QueuedJobs\Services\QueuedJobService:
extensions:
- SilverStripe\Forager\Extensions\JobRetryExtension

---
Name: 'silverstripe-forager-cms'
Only:
Expand All @@ -28,15 +38,18 @@ SilverStripe\CMS\Model\SiteTree:
extensions:
SearchServiceExtension: SilverStripe\Forager\Extensions\SearchServiceExtension
SiteTreeHierarchyExtension: SilverStripe\Forager\Extensions\SiteTreeHierarchyExtension

---
Name: silverstripe-forager-form-extension
---
SilverStripe\AssetAdmin\Forms\FileFormFactory:
extensions:
- SilverStripe\Forager\Extensions\SearchFormFactoryExtension

SilverStripe\Forager\Extensions\SearchFormFactoryExtension:
exclude_classes:
- SilverStripe\Assets\Image

---
Name: forager-extensions-file
Only:
Expand Down
157 changes: 157 additions & 0 deletions src/Extensions/JobRetryExtension.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
<?php

namespace SilverStripe\Forager\Extensions;

use SilverStripe\Core\Config\Configurable;
use SilverStripe\Core\Extension;
use SilverStripe\Forager\Jobs\BatchJob;
use SilverStripe\ORM\FieldType\DBDatetime;
use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor;
use Symbiote\QueuedJobs\Services\AbstractQueuedJob;
use Symbiote\QueuedJobs\Services\QueuedJob;
use Throwable;

class JobRetryExtension extends Extension
{

use Configurable;

/**
* By default, we're only going to apply this retry logic to Jobs that are part of this module. If you have some
* bespoke Jobs that you'd like to apply this logic to, then you can add them to this configuration
*
* Using key/value because this allows you to remove existing values through configuration. EG (in yaml):
* SilverStripe\Forager\Extensions\JobRetryExtension:
* job_class_allowlist:
* BatchJob: null
*
* The above will remove BatchJob from the allowlist
*/
private static array $job_class_allowlist = [
'BatchJob' => BatchJob::class,
];

/**
* Using key/value because this allows you to remove existing values through configuration. EG (in yaml):
* SilverStripe\Forager\Extensions\JobRetryExtension:
* status_code_allowlist:
* RequestTimeout: null
*
* The above will remove RequestTimeout from the allowlist
*/
private static array $status_code_allowlist = [
'RequestTimeout' => 429,
'GatewayTimeout' => 504,
];

/**
* The maximum number of times that we'll attempt this job before we let it be marked as Broken
*/
private static int $max_attempts = 4;

/**
* We calculate the overall backoff time based on backoff_time, the backoff_multiplier, and the number of attempts
* that have been performed. The goal is for us to have an increasing backoff duration after each failed attempt
*
* Using default values, you should expect the following backoff times
* - 1 minute after the first attempt
* - 5 minutes after the second attempt
* - 25 minutes after the third attempt
*/
private static int $backoff_time = 12;

/**
* The backoff_multiplier is multiplied by the power of the number of attempts that has the job has had. EG:
* - pow(5, 1) after the first attempt
* - pow(5, 2) after the second attempt
* - pow(5, 3) after the third attempt
*/
private static int $backoff_multiplier = 5;

public function updateJobDescriptorAndJobOnException(
QueuedJobDescriptor $descriptor,
QueuedJob $job,
Throwable $exception
): void {
// I've left the type casting for the $job argument as the QueuedJob interface because this extension point is
// invoked by QueuedJobService for *every* job it runs, so we need to remain compatible with that. Developers
// may have implemented that interface on other Job classes. That said: This extension only works for
// AbstractQueuedJobs
if (!$job instanceof AbstractQueuedJob) {
return;
}

if (!$this->jobClassAllowsRetry($job)) {
// We don't want to enable retries for this job, so just leave it as is
return;
}

if (!$this->statusCodeAllowsRetry($exception)) {
// We don't want to enable retries for this status code, so just leave it as is
return;
}

// AttemptCount might not exist in the JobData if this was the first attempt
$attemptCount = $job->AttemptCount ?? 0;
// Specifically not using += 1 because this property won't yet be set if this is the first failure. Setting
// this to a magic property will result in AttemptCount being saved as JobData, meaning that it can be
// retrieved with the current value the next time the job runs
$job->AttemptCount = $attemptCount + 1;
// Find out how many attempts (total) we allow
$maxAttempts = $this->config()->get('max_attempts');

// Track the fact that we're about to reset this job for another attempt
$job->addMessage(sprintf(
'Attempt %s failed. %s attempts remaining',
$job->AttemptCount,
$maxAttempts - $job->AttemptCount
));

if ($job->AttemptCount >= $maxAttempts) {
// This job has already gone through its allowed number of attempts
return;
}

// "New" is the only status we have available to initiate another attempt
$descriptor->JobStatus = QueuedJob::STATUS_NEW;

// Set a new StartAfter time based on how many failed attempts we've already had
$descriptor->StartAfter = $this->getBackoffTimer($job->AttemptCount);

// Release the job lock, so it could be picked up again
$descriptor->Worker = null;
$descriptor->Expiry = null;
}

private function jobClassAllowsRetry(QueuedJob $job): bool
{
foreach ($this->config()->get('job_class_allowlist') as $class) {
if ($job instanceof $class) {
return true;
}
}

return false;
}

private function statusCodeAllowsRetry(Throwable $exception): bool
{
foreach ($this->config()->get('status_code_allowlist') as $code) {
if ($exception->getCode() === $code) {
return true;
}
}

return false;
}

private function getBackoffTimer(int $attemptCount): int
{
$backoffTime = $this->config()->get('backoff_time') ?? 0;
$backoffMultiplier = pow($this->config()->get('backoff_multiplier') ?? 0, $attemptCount);
$backoffDuration = $backoffTime * $backoffMultiplier;

return DBDatetime::now()->getTimestamp() + $backoffDuration;
}

}
4 changes: 3 additions & 1 deletion src/Jobs/BatchJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ abstract class BatchJob extends AbstractQueuedJob implements QueuedJob
use Configurable;

/**
* A cooldown period between each batched process in the job in milliseconds
* A cooldown period between each batched process in the job in milliseconds. If the service you're integrating with
* has rate limits, or if you find that your server is struggling for resources when this [runs as quick as it can]
* then this + batch size might be useful levers to pull to slow your processing down
*/
private static int $batch_cooldown_ms = 0;

Expand Down
179 changes: 179 additions & 0 deletions tests/Extensions/JobRetryExtensionTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
<?php

namespace SilverStripe\Forager\Tests\Extensions;

use SilverStripe\Dev\SapphireTest;
use SilverStripe\Forager\Tests\Fake\BatchJobFake;
use SilverStripe\ORM\FieldType\DBDatetime;
use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor;
use Symbiote\QueuedJobs\Services\QueuedJob;
use Symbiote\QueuedJobs\Services\QueuedJobService;

class JobRetryExtensionTest extends SapphireTest
{

protected $usesDatabase = true; // phpcs:ignore SlevomatCodingStandard.TypeHints

/**
* We'll run the job a total of 4 times. We expect 3 attempts to break, but be retried, and we expect the 4th try
* to break and remain broken
*
* @dataProvider provideCodes
*/
public function testRetryCodes(int $code): void
{
$job = new BatchJobFake($code);

$jobId = QueuedJobService::singleton()->queueJob($job);

// THE 1ST TRY
// This should break (but be allowed to retry)

// Run the job through our service, so that the extension point is invoked
QueuedJobService::singleton()->runJob($jobId);

// Fetch the Descriptor (after job processing) so that we can test that it was set with the correct data
$jobDescriptor = QueuedJobDescriptor::get()->byID($jobId);

$this->assertNotNull($jobDescriptor);

// The job status should have been set back to "New"
$this->assertEquals(QueuedJob::STATUS_NEW, $jobDescriptor->JobStatus);
// Check that our message was added
$this->assertStringContainsString('Attempt 1 failed. 3 attempts remaining', $jobDescriptor->SavedJobMessages);

$jobData = unserialize($jobDescriptor->SavedJobData);
$attemptCount = $jobData->AttemptCount ?? 0;

$this->assertEquals(1, $attemptCount);
$this->assertNotNull($jobDescriptor->StartAfter);

$startAfter = strtotime($jobDescriptor->StartAfter);

// After the 1st try we except a 1-minute (60-second) delay for the StartAfter. Putting in a window of 10
// seconds for the comparison to reduce flakiness
$expectedLow = DBDatetime::now()->getTimestamp() + 55;
$expectedHigh = DBDatetime::now()->getTimestamp() + 65;

$this->assertGreaterThan($expectedLow, $startAfter);
$this->assertLessThan($expectedHigh, $startAfter);

// THE 2ND TRY
// This should break (but be allowed to retry)

// In order to try the job again immediately, we need to remove the StartAfter
$jobDescriptor->StartAfter = null;
$jobDescriptor->write();

// Run the job again
QueuedJobService::singleton()->runJob($jobId);

// Fetch the Descriptor (after job processing) so that we can test that it was set with the correct data
$jobDescriptor = QueuedJobDescriptor::get()->byID($jobId);

$this->assertNotNull($jobDescriptor);

// The job status should have been set back to "New"
$this->assertEquals(QueuedJob::STATUS_NEW, $jobDescriptor->JobStatus);
// Check that our message was added
$this->assertStringContainsString('Attempt 2 failed. 2 attempts remaining', $jobDescriptor->SavedJobMessages);

$jobData = unserialize($jobDescriptor->SavedJobData);
$attemptCount = $jobData->AttemptCount ?? 0;

$this->assertEquals(2, $attemptCount);
$this->assertNotNull($jobDescriptor->StartAfter);

$startAfter = strtotime($jobDescriptor->StartAfter);

// After the 2nd try we except a 5-minute (300-second) delay for the StartAfter. Putting in a window of 10
// seconds for the comparison to reduce flakiness
$expectedLow = DBDatetime::now()->getTimestamp() + 295;
$expectedHigh = DBDatetime::now()->getTimestamp() + 305;

$this->assertGreaterThan($expectedLow, $startAfter);
$this->assertLessThan($expectedHigh, $startAfter);

// In order to try the job again immediately, we need to remove the StartAfter
$jobDescriptor->StartAfter = null;
$jobDescriptor->write();

// THE 3RD TRY
// This should break (but be allowed to retry)

// Run the job again
QueuedJobService::singleton()->runJob($jobId);

// Fetch the Descriptor (after job processing) so that we can test that it was set with the correct data
$jobDescriptor = QueuedJobDescriptor::get()->byID($jobId);

$this->assertNotNull($jobDescriptor);

// The job status should have been set back to "New"
$this->assertEquals(QueuedJob::STATUS_NEW, $jobDescriptor->JobStatus);
// Check that our message was added
$this->assertStringContainsString('Attempt 3 failed. 1 attempts remaining', $jobDescriptor->SavedJobMessages);

$jobData = unserialize($jobDescriptor->SavedJobData);
$attemptCount = $jobData->AttemptCount ?? 0;

$this->assertEquals(3, $attemptCount);
$this->assertNotNull($jobDescriptor->StartAfter);

$startAfter = strtotime($jobDescriptor->StartAfter);

// After the 3rd try we except a 25-minute (1500-second) delay for the StartAfter. Putting in a window of 10
// seconds for the comparison to reduce flakiness
$expectedLow = DBDatetime::now()->getTimestamp() + 1495;
$expectedHigh = DBDatetime::now()->getTimestamp() + 1505;

$this->assertGreaterThan($expectedLow, $startAfter);
$this->assertLessThan($expectedHigh, $startAfter);
// In order to try the job again immediately, we need to remove the StartAfter
$jobDescriptor->StartAfter = null;
$jobDescriptor->write();

// THE 4TH TRY
// This should break (and remain broken)

// Run the job again
QueuedJobService::singleton()->runJob($jobId);

// Fetch the Descriptor (after job processing) so that we can test that it was set with the correct data
$jobDescriptor = QueuedJobDescriptor::get()->byID($jobId);

$this->assertNotNull($jobDescriptor);

// The job status should have been set back to "New"
$this->assertEquals(QueuedJob::STATUS_BROKEN, $jobDescriptor->JobStatus);
// Check that our message was added
$this->assertStringContainsString('Attempt 4 failed. 0 attempts remaining', $jobDescriptor->SavedJobMessages);
}

public function provideCodes(): array
{
return [
[429],
[504],
];
}

public function testNoRetry(): void
{
$job = new BatchJobFake(404);

$jobId = QueuedJobService::singleton()->queueJob($job);

// Run the job through our service, so that the extension point is invoked
QueuedJobService::singleton()->runJob($jobId);

// Fetch the Descriptor (after job processing) so that we can test that it was set with the correct data
$jobDescriptor = QueuedJobDescriptor::get()->byID($jobId);

$this->assertNotNull($jobDescriptor);

// The job status should have been set back to "New"
$this->assertEquals(QueuedJob::STATUS_BROKEN, $jobDescriptor->JobStatus);
}

}
Loading

0 comments on commit 062eaaa

Please sign in to comment.