From a2bc74099b93da752d2c87d8fc79aa95b0378db6 Mon Sep 17 00:00:00 2001 From: Olivier Leger Date: Thu, 8 Sep 2022 15:47:58 -0400 Subject: [PATCH] Move REST Services celery tasks to their own queue --- Dockerfile | 2 ++ docker/run_celery.bash | 5 +++-- docker/run_celery_low_priority.bash | 18 ++++++++++++++++++ kobo/apps/hook/tests/test_email.py | 2 +- kobo/apps/hook/utils.py | 4 +++- kobo/apps/hook/views/v2/hook.py | 4 +++- kobo/settings/base.py | 2 +- 7 files changed, 31 insertions(+), 6 deletions(-) create mode 100755 docker/run_celery_low_priority.bash diff --git a/Dockerfile b/Dockerfile index 2a4da5d4b1..3d52ee1425 100644 --- a/Dockerfile +++ b/Dockerfile @@ -44,6 +44,7 @@ RUN mkdir -p "${NGINX_STATIC_DIR}" && \ mkdir -p ${CELERY_PID_DIR} && \ mkdir -p ${SERVICES_DIR}/uwsgi && \ mkdir -p ${SERVICES_DIR}/celery && \ + mkdir -p ${SERVICES_DIR}/celery_low_priority && \ mkdir -p ${SERVICES_DIR}/celery_beat && \ mkdir -p "${INIT_PATH}" @@ -155,6 +156,7 @@ RUN rm -rf /etc/runit/runsvdir/default/getty-tty* # Create symlinks for runsv services RUN ln -s "${KPI_SRC_DIR}/docker/run_uwsgi.bash" "${SERVICES_DIR}/uwsgi/run" && \ ln -s "${KPI_SRC_DIR}/docker/run_celery.bash" "${SERVICES_DIR}/celery/run" && \ + ln -s "${KPI_SRC_DIR}/docker/run_celery_low_priority.bash" "${SERVICES_DIR}/celery_low_priority/run" && \ ln -s "${KPI_SRC_DIR}/docker/run_celery_beat.bash" "${SERVICES_DIR}/celery_beat/run" diff --git a/docker/run_celery.bash b/docker/run_celery.bash index d81a91ac51..99b07ce4bc 100755 --- a/docker/run_celery.bash +++ b/docker/run_celery.bash @@ -2,7 +2,7 @@ set -e source /etc/profile -# Run the main Celery worker (will not process `sync_kobocat_xforms` jobs). +# Run the main Celery worker (will not process low priority jobs). # Start 2 processes by default; this will be overridden later, in Python code, # according to the user's preference saved by django-constance cd "${KPI_SRC_DIR}" @@ -11,7 +11,8 @@ exec celery -A kobo worker --loglevel=info \ --hostname=kpi_main_worker@%h \ --logfile=${KPI_LOGS_DIR}/celery.log \ --pidfile=/tmp/celery.pid \ - --exclude-queues=sync_kobocat_xforms_queue \ + --queues=kpi_queue \ + --exclude-queues=kpi_low_priority_queue \ --uid=${UWSGI_USER} \ --gid=${UWSGI_GROUP} \ --autoscale 2,2 diff --git a/docker/run_celery_low_priority.bash b/docker/run_celery_low_priority.bash new file mode 100755 index 0000000000..6bc52cddb6 --- /dev/null +++ b/docker/run_celery_low_priority.bash @@ -0,0 +1,18 @@ +#!/bin/bash +set -e +source /etc/profile + +# Run the main Celery worker (will process only low priority jobs). +# Start 2 processes by default; this will be overridden later, in Python code, +# according to the user's preference saved by django-constance +cd "${KPI_SRC_DIR}" + +exec celery -A kobo worker --loglevel=info \ + --hostname=kpi_main_worker@%h \ + --logfile=${KPI_LOGS_DIR}/celery_low_priority.log \ + --pidfile=/tmp/celery_low_priority.pid \ + --queues=kpi_low_priority_queue \ + --exclude-queues=kpi_queue \ + --uid=${UWSGI_USER} \ + --gid=${UWSGI_GROUP} \ + --autoscale 2,2 diff --git a/kobo/apps/hook/tests/test_email.py b/kobo/apps/hook/tests/test_email.py index 9ae8efb07e..2b1f7d0fdf 100644 --- a/kobo/apps/hook/tests/test_email.py +++ b/kobo/apps/hook/tests/test_email.py @@ -34,7 +34,7 @@ def _create_periodic_task(self): def test_notifications(self): self._create_periodic_task() first_log_response = self._send_and_fail() - failures_reports.delay() + failures_reports.apply_async(queue='kpi_low_priority_queue') self.assertEqual(len(mail.outbox), 1) expected_record = { diff --git a/kobo/apps/hook/utils.py b/kobo/apps/hook/utils.py index c3d94ed1cf..55fe5760e0 100644 --- a/kobo/apps/hook/utils.py +++ b/kobo/apps/hook/utils.py @@ -25,6 +25,8 @@ def call_services(asset: 'kpi.models.asset.Asset', submission_id: int): submission_id=submission_id, hook_id=hook_id ).exists(): success = True - service_definition_task.delay(hook_id, submission_id) + service_definition_task.apply_async( + queue='kpi_low_priority_queue', args=(hook_id, submission_id) + ) return success diff --git a/kobo/apps/hook/views/v2/hook.py b/kobo/apps/hook/views/v2/hook.py index 12d7742917..9c4f795b0d 100644 --- a/kobo/apps/hook/views/v2/hook.py +++ b/kobo/apps/hook/views/v2/hook.py @@ -192,7 +192,9 @@ def retry(self, request, uid=None, *args, **kwargs): # Mark all logs as PENDING HookLog.objects.filter(id__in=hooklogs_ids).update(status=HOOK_LOG_PENDING) # Delegate to Celery - retry_all_task.delay(hooklogs_ids) + retry_all_task.apply_async( + queue='kpi_low_priority_queue', args=(hooklogs_ids,) + ) response.update({ "pending_uids": hooklogs_uids }) diff --git a/kobo/settings/base.py b/kobo/settings/base.py index bfb342ef0c..05f1df0353 100644 --- a/kobo/settings/base.py +++ b/kobo/settings/base.py @@ -613,7 +613,7 @@ def __init__(self, *args, **kwargs): "send-hooks-failures-reports": { "task": "kobo.apps.hook.tasks.failures_reports", "schedule": crontab(hour=0, minute=0), - 'options': {'queue': 'kpi_queue'} + 'options': {'queue': 'kpi_low_priority_queue'} }, }