Skip to content

Commit

Permalink
fix:infinite loop on callback calling is_scheduled()
Browse files Browse the repository at this point in the history
  • Loading branch information
cunla committed Oct 9, 2023
1 parent fb5636f commit 6e2a78d
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 28 deletions.
9 changes: 7 additions & 2 deletions docs/changelog.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
# Changelog

## v1.2.1 🌈

### 🐛 Bug Fixes

- Fix infinite loop on callback calling is_scheduled() #37.

## v1.2.0 🌈

### 🚀 Features

- Rename `*Job` models to `*Task` to differentiate.


## v1.1.0 🌈

### 🚀 Features
Expand All @@ -16,7 +21,7 @@

### 🐛 Bug Fixes

- #32 Running jobs should be marked as scheduled jobs. @rstalbow (#33)
- #32 Running jobs should be marked as scheduled jobs. @rstalbow (#33)

## v1.0.2 🌈

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "django-tasks-scheduler"
packages = [
{ include = "scheduler" },
]
version = "1.2.0"
version = "1.2.1"
description = "An async job scheduler for django using redis"
readme = "README.md"
keywords = ["redis", "django", "background-jobs", "job-queue", "task-queue", "redis-queue", "scheduled-jobs"]
Expand Down
10 changes: 5 additions & 5 deletions scheduler/admin/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ def enable_selected(self, request, queryset):

@admin.action(description="Enqueue now", permissions=('change',))
def enqueue_job_now(self, request, queryset):
job_names = []
for job in queryset:
job.enqueue_to_run()
job_names.append(job.name)
self.message_user(request, f"The following jobs have been enqueued: {', '.join(job_names)}", )
task_names = []
for task in queryset:
task.enqueue_to_run()
task_names.append(task.name)
self.message_user(request, f"The following jobs have been enqueued: {', '.join(task_names)}", )
35 changes: 19 additions & 16 deletions scheduler/models/scheduled_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@


def callback_save_job(job, connection, result, *args, **kwargs):
model_name = job.meta.get('job_type', None)
model_name = job.meta.get('task_type', None)
if model_name is None:
return
model = apps.get_model(app_label='scheduler', model_name=model_name)
task = model.objects.filter(job_id=job.id).first()
if task is not None:
task.unschedule()
task.schedule()
task.force_schedule()


class BaseTask(models.Model):
Expand Down Expand Up @@ -80,18 +79,20 @@ def callable_func(self):
"""Translate callable string to callable"""
return tools.callable_func(self.callable)

@admin.display(boolean=True, description=_('is next scheduled?'))
@admin.display(boolean=True, description=_('is scheduled?'))
def is_scheduled(self) -> bool:
"""Check whether a next job for this task is queued/scheduled to be executed"""
if not self.job_id: # no job_id => is not scheduled
if self.job_id is None: # no job_id => is not scheduled
return False
# check whether job_id is in scheduled/enqueued/active jobs
scheduled_jobs = self.rqueue.scheduled_job_registry.get_job_ids()
enqueued_jobs = self.rqueue.get_job_ids()
active_jobs = self.rqueue.started_job_registry.get_job_ids()
res = ((self.job_id in scheduled_jobs)
or (self.job_id in enqueued_jobs))
# If the job_id is not scheduled/enqueued, update the job_id to None.
# (The job_id belongs to a previous run which is completed or currently running)
or (self.job_id in enqueued_jobs)
or (self.job_id in active_jobs))
# If the job_id is not scheduled/enqueued/started,
# update the job_id to None. (The job_id belongs to a previous run which is completed)
if not res:
self.job_id = None
super(BaseTask, self).save()
Expand Down Expand Up @@ -132,8 +133,8 @@ def _enqueue_args(self) -> Dict:
res = dict(
meta=dict(
repeat=self.repeat,
job_type=self.TASK_TYPE,
scheduled_job_id=self.id,
task_type=self.TASK_TYPE,
scheduled_task_id=self.id,
),
on_success=callback_save_job,
on_failure=callback_save_job,
Expand Down Expand Up @@ -175,6 +176,11 @@ def schedule(self) -> bool:
"""
if not self.ready_for_schedule():
return False
self.force_schedule()
return True

def force_schedule(self):
"""Schedule task regardless of its current status"""
schedule_time = self._schedule_time()
kwargs = self._enqueue_args()
job = self.rqueue.enqueue_at(
Expand All @@ -184,19 +190,17 @@ def schedule(self) -> bool:
**kwargs, )
self.job_id = job.id
super(BaseTask, self).save()
return True

def enqueue_to_run(self) -> bool:
"""Enqueue job to run now.
"""
"""Enqueue job to run now."""
kwargs = self._enqueue_args()
job = self.rqueue.enqueue(
tools.run_task,
args=(self.TASK_TYPE, self.id),
**kwargs,
)
self.job_id = job.id
super(BaseTask, self).save()
self.save(schedule_job=False)
return True

def unschedule(self) -> bool:
Expand All @@ -217,8 +221,7 @@ def _schedule_time(self):
return utc(self.scheduled_time)

def to_dict(self) -> Dict:
"""Export model to dictionary, so it can be saved as external file backup
"""
"""Export model to dictionary, so it can be saved as external file backup"""
res = dict(
model=self.TASK_TYPE,
name=self.name,
Expand Down
2 changes: 1 addition & 1 deletion scheduler/rq_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def is_scheduled_task(self):
return self.meta.get('scheduled_task_id', None) is not None

def is_execution_of(self, scheduled_job):
return (self.meta.get('job_type', None) == scheduled_job.TASK_TYPE
return (self.meta.get('task_type', None) == scheduled_job.TASK_TYPE
and self.meta.get('scheduled_task_id', None) == scheduled_job.id)

def stop_execution(self, connection: Redis):
Expand Down
6 changes: 3 additions & 3 deletions scheduler/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,9 @@ def test_admin_enqueue_job_now(self):
# assert part 1
self.assertEqual(200, res.status_code)
entry = _get_job_from_scheduled_registry(task)
task_model, scheduled_job_id = entry.args
task_model, scheduled_task_id = entry.args
self.assertEqual(task_model, task.TASK_TYPE)
self.assertEqual(scheduled_job_id, task.id)
self.assertEqual(scheduled_task_id, task.id)
self.assertEqual('scheduled', entry.get_status())
assert_has_execution_with_status(task, 'queued')

Expand All @@ -346,7 +346,7 @@ def test_admin_enqueue_job_now(self):
# assert 2
entry = _get_job_from_scheduled_registry(task)
self.assertEqual(task_model, task.TASK_TYPE)
self.assertEqual(scheduled_job_id, task.id)
self.assertEqual(scheduled_task_id, task.id)
assert_has_execution_with_status(task, 'finished')

def test_admin_enable_job(self):
Expand Down

0 comments on commit 6e2a78d

Please sign in to comment.