From 0eaa9d019073a5ce8ece6214cbf3d246ee515b27 Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Fri, 13 Sep 2024 16:00:35 +0000 Subject: [PATCH] temp: Possible repro branch for DD trace concatenation - Add `/celery_repro` URL to run a sample task - Send Celery tasks to a broker, rather than running in-process - Hardcode a broker URL - Log all celery signals See https://github.com/edx/edx-arch-experiments/issues/692 --- lms/envs/devstack.py | 6 ----- lms/urls.py | 1 + .../core/djangoapps/celery_repro/__init__.py | 0 openedx/core/djangoapps/celery_repro/urls.py | 6 +++++ openedx/core/djangoapps/celery_repro/views.py | 14 ++++++++++ openedx/core/lib/celery/__init__.py | 26 ++++++++++++++++++- 6 files changed, 46 insertions(+), 7 deletions(-) create mode 100644 openedx/core/djangoapps/celery_repro/__init__.py create mode 100644 openedx/core/djangoapps/celery_repro/urls.py create mode 100644 openedx/core/djangoapps/celery_repro/views.py diff --git a/lms/envs/devstack.py b/lms/envs/devstack.py index bcb919fbaf83..e1ed9dac0980 100644 --- a/lms/envs/devstack.py +++ b/lms/envs/devstack.py @@ -29,12 +29,6 @@ SESSION_COOKIE_NAME = 'lms_sessionid' -# By default don't use a worker, execute tasks as if they were local functions -CELERY_ALWAYS_EAGER = True -# When the celery task is run eagerly, it is executed locally while sharing the -# thread and its request cache with the active Django Request. In that case, -# do not clear the cache. -CLEAR_REQUEST_CACHE_ON_TASK_COMPLETION = False HTTPS = 'off' LMS_ROOT_URL = f'http://{LMS_BASE}' diff --git a/lms/urls.py b/lms/urls.py index f97bc7f0d04e..e40d58350cc1 100644 --- a/lms/urls.py +++ b/lms/urls.py @@ -108,6 +108,7 @@ path('', include('lms.djangoapps.static_template_view.urls')), path('heartbeat', include('openedx.core.djangoapps.heartbeat.urls')), + path('celery_repro', include('openedx.core.djangoapps.celery_repro.urls')), path('i18n/', include('django.conf.urls.i18n')), diff --git a/openedx/core/djangoapps/celery_repro/__init__.py b/openedx/core/djangoapps/celery_repro/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/openedx/core/djangoapps/celery_repro/urls.py b/openedx/core/djangoapps/celery_repro/urls.py new file mode 100644 index 000000000000..9a0bc228afb7 --- /dev/null +++ b/openedx/core/djangoapps/celery_repro/urls.py @@ -0,0 +1,6 @@ +from django.urls import path +from openedx.core.djangoapps.celery_repro.views import run_celery_task + +urlpatterns = [ + path('', run_celery_task, name='celery_repro'), +] diff --git a/openedx/core/djangoapps/celery_repro/views.py b/openedx/core/djangoapps/celery_repro/views.py new file mode 100644 index 000000000000..b36145c54334 --- /dev/null +++ b/openedx/core/djangoapps/celery_repro/views.py @@ -0,0 +1,14 @@ +import ddtrace.tracer +from celery import shared_task +from common.djangoapps.util.json_request import JsonResponse + + +@shared_task +def sample_task(): + print("📋📋📋 Task is running in this process.") + + +def run_celery_task(request): + print(f"ℹī¸ Current root span: {ddtrace.tracer.current_root_span()._pprint()}") + sample_task.apply_async() + return JsonResponse({}, status=200) diff --git a/openedx/core/lib/celery/__init__.py b/openedx/core/lib/celery/__init__.py index 855970c1df3e..54ee026c8a47 100644 --- a/openedx/core/lib/celery/__init__.py +++ b/openedx/core/lib/celery/__init__.py @@ -21,10 +21,34 @@ # WARNING: Do not refer to this unless you are cms.celery or # lms.celery. See module docstring! -APP = Celery('proj') +APP = Celery('proj', broker="redis://:password@edx.devstack.redis:6379/10") APP.conf.task_protocol = 1 # Using a string here means the worker will not have to # pickle the object when using Windows. APP.config_from_object('django.conf:settings') APP.autodiscover_tasks() + + +import celery.signals + +CELERY_SIGNAL_NAMES = [ + 'before_task_publish', + 'after_task_publish', + 'task_prerun', + 'task_postrun', + 'task_retry', + 'task_success', + 'task_failure', + 'task_internal_error', + 'task_received', + 'task_revoked', + 'task_unknown', + 'task_rejected', +] + +def log_celery_signal(*args, **kwargs): + print(f"⚠ī¸âš ī¸âš ī¸ Celery signal received: {args=} {kwargs=}") + +for signal_name in CELERY_SIGNAL_NAMES: + getattr(celery.signals, signal_name).connect(log_celery_signal, weak=False)