Skip to content

Commit

Permalink
Add RQPool management command (#635)
Browse files Browse the repository at this point in the history
* Add RQPool management command

* Update test to call rqpool command

* Fix queue names arg

* Fix docstring

* Rename management command
  • Loading branch information
chromium7 authored Dec 18, 2023
1 parent b993205 commit 1a1cd3b
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
permissions:
contents: read # to fetch code (actions/checkout)

jobs:
jobs:

build:
runs-on: ubuntu-latest
Expand Down
100 changes: 100 additions & 0 deletions django_rq/management/commands/rqworker-pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import os
import sys

from rq.serializers import resolve_serializer
from rq.worker_pool import WorkerPool
from rq.logutils import setup_loghandlers

from django.core.management.base import BaseCommand

from ...jobs import get_job_class
from ...utils import configure_sentry
from ...queues import get_queues
from ...workers import get_worker_class


class Command(BaseCommand):
"""
Runs RQ pool with x number of workers on specified queues.
Note that all queues passed into a
single rqworker-pool command must share the same connection.
Example usage:
python manage.py rqworker-pool high medium low --num-workers 4
"""

args = '<queue queue ...>'

def add_arguments(self, parser):
parser.add_argument('--num-workers', action='store', dest='num_workers',
type=int, default=1, help='Number of workers to spawn')
parser.add_argument('--worker-class', action='store', dest='worker_class',
help='RQ Worker class to use')
parser.add_argument('--pid', action='store', dest='pid',
default=None, help='PID file to write the worker`s pid into')
parser.add_argument('--burst', action='store_true', dest='burst',
default=False, help='Run worker in burst mode')
parser.add_argument('--queue-class', action='store', dest='queue_class',
help='Queues class to use')
parser.add_argument('--job-class', action='store', dest='job_class',
help='Jobs class to use')
parser.add_argument('--serializer', action='store', default='rq.serializers.DefaultSerializer', dest='serializer',
help='Specify a custom Serializer.')
parser.add_argument('args', nargs='*', type=str,
help='The queues to work on, separated by space')

# Args present in `rqworker` command but not yet implemented here
# parser.add_argument('--worker-ttl', action='store', type=int,
# dest='worker_ttl', default=420,
# help='Default worker timeout to be used')
# parser.add_argument('--max-jobs', action='store', default=None, dest='max_jobs', type=int,
# help='Maximum number of jobs to execute')
# parser.add_argument('--with-scheduler', action='store_true', dest='with_scheduler',
# default=False, help='Run worker with scheduler enabled')

# Sentry arguments
parser.add_argument('--sentry-dsn', action='store', default=None, dest='sentry_dsn',
help='Report exceptions to this Sentry DSN')
parser.add_argument('--sentry-ca-certs', action='store', default=None, dest='sentry_ca_certs',
help='A path to an alternative CA bundle file in PEM-format')
parser.add_argument('--sentry-debug', action='store', default=False, dest='sentry_debug',
help='Turns debug mode on or off.')

def handle(self, *args, **options):
pid = options.get('pid')
if pid:
with open(os.path.expanduser(pid), "w") as fp:
fp.write(str(os.getpid()))

# Verbosity is defined by default in BaseCommand for all commands
verbosity = options.get('verbosity')
if verbosity >= 2:
logging_level = 'DEBUG'
elif verbosity == 0:
logging_level = 'WARNING'
else:
logging_level = 'INFO'
setup_loghandlers(logging_level)

sentry_dsn = options.pop('sentry_dsn')
if sentry_dsn:
try:
configure_sentry(sentry_dsn, **options)
except ImportError:
self.stderr.write("Please install sentry-sdk using `pip install sentry-sdk`")
sys.exit(1)

job_class = get_job_class(options['job_class'])
queues = get_queues(*args, **{'job_class': job_class, 'queue_class': options['queue_class']})
worker_class = get_worker_class(options.get('worker_class', None))
serializer = resolve_serializer(options['serializer'])

pool = WorkerPool(
queues=queues,
connection=queues[0].connection,
num_workers=options['num_workers'],
serializer=serializer,
worker_class=worker_class,
job_class=job_class,
)
pool.start(burst=options.get('burst', False), logging_level=logging_level)
35 changes: 1 addition & 34 deletions django_rq/management/commands/rqworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,9 @@
from rq.logutils import setup_loghandlers

from django.core.management.base import BaseCommand
from django.db import connections

from ...workers import get_worker


def reset_db_connections():
for c in connections.all():
c.close()


def configure_sentry(sentry_dsn, **options):
"""
Configure the Sentry client.
The **options kwargs are passed straight from the command
invocation - options relevant to Sentry configuration are
extracted.
In addition to the 'debug' and 'ca_certs' options, which can
be passed in as command options, we add the RqIntegration and
DjangoIntegration to the config.
Raises ImportError if the sentry_sdk is not available.
"""
import sentry_sdk
sentry_options = {
'debug': options.get('sentry_debug', False),
'ca_certs': options.get('sentry_ca_certs', None),
'integrations': [
sentry_sdk.integrations.redis.RedisIntegration(),
sentry_sdk.integrations.rq.RqIntegration(),
sentry_sdk.integrations.django.DjangoIntegration()
]
}
sentry_sdk.init(sentry_dsn, **sentry_options)
from ...utils import configure_sentry, reset_db_connections


class Command(BaseCommand):
Expand Down
27 changes: 22 additions & 5 deletions django_rq/tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def test_sentinel_class_initialized_with_kw_args(self, sentinel_class_mock):
get_redis_connection(config)
sentinel_init_sentinel_kwargs = sentinel_class_mock.call_args[1]
self.assertDictEqual(
sentinel_init_sentinel_kwargs,
sentinel_init_sentinel_kwargs,
{'db': 0, 'username': 'redis-user', 'password': 'redis-pass', 'socket_timeout': 0.2, 'ssl': False, 'sentinel_kwargs': {'username': 'sentinel-user', 'password': 'sentinel-pass', 'socket_timeout': 0.3}}
)

Expand Down Expand Up @@ -286,6 +286,23 @@ def test_pass_queue_via_commandline_args(self):
self.assertTrue(job['job'].is_finished)
self.assertIn(job['job'].id, job['finished_job_registry'].get_job_ids())

# Test with rqworker-pool command
jobs = []
for queue_name in queue_names:
queue = get_queue(queue_name)
jobs.append(
{
'job': queue.enqueue(divide, 42, 1),
'finished_job_registry': FinishedJobRegistry(queue.name, queue.connection),
}
)

call_command('rqworker-pool', *queue_names, burst=True)

for job in jobs:
self.assertTrue(job['job'].is_finished)
self.assertIn(job['job'].id, job['finished_job_registry'].get_job_ids())

def test_configure_sentry(self):
rqworker.configure_sentry('https://[email protected]/1')
self.mock_sdk.init.assert_called_once_with(
Expand Down Expand Up @@ -843,14 +860,14 @@ def test_scheduler_scheduler_pid_active(self):
'PORT': 6379,
},
'name': test_queue,
}]
}]
with patch('django_rq.utils.QUEUES_LIST',
new_callable=PropertyMock(return_value=queues)):
scheduler = get_scheduler(test_queue)
scheduler.register_birth()
self.assertIs(get_scheduler_pid(get_queue(scheduler.queue_name)), False)
scheduler.register_death()

@skipIf(RQ_SCHEDULER_INSTALLED is False, 'RQ Scheduler not installed')
def test_scheduler_scheduler_pid_inactive(self):
test_queue = 'scheduler_scheduler_inactive_test'
Expand All @@ -861,7 +878,7 @@ def test_scheduler_scheduler_pid_inactive(self):
'PORT': 6379,
},
'name': test_queue,
}]
}]
with patch('django_rq.utils.QUEUES_LIST',
new_callable=PropertyMock(return_value=queues)):
connection = get_connection(test_queue)
Expand Down Expand Up @@ -905,7 +922,7 @@ def test_worker_scheduler_pid_inactive(self):
'PORT': 6379,
},
'name': test_queue,
}]
}]
with patch('django_rq.utils.QUEUES_LIST',
new_callable=PropertyMock(return_value=queues)):
worker = get_worker(test_queue, name=uuid4().hex)
Expand Down
38 changes: 36 additions & 2 deletions django_rq/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from django.core.exceptions import ImproperlyConfigured
from django.db import connections
from rq.command import send_stop_job_command
from rq.job import Job
from rq.registry import (
Expand All @@ -20,7 +21,7 @@
def get_scheduler_pid(queue):
'''Checks whether there's a scheduler-lock on a particular queue, and returns the PID.
It Only works with RQ's Built-in RQScheduler.
When RQ-Scheduler is available returns False
When RQ-Scheduler is available returns False
If not, it checks the RQ's RQScheduler for a scheduler lock in the desired queue
Note: result might have some delay (1-15 minutes) but it helps visualizing whether the setup is working correcly
'''
Expand All @@ -32,7 +33,7 @@ def get_scheduler_pid(queue):
from rq.scheduler import RQScheduler

# When a scheduler acquires a lock it adds an expiring key: (e.g: rq:scheduler-lock:<queue.name>)
#TODO: (RQ>= 1.13) return queue.scheduler_pid
#TODO: (RQ>= 1.13) return queue.scheduler_pid
pid = queue.connection.get(RQScheduler.get_locking_key(queue.name))
return int(pid.decode()) if pid is not None else None
except Exception as e:
Expand Down Expand Up @@ -144,3 +145,36 @@ def stop_jobs(queue, job_ids):
continue
stopped_job_ids.append(job_id)
return stopped_job_ids, failed_to_stop_job_ids


def reset_db_connections():
for c in connections.all():
c.close()


def configure_sentry(sentry_dsn, **options):
"""
Configure the Sentry client.
The **options kwargs are passed straight from the command
invocation - options relevant to Sentry configuration are
extracted.
In addition to the 'debug' and 'ca_certs' options, which can
be passed in as command options, we add the RqIntegration and
DjangoIntegration to the config.
Raises ImportError if the sentry_sdk is not available.
"""
import sentry_sdk
sentry_options = {
'debug': options.get('sentry_debug', False),
'ca_certs': options.get('sentry_ca_certs', None),
'integrations': [
sentry_sdk.integrations.redis.RedisIntegration(),
sentry_sdk.integrations.rq.RqIntegration(),
sentry_sdk.integrations.django.DjangoIntegration()
]
}
sentry_sdk.init(sentry_dsn, **sentry_options)

0 comments on commit 1a1cd3b

Please sign in to comment.