From 0272ebc236aae934270191494bda15ba970de67a Mon Sep 17 00:00:00 2001 From: sophcass <113556969+sophcass@users.noreply.github.com> Date: Tue, 21 Nov 2023 09:36:58 +0000 Subject: [PATCH] Specify queue serializer when creating Django RQ queue (#630) * Add serializer when instantiating a queue. * Add test --- django_rq/queues.py | 9 ++++++++- django_rq/tests/settings.py | 6 ++++++ django_rq/tests/tests.py | 9 +++++++++ django_rq/utils.py | 2 +- django_rq/views.py | 18 +++++++++--------- 5 files changed, 33 insertions(+), 11 deletions(-) diff --git a/django_rq/queues.py b/django_rq/queues.py index 480ae86f..9c7218ab 100644 --- a/django_rq/queues.py +++ b/django_rq/queues.py @@ -155,6 +155,7 @@ def get_queue( connection=None, queue_class=None, job_class=None, + serializer=None, **kwargs ): """ @@ -176,6 +177,8 @@ def get_queue( default_timeout = QUEUES[name].get('DEFAULT_TIMEOUT') if connection is None: connection = get_connection(name) + if serializer is None: + serializer = QUEUES[name].get('SERIALIZER') queue_class = get_queue_class(QUEUES[name], queue_class) return queue_class( name, @@ -184,6 +187,7 @@ def get_queue( is_async=is_async, job_class=job_class, autocommit=autocommit, + serializer=serializer, **kwargs ) @@ -196,7 +200,10 @@ def get_queue_by_index(index): config = QUEUES_LIST[int(index)] return get_queue_class(config)( - config['name'], connection=get_redis_connection(config['connection_config']), is_async=config.get('ASYNC', True) + config['name'], + connection=get_redis_connection(config['connection_config']), + is_async=config.get('ASYNC', True), + serializer=config['connection_config'].get('SERIALIZER') ) def get_scheduler_by_index(index): diff --git a/django_rq/tests/settings.py b/django_rq/tests/settings.py index d550b5aa..c54273f0 100644 --- a/django_rq/tests/settings.py +++ b/django_rq/tests/settings.py @@ -187,6 +187,12 @@ 'DB': 0, 'DEFAULT_TIMEOUT': 400, }, + 'test_serializer': { + 'HOST': REDIS_HOST, + 'PORT': 6379, + 'DB': 0, + 'SERIALIZER': 'rq.serializers.JSONSerializer', + }, } RQ = { 'AUTOCOMMIT': False, diff --git a/django_rq/tests/tests.py b/django_rq/tests/tests.py index 8adc6568..61c9b3e5 100644 --- a/django_rq/tests/tests.py +++ b/django_rq/tests/tests.py @@ -13,6 +13,7 @@ from redis.exceptions import ConnectionError from rq import get_current_job, Queue +import rq from rq.exceptions import NoSuchJobError from rq.job import Job from rq.registry import FinishedJobRegistry, ScheduledJobRegistry @@ -457,6 +458,14 @@ def test_default_timeout(self): queue = get_queue('test1') self.assertEqual(queue._default_timeout, 400) + def test_get_queue_serializer(self): + """ + Test that the correct serializer is set on the queue. + """ + queue = get_queue('test_serializer') + self.assertEqual(queue.name, 'test_serializer') + self.assertEqual(queue.serializer, rq.serializers.JSONSerializer) + @override_settings(RQ={'AUTOCOMMIT': True}) class DecoratorTest(TestCase): diff --git a/django_rq/utils.py b/django_rq/utils.py index 3e67554b..5033cead 100644 --- a/django_rq/utils.py +++ b/django_rq/utils.py @@ -120,7 +120,7 @@ def get_jobs(queue, job_ids, registry=None): 1. If job data is not present in Redis, discard the result 2. If `registry` argument is supplied, delete empty jobs from registry """ - jobs = Job.fetch_many(job_ids, connection=queue.connection) + jobs = Job.fetch_many(job_ids, connection=queue.connection, serializer=queue.serializer) valid_jobs = [] for i, job in enumerate(jobs): if job is None: diff --git a/django_rq/views.py b/django_rq/views.py index 7ae8529b..acf0e0af 100644 --- a/django_rq/views.py +++ b/django_rq/views.py @@ -289,7 +289,7 @@ def deferred_jobs(request, queue_index): for job_id in job_ids: try: - jobs.append(Job.fetch(job_id, connection=queue.connection)) + jobs.append(Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer)) except NoSuchJobError: pass @@ -316,7 +316,7 @@ def job_detail(request, queue_index, job_id): queue = get_queue_by_index(queue_index) try: - job = Job.fetch(job_id, connection=queue.connection) + job = Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer) except NoSuchJobError: raise Http404("Couldn't find job with this ID: %s" % job_id) @@ -353,7 +353,7 @@ def job_detail(request, queue_index, job_id): def delete_job(request, queue_index, job_id): queue_index = int(queue_index) queue = get_queue_by_index(queue_index) - job = Job.fetch(job_id, connection=queue.connection) + job = Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer) if request.method == 'POST': # Remove job id from queue and delete the actual job @@ -376,10 +376,10 @@ def delete_job(request, queue_index, job_id): def requeue_job_view(request, queue_index, job_id): queue_index = int(queue_index) queue = get_queue_by_index(queue_index) - job = Job.fetch(job_id, connection=queue.connection) + job = Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer) if request.method == 'POST': - requeue_job(job_id, connection=queue.connection) + requeue_job(job_id, connection=queue.connection, serializer=queue.serializer) messages.info(request, 'You have successfully requeued %s' % job.id) return redirect('rq_job_detail', queue_index, job_id) @@ -433,7 +433,7 @@ def requeue_all(request, queue_index): # Confirmation received for job_id in job_ids: try: - requeue_job(job_id, connection=queue.connection) + requeue_job(job_id, connection=queue.connection, serializer=queue.serializer) count += 1 except NoSuchJobError: pass @@ -488,14 +488,14 @@ def actions(request, queue_index): if request.POST['action'] == 'delete': for job_id in job_ids: - job = Job.fetch(job_id, connection=queue.connection) + job = Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer) # Remove job id from queue and delete the actual job queue.connection.lrem(queue.key, 0, job.id) job.delete() messages.info(request, 'You have successfully deleted %s jobs!' % len(job_ids)) elif request.POST['action'] == 'requeue': for job_id in job_ids: - requeue_job(job_id, connection=queue.connection) + requeue_job(job_id, connection=queue.connection, serializer=queue.serializer) messages.info(request, 'You have successfully requeued %d jobs!' % len(job_ids)) elif request.POST['action'] == 'stop': stopped, failed_to_stop = stop_jobs(queue, job_ids) @@ -513,7 +513,7 @@ def enqueue_job(request, queue_index, job_id): """Enqueue deferred jobs""" queue_index = int(queue_index) queue = get_queue_by_index(queue_index) - job = Job.fetch(job_id, connection=queue.connection) + job = Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer) if request.method == 'POST': try: