From 24242b99cbfe0de2b487c0c684241272839065cc Mon Sep 17 00:00:00 2001 From: Perry Roper Date: Sat, 16 Dec 2023 19:31:23 +0700 Subject: [PATCH 1/3] Use datetime.timezone.utc as Django 5.0 removed the alias. (#636) --- django_rq/templatetags/django_rq.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/django_rq/templatetags/django_rq.py b/django_rq/templatetags/django_rq.py index a9a0ac9e..63e5eab0 100644 --- a/django_rq/templatetags/django_rq.py +++ b/django_rq/templatetags/django_rq.py @@ -1,3 +1,5 @@ +import datetime + from django import template from django.utils import timezone from django.utils.html import escape @@ -10,7 +12,7 @@ def to_localtime(time): """Converts naive datetime to localtime based on settings""" - utc_time = time.replace(tzinfo=timezone.utc) + utc_time = time.replace(tzinfo=datetime.timezone.utc) to_zone = timezone.get_default_timezone() return utc_time.astimezone(to_zone) From b993205136aa228afb53b6fd46f1f589664105b1 Mon Sep 17 00:00:00 2001 From: Ben Lopatin Date: Sat, 16 Dec 2023 07:32:02 -0500 Subject: [PATCH 2/3] Account for missing db in connection kwargs (#633) When the `db` value is not included explicitly in the connection kwargs the default `0` db is used but this is not reflected in the connection kwargs. Closes gh-632 --- django_rq/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/django_rq/utils.py b/django_rq/utils.py index 5033cead..14b6f489 100644 --- a/django_rq/utils.py +++ b/django_rq/utils.py @@ -102,7 +102,7 @@ def get_scheduler_statistics(): # jobs in more than one of them queue = get_queue_by_index(index) connection_kwargs = queue.connection.connection_pool.connection_kwargs - conn_key = f"{connection_kwargs['host']}:{connection_kwargs['port']}/{connection_kwargs['db']}" + conn_key = f"{connection_kwargs['host']}:{connection_kwargs['port']}/{connection_kwargs.get('db', 0)}" if conn_key not in schedulers: try: scheduler = get_scheduler(config['name']) From 1a1cd3bdb2078155762d9f8a762a531d74e9f1d6 Mon Sep 17 00:00:00 2001 From: Christofer Saputra <61144981+chromium7@users.noreply.github.com> Date: Mon, 18 Dec 2023 17:09:17 +0700 Subject: [PATCH 3/3] Add RQPool management command (#635) * Add RQPool management command * Update test to call rqpool command * Fix queue names arg * Fix docstring * Rename management command --- .github/workflows/test.yml | 2 +- .../management/commands/rqworker-pool.py | 100 ++++++++++++++++++ django_rq/management/commands/rqworker.py | 35 +----- django_rq/tests/tests.py | 27 ++++- django_rq/utils.py | 38 ++++++- 5 files changed, 160 insertions(+), 42 deletions(-) create mode 100644 django_rq/management/commands/rqworker-pool.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 630269ff..2f0c5c05 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -9,7 +9,7 @@ on: permissions: contents: read # to fetch code (actions/checkout) -jobs: +jobs: build: runs-on: ubuntu-latest diff --git a/django_rq/management/commands/rqworker-pool.py b/django_rq/management/commands/rqworker-pool.py new file mode 100644 index 00000000..a7329801 --- /dev/null +++ b/django_rq/management/commands/rqworker-pool.py @@ -0,0 +1,100 @@ +import os +import sys + +from rq.serializers import resolve_serializer +from rq.worker_pool import WorkerPool +from rq.logutils import setup_loghandlers + +from django.core.management.base import BaseCommand + +from ...jobs import get_job_class +from ...utils import configure_sentry +from ...queues import get_queues +from 
...workers import get_worker_class + + +class Command(BaseCommand): + """ + Runs RQ pool with x number of workers on specified queues. + Note that all queues passed into a + single rqworker-pool command must share the same connection. + + Example usage: + python manage.py rqworker-pool high medium low --num-workers 4 + """ + + args = '' + + def add_arguments(self, parser): + parser.add_argument('--num-workers', action='store', dest='num_workers', + type=int, default=1, help='Number of workers to spawn') + parser.add_argument('--worker-class', action='store', dest='worker_class', + help='RQ Worker class to use') + parser.add_argument('--pid', action='store', dest='pid', + default=None, help='PID file to write the worker`s pid into') + parser.add_argument('--burst', action='store_true', dest='burst', + default=False, help='Run worker in burst mode') + parser.add_argument('--queue-class', action='store', dest='queue_class', + help='Queues class to use') + parser.add_argument('--job-class', action='store', dest='job_class', + help='Jobs class to use') + parser.add_argument('--serializer', action='store', default='rq.serializers.DefaultSerializer', dest='serializer', + help='Specify a custom Serializer.') + parser.add_argument('args', nargs='*', type=str, + help='The queues to work on, separated by space') + + # Args present in `rqworker` command but not yet implemented here + # parser.add_argument('--worker-ttl', action='store', type=int, + # dest='worker_ttl', default=420, + # help='Default worker timeout to be used') + # parser.add_argument('--max-jobs', action='store', default=None, dest='max_jobs', type=int, + # help='Maximum number of jobs to execute') + # parser.add_argument('--with-scheduler', action='store_true', dest='with_scheduler', + # default=False, help='Run worker with scheduler enabled') + + # Sentry arguments + parser.add_argument('--sentry-dsn', action='store', default=None, dest='sentry_dsn', + help='Report exceptions to this Sentry DSN') + parser.add_argument('--sentry-ca-certs', action='store', default=None, dest='sentry_ca_certs', + help='A path to an alternative CA bundle file in PEM-format') + parser.add_argument('--sentry-debug', action='store', default=False, dest='sentry_debug', + help='Turns debug mode on or off.') + + def handle(self, *args, **options): + pid = options.get('pid') + if pid: + with open(os.path.expanduser(pid), "w") as fp: + fp.write(str(os.getpid())) + + # Verbosity is defined by default in BaseCommand for all commands + verbosity = options.get('verbosity') + if verbosity >= 2: + logging_level = 'DEBUG' + elif verbosity == 0: + logging_level = 'WARNING' + else: + logging_level = 'INFO' + setup_loghandlers(logging_level) + + sentry_dsn = options.pop('sentry_dsn') + if sentry_dsn: + try: + configure_sentry(sentry_dsn, **options) + except ImportError: + self.stderr.write("Please install sentry-sdk using `pip install sentry-sdk`") + sys.exit(1) + + job_class = get_job_class(options['job_class']) + queues = get_queues(*args, **{'job_class': job_class, 'queue_class': options['queue_class']}) + worker_class = get_worker_class(options.get('worker_class', None)) + serializer = resolve_serializer(options['serializer']) + + pool = WorkerPool( + queues=queues, + connection=queues[0].connection, + num_workers=options['num_workers'], + serializer=serializer, + worker_class=worker_class, + job_class=job_class, + ) + pool.start(burst=options.get('burst', False), logging_level=logging_level) diff --git a/django_rq/management/commands/rqworker.py 
b/django_rq/management/commands/rqworker.py index b1b2c277..2e61442d 100644 --- a/django_rq/management/commands/rqworker.py +++ b/django_rq/management/commands/rqworker.py @@ -6,42 +6,9 @@ from rq.logutils import setup_loghandlers from django.core.management.base import BaseCommand -from django.db import connections from ...workers import get_worker - - -def reset_db_connections(): - for c in connections.all(): - c.close() - - -def configure_sentry(sentry_dsn, **options): - """ - Configure the Sentry client. - - The **options kwargs are passed straight from the command - invocation - options relevant to Sentry configuration are - extracted. - - In addition to the 'debug' and 'ca_certs' options, which can - be passed in as command options, we add the RqIntegration and - DjangoIntegration to the config. - - Raises ImportError if the sentry_sdk is not available. - - """ - import sentry_sdk - sentry_options = { - 'debug': options.get('sentry_debug', False), - 'ca_certs': options.get('sentry_ca_certs', None), - 'integrations': [ - sentry_sdk.integrations.redis.RedisIntegration(), - sentry_sdk.integrations.rq.RqIntegration(), - sentry_sdk.integrations.django.DjangoIntegration() - ] - } - sentry_sdk.init(sentry_dsn, **sentry_options) +from ...utils import configure_sentry, reset_db_connections class Command(BaseCommand): diff --git a/django_rq/tests/tests.py b/django_rq/tests/tests.py index 61c9b3e5..50f2733d 100644 --- a/django_rq/tests/tests.py +++ b/django_rq/tests/tests.py @@ -172,7 +172,7 @@ def test_sentinel_class_initialized_with_kw_args(self, sentinel_class_mock): get_redis_connection(config) sentinel_init_sentinel_kwargs = sentinel_class_mock.call_args[1] self.assertDictEqual( - sentinel_init_sentinel_kwargs, + sentinel_init_sentinel_kwargs, {'db': 0, 'username': 'redis-user', 'password': 'redis-pass', 'socket_timeout': 0.2, 'ssl': False, 'sentinel_kwargs': {'username': 'sentinel-user', 'password': 'sentinel-pass', 'socket_timeout': 0.3}} ) @@ -286,6 +286,23 @@ def test_pass_queue_via_commandline_args(self): self.assertTrue(job['job'].is_finished) self.assertIn(job['job'].id, job['finished_job_registry'].get_job_ids()) + # Test with rqworker-pool command + jobs = [] + for queue_name in queue_names: + queue = get_queue(queue_name) + jobs.append( + { + 'job': queue.enqueue(divide, 42, 1), + 'finished_job_registry': FinishedJobRegistry(queue.name, queue.connection), + } + ) + + call_command('rqworker-pool', *queue_names, burst=True) + + for job in jobs: + self.assertTrue(job['job'].is_finished) + self.assertIn(job['job'].id, job['finished_job_registry'].get_job_ids()) + def test_configure_sentry(self): rqworker.configure_sentry('https://1@sentry.io/1') self.mock_sdk.init.assert_called_once_with( @@ -843,14 +860,14 @@ def test_scheduler_scheduler_pid_active(self): 'PORT': 6379, }, 'name': test_queue, - }] + }] with patch('django_rq.utils.QUEUES_LIST', new_callable=PropertyMock(return_value=queues)): scheduler = get_scheduler(test_queue) scheduler.register_birth() self.assertIs(get_scheduler_pid(get_queue(scheduler.queue_name)), False) scheduler.register_death() - + @skipIf(RQ_SCHEDULER_INSTALLED is False, 'RQ Scheduler not installed') def test_scheduler_scheduler_pid_inactive(self): test_queue = 'scheduler_scheduler_inactive_test' @@ -861,7 +878,7 @@ def test_scheduler_scheduler_pid_inactive(self): 'PORT': 6379, }, 'name': test_queue, - }] + }] with patch('django_rq.utils.QUEUES_LIST', new_callable=PropertyMock(return_value=queues)): connection = get_connection(test_queue) @@ -905,7 +922,7 
@@ def test_worker_scheduler_pid_inactive(self): 'PORT': 6379, }, 'name': test_queue, - }] + }] with patch('django_rq.utils.QUEUES_LIST', new_callable=PropertyMock(return_value=queues)): worker = get_worker(test_queue, name=uuid4().hex) diff --git a/django_rq/utils.py b/django_rq/utils.py index 14b6f489..1eba2dc9 100644 --- a/django_rq/utils.py +++ b/django_rq/utils.py @@ -1,4 +1,5 @@ from django.core.exceptions import ImproperlyConfigured +from django.db import connections from rq.command import send_stop_job_command from rq.job import Job from rq.registry import ( @@ -20,7 +21,7 @@ def get_scheduler_pid(queue): '''Checks whether there's a scheduler-lock on a particular queue, and returns the PID. It Only works with RQ's Built-in RQScheduler. - When RQ-Scheduler is available returns False + When RQ-Scheduler is available returns False If not, it checks the RQ's RQScheduler for a scheduler lock in the desired queue Note: result might have some delay (1-15 minutes) but it helps visualizing whether the setup is working correcly ''' @@ -32,7 +33,7 @@ def get_scheduler_pid(queue): from rq.scheduler import RQScheduler # When a scheduler acquires a lock it adds an expiring key: (e.g: rq:scheduler-lock:) - #TODO: (RQ>= 1.13) return queue.scheduler_pid + #TODO: (RQ>= 1.13) return queue.scheduler_pid pid = queue.connection.get(RQScheduler.get_locking_key(queue.name)) return int(pid.decode()) if pid is not None else None except Exception as e: @@ -144,3 +145,36 @@ def stop_jobs(queue, job_ids): continue stopped_job_ids.append(job_id) return stopped_job_ids, failed_to_stop_job_ids + + +def reset_db_connections(): + for c in connections.all(): + c.close() + + +def configure_sentry(sentry_dsn, **options): + """ + Configure the Sentry client. + + The **options kwargs are passed straight from the command + invocation - options relevant to Sentry configuration are + extracted. + + In addition to the 'debug' and 'ca_certs' options, which can + be passed in as command options, we add the RqIntegration and + DjangoIntegration to the config. + + Raises ImportError if the sentry_sdk is not available. + + """ + import sentry_sdk + sentry_options = { + 'debug': options.get('sentry_debug', False), + 'ca_certs': options.get('sentry_ca_certs', None), + 'integrations': [ + sentry_sdk.integrations.redis.RedisIntegration(), + sentry_sdk.integrations.rq.RqIntegration(), + sentry_sdk.integrations.django.DjangoIntegration() + ] + } + sentry_sdk.init(sentry_dsn, **sentry_options)
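
For reference, below is a minimal sketch (not part of the patches above) of what the new rqworker-pool command wires together internally, assuming a Django project with django_rq configured and an RQ release that provides rq.worker_pool (the module imported by the new command). The queue names "high" and "default" are placeholders for entries in settings.RQ_QUEUES; as the command's docstring notes, every queue handed to a single pool must share the same Redis connection.

    # Sketch only: mirrors the calls made in handle() of rqworker-pool.py.
    from rq.worker_pool import WorkerPool

    from django_rq.queues import get_queues

    # Resolve the configured django_rq queues; all queues passed to one pool
    # must share the same Redis connection.
    queues = get_queues('high', 'default')

    pool = WorkerPool(
        queues=queues,
        connection=queues[0].connection,  # shared connection of the resolved queues
        num_workers=4,                    # equivalent to --num-workers 4
    )
    pool.start(burst=True)                # drain the queues and exit, like --burst

On the command line this corresponds to `python manage.py rqworker-pool high default --num-workers 4 --burst`, matching the usage example in the command's docstring.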