Skip to content

Commit

Permalink
Add Stop Jobs action (#577)
Browse files Browse the repository at this point in the history
* Add Stop Jobs action

* Fix mock function

* Fix Test

* Add Stop button to detail

* Fix url Typo

* Fix stopped jobs do not need to be cancelled, test, and job regex
  • Loading branch information
gabriels1234 authored May 6, 2023
1 parent 4ec80ce commit d7e08b5
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 1 deletion.
6 changes: 6 additions & 0 deletions django_rq/templates/django_rq/job_detail.html
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@

<div class="submit-row">
<p class="deletelink-box"><a href="delete/" class="deletelink">Delete</a></p>
{% if job.is_started %}
<form method = 'POST' action = "{% url 'rq_stop_job' queue_index job.id %}">
{% csrf_token %}
<input type="submit" value="Stop" class="deletelink" name="stop">
</form>
{% endif %}
{% if job.is_failed %}
<form method = 'POST' action = "{% url 'rq_requeue_job' queue_index job.id %}">
{% csrf_token %}
Expand Down
3 changes: 3 additions & 0 deletions django_rq/templates/django_rq/jobs.html
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
{% if job_status == 'Failed' %}
<option value="requeue">Requeue</option>
{% endif %}
{% if job_status == 'Started' %}
<option value="stop">Stop</option>
{% endif %}
</select>
</label>
<button type="submit" class="button" title="Execute selected action" name="index" value="0">Go</button>
Expand Down
33 changes: 33 additions & 0 deletions django_rq/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import datetime
from unittest.mock import PropertyMock, patch


from django.contrib.auth.models import User
from django.test import TestCase, override_settings
from django.test.client import Client
Expand Down Expand Up @@ -362,3 +363,35 @@ def test_statistics_json_view(self):
self.assertEqual(response.status_code, 200)
self.assertNotIn("name", response.content.decode('utf-8'))
self.assertIn('"error": true', response.content.decode('utf-8'))

def test_action_stop_jobs(self):
queue = get_queue('django_rq_test')
queue_index = get_queue_index('django_rq_test')

# Enqueue some jobs
job_ids, jobs = [], []
worker = get_worker('django_rq_test')
for _ in range(3):
job = queue.enqueue(access_self)
job_ids.append(job.id)
jobs.append(job)
worker.prepare_job_execution(job)

# Check if the jobs are started
for job_id in job_ids:
job = Job.fetch(job_id, connection=queue.connection)
self.assertEqual(job.get_status(), JobStatus.STARTED)

# Stop those jobs using the view
started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
self.assertEqual(len(started_job_registry), len(job_ids))
self.client.post(reverse('rq_actions', args=[queue_index]), {'action': 'stop', 'job_ids': job_ids})
for job in jobs:
worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
self.assertEqual(len(started_job_registry), 0)

canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
self.assertEqual(len(canceled_job_registry), len(job_ids))

for job_id in job_ids:
self.assertIn(job_id, canceled_job_registry)
3 changes: 3 additions & 0 deletions django_rq/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@
re_path(
r'^queues/(?P<queue_index>[\d]+)/(?P<job_id>[^/]+)/enqueue/$', views.enqueue_job, name='rq_enqueue_job'
),
re_path(
r'^queues/(?P<queue_index>[\d]+)/(?P<job_id>[^/]+)/stop/$', views.stop_job, name='rq_stop_job'
),
]
14 changes: 14 additions & 0 deletions django_rq/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
StartedJobRegistry,
clean_registries,
)
from rq.command import send_stop_job_command
from rq.worker import Worker
from rq.worker_registration import clean_worker_registry

Expand Down Expand Up @@ -106,3 +107,16 @@ def get_jobs(queue, job_ids, registry=None):
valid_jobs.append(job)

return valid_jobs

def stop_jobs(queue, job_ids):
job_ids = job_ids if isinstance(job_ids, (list, tuple)) else [job_ids]
stopped_job_ids = []
failed_to_stop_job_ids = []
for job_id in job_ids:
try:
send_stop_job_command(queue.connection, job_id)
except Exception:
failed_to_stop_job_ids.append(job_id)
continue
stopped_job_ids.append(job_id)
return stopped_job_ids, failed_to_stop_job_ids
26 changes: 25 additions & 1 deletion django_rq/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from django.shortcuts import redirect, render
from django.urls import reverse
from django.views.decorators.cache import never_cache
from django.views.decorators.http import require_POST

from redis.exceptions import ResponseError
from rq import requeue_job
Expand All @@ -22,10 +23,11 @@
)
from rq.worker import Worker
from rq.worker_registration import clean_worker_registry
from rq.command import send_stop_job_command

from .queues import get_queue_by_index
from .settings import API_TOKEN
from .utils import get_statistics, get_jobs
from .utils import get_statistics, get_jobs, stop_jobs


@never_cache
Expand Down Expand Up @@ -493,6 +495,12 @@ def actions(request, queue_index):
for job_id in job_ids:
requeue_job(job_id, connection=queue.connection)
messages.info(request, 'You have successfully requeued %d jobs!' % len(job_ids))
elif request.POST['action'] == 'stop':
stopped, failed_to_stop = stop_jobs(queue, job_ids)
if len(stopped) >0 :
messages.info(request, 'You have successfully stopped %d jobs!' % len(stopped))
if len(failed_to_stop) >0 :
messages.error(request, '%d jobs failed to stop!' % len(failed_to_stop))

return redirect(next_url)

Expand Down Expand Up @@ -534,3 +542,19 @@ def enqueue_job(request, queue_index, job_id):
'queue': queue,
}
return render(request, 'django_rq/delete_job.html', context_data)


@never_cache
@staff_member_required
@require_POST
def stop_job(request, queue_index, job_id):
"""Stop started job"""
queue_index = int(queue_index)
queue = get_queue_by_index(queue_index)
stopped, _ = stop_jobs(queue, job_id)
if len(stopped) == 1:
messages.info(request, 'You have successfully stopped %s' % job_id)
return redirect('rq_job_detail', queue_index, job_id)
else:
messages.error(request, 'Failed to stop %s' % job_id)
return redirect('rq_job_detail', queue_index, job_id)

0 comments on commit d7e08b5

Please sign in to comment.