Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shift limit aka persistent worker #56

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,18 @@ to ensure the jobs table remains at a reasonable size.
To start a worker:

```
manage.py worker [queue_name] [--rate_limit]
manage.py worker [queue_name] [--rate_limit] [--shift_limit]
```

- `queue_name` is optional, and will default to `default`
- The `--rate_limit` flag is optional, and will default to `1`. It is the minimum number of seconds that must have elapsed before a subsequent job can be run.
- The `--shift_limit` flag is optional, and will default to `0`. It is the maximum number of seconds that the worker
can seek new jobs to process. The worker will seek further jobs if time remains in the shift. If the
`--shift_limit` is exceeded once
a job is started, the job will still run to completion, regardless of the time remaining. One use case for `--shift_limit` is to run the worker process via a
CRON Job. See `tests.py` for illustrative effects of `[--rate_limit]` and `[--shift_limit]` combinations on processed
jobs. If `--shift_limit` is not supplied, the default of `0` will be used and the worker will continue to run until
the process is shutdown.

##### manage.py queue_depth
If you'd like to check your queue depth from the command line, you can run `manage.py queue_depth [queue_name [queue_name ...]]` and any
Expand Down
50 changes: 43 additions & 7 deletions django_dbq/management/commands/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@


class Worker:
def __init__(self, name, rate_limit_in_seconds):
def __init__(self, name, rate_limit_in_seconds, shift_limit_in_seconds):
self.queue_name = name
self.rate_limit_in_seconds = rate_limit_in_seconds
self.shift_limit_in_seconds = shift_limit_in_seconds
self.shift_start = timezone.now()
self.alive = True
self.last_job_finished = None
self.current_job = None
Expand All @@ -39,7 +41,7 @@ def shutdown(self, signum, frame):
self.current_job.save(update_fields=["state"])

def run(self):
while self.alive:
while self.alive and self._shift_availability():
self.process_job()

def process_job(self):
Expand Down Expand Up @@ -111,6 +113,24 @@ def _process_job(self):

self.current_job = None

def _shift_availability(self):
"""
Setting a value for shift_limit_in_seconds enables the worker to be run via a CRON Job for a period of time,
whereby worker will seek further jobs if time remains in the shift. If the shift_limit_in_seconds is
exceeded once a job is started it will still run to completion, regardless of the time remaining.
Consequently, the duration of the CRON Interval should be greater than the anticipated duration of the
longest Job.
If shift_limit_in_seconds is not supplied, the default of 0 will be used and the worker will continue to run
until shutdown.
"""
if self.shift_limit_in_seconds <= 0:
return True
elif self.shift_limit_in_seconds > 0 and (timezone.now() - self.shift_start).total_seconds() < \
self.shift_limit_in_seconds:
return True
else:
return False


class Command(BaseCommand):

Expand All @@ -125,6 +145,15 @@ def add_arguments(self, parser):
default=1,
type=int,
)
parser.add_argument(
"--shift_limit",
help="The time limit in seconds within which the worker can process new jobs. The default rate "
"limit is 0 seconds, which disables this argument, allowing the worker to run indefinitely.",
nargs="?",
default=0,
type=int,
)

parser.add_argument(
"--dry-run",
action="store_true",
Expand All @@ -142,13 +171,20 @@ def handle(self, *args, **options):

queue_name = options["queue_name"]
rate_limit_in_seconds = options["rate_limit"]
shift_limit_in_seconds = options["shift_limit"]

self.stdout.write(
'Starting job worker for queue "%s" with rate limit of one job per %s second(s)'
% (queue_name, rate_limit_in_seconds)
)
if shift_limit_in_seconds:
self.stdout.write(
'Starting job worker for queue "%s" with rate limit of one job per %s second(s) and a shift constraint of %s seconds.'
% (queue_name, rate_limit_in_seconds, shift_limit_in_seconds)
)
else:
self.stdout.write(
'Starting job worker for queue "%s" with rate limit of one job per %s second(s).'
% (queue_name, rate_limit_in_seconds)
)

worker = Worker(queue_name, rate_limit_in_seconds)
worker = Worker(queue_name, rate_limit_in_seconds, shift_limit_in_seconds)

if options["dry_run"]:
return
Expand Down
63 changes: 59 additions & 4 deletions django_dbq/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ def failing_task(job):
raise Exception("uh oh")


def shift_task(job):
job.workspace["message"] = f"{job.id} ran"


def failure_hook(job, exception):
job.workspace["output"] = "failure hook ran"

Expand Down Expand Up @@ -163,7 +167,7 @@ def test_process_job_previous_job_long_time_ago(self, mock_sleep):
class ShutdownTestCase(TestCase):
def test_shutdown_sets_state_to_stopping(self):
job = Job.objects.create(name="testjob")
worker = Worker("default", 1)
worker = Worker("default", 1, 0)
worker.current_job = job

worker.shutdown(None, None)
Expand Down Expand Up @@ -279,7 +283,7 @@ def test_task_sequence(self):
class ProcessJobTestCase(TestCase):
def test_process_job(self):
job = Job.objects.create(name="testjob")
Worker("default", 1)._process_job()
Worker("default", 1, 0)._process_job()
job = Job.objects.get()
self.assertEqual(job.state, Job.STATES.COMPLETE)

Expand All @@ -288,7 +292,7 @@ def test_process_job_wrong_queue(self):
Processing a different queue shouldn't touch our other job
"""
job = Job.objects.create(name="testjob", queue_name="lol")
Worker("default", 1)._process_job()
Worker("default", 1, 0)._process_job()
job = Job.objects.get()
self.assertEqual(job.state, Job.STATES.NEW)

Expand Down Expand Up @@ -327,7 +331,7 @@ def test_creation_hook_only_runs_on_create(self):
class JobFailureHookTestCase(TestCase):
def test_failure_hook(self):
job = Job.objects.create(name="testjob")
Worker("default", 1)._process_job()
Worker("default", 1, 0)._process_job()
job = Job.objects.get()
self.assertEqual(job.state, Job.STATES.FAILED)
self.assertEqual(job.workspace["output"], "failure hook ran")
Expand Down Expand Up @@ -361,3 +365,54 @@ def test_delete_old_jobs(self):
self.assertEqual(Job.objects.count(), 2)
self.assertTrue(j4 in Job.objects.all())
self.assertTrue(j5 in Job.objects.all())


@override_settings(JOBS={"testshift": {"tasks": ["django_dbq.tests.shift_task"]}})
class ShiftTestCase(TestCase):
""" Tests various combinations of rate_limit and shift_limit in terms of their impact on processing jobs"""
def test_rate_with_shorter_shift_limit(self):
Job.objects.create(name="testshift")
Job.objects.create(name="testshift")
stdout = StringIO()
call_command("worker", rate_limit=2, shift_limit=1, stdout=stdout)
output = stdout.getvalue()
self.assertTrue("rate limit of one job per 2 second(s) and a shift constraint of 1 seconds" in output)
self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 1)
self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 1)

def test_rate_with_equal_shift_limit(self):
Job.objects.create(name="testshift")
Job.objects.create(name="testshift")
stdout = StringIO()
call_command("worker", rate_limit=1, shift_limit=1, stdout=stdout)
output = stdout.getvalue()
self.assertTrue("rate limit of one job per 1 second(s) and a shift constraint of 1 seconds" in output)
self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 1)
self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 1)

def test_rate_with_longer_shift_limit(self):
Job.objects.create(name="testshift")
Job.objects.create(name="testshift")
stdout = StringIO()
call_command("worker", rate_limit=1, shift_limit=2, stdout=stdout)
output = stdout.getvalue()
self.assertTrue("rate limit of one job per 1 second(s) and a shift constraint of 2 seconds" in output)
self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 0)
self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 2)

def test_rate_with_two_workers(self):
Job.objects.create(name="testshift")
Job.objects.create(name="testshift")
Job.objects.create(name="testshift")
Job.objects.create(name="testshift")
stdout = StringIO()
call_command("worker", rate_limit=1, shift_limit=2, stdout=stdout)
output = stdout.getvalue()
self.assertTrue("rate limit of one job per 1 second(s) and a shift constraint of 2 seconds" in output)
call_command("worker", rate_limit=1, shift_limit=1, stdout=stdout)
output = stdout.getvalue()
self.assertTrue("rate limit of one job per 1 second(s) and a shift constraint of 1 seconds" in output)
self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 1)
self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 3)