Skip to content

Commit

Permalink
temp: Possible repro branch for DD trace concatenation
Browse files Browse the repository at this point in the history
- Convert `/heartbeat` view into a celery test
- Send Celery tasks to a broker, rather than running in-process
- Hardcode a broker URL
- Log all celery signals

See edx/edx-arch-experiments#692
  • Loading branch information
timmc-edx committed Sep 13, 2024
1 parent d9c4eb2 commit 441698f
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 23 deletions.
6 changes: 0 additions & 6 deletions lms/envs/devstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@

SESSION_COOKIE_NAME = 'lms_sessionid'

# By default don't use a worker, execute tasks as if they were local functions
CELERY_ALWAYS_EAGER = True
# When the celery task is run eagerly, it is executed locally while sharing the
# thread and its request cache with the active Django Request. In that case,
# do not clear the cache.
CLEAR_REQUEST_CACHE_ON_TASK_COMPLETION = False
HTTPS = 'off'

LMS_ROOT_URL = f'http://{LMS_BASE}'
Expand Down
25 changes: 9 additions & 16 deletions openedx/core/djangoapps/heartbeat/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,19 @@

log = logging.getLogger(__name__)

import ddtrace.tracer
from celery import shared_task

@shared_task
def sample_task():
print("📋📋📋 Task is running in this process.")

def heartbeat(request):
"""
Simple view that a loadbalancer can check to verify that the app is up. Returns a json doc
of service id: status or message. If the status for any service is anything other than True,
it returns HTTP code 503 (Service Unavailable); otherwise, it returns 200.
"""
check_results = {}
try:
check_results = runchecks('extended' in request.GET)

status_code = 200 # Default to OK
for check in check_results:
if not check_results[check]['status']:
status_code = 503 # 503 on any failure
except Exception as e: # lint-amnesty, pylint: disable=broad-except
status_code = 503
check_results = {'error': str(e)}

if status_code == 503:
log.error('Heartbeat check failed (%s): %s', status_code, check_results)

return JsonResponse(check_results, status=status_code)
print(f"ℹ️ Current root span: {ddtrace.tracer.current_root_span()._pprint()}")
sample_task.apply_async()
return JsonResponse({}, status=200)
26 changes: 25 additions & 1 deletion openedx/core/lib/celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,34 @@

# WARNING: Do not refer to this unless you are cms.celery or
# lms.celery. See module docstring!
APP = Celery('proj')
APP = Celery('proj', broker="redis://:[email protected]:6379/10")

APP.conf.task_protocol = 1
# Using a string here means the worker will not have to
# pickle the object when using Windows.
APP.config_from_object('django.conf:settings')
APP.autodiscover_tasks()


import celery.signals

CELERY_SIGNAL_NAMES = [
'before_task_publish',
'after_task_publish',
'task_prerun',
'task_postrun',
'task_retry',
'task_success',
'task_failure',
'task_internal_error',
'task_received',
'task_revoked',
'task_unknown',
'task_rejected',
]

def log_celery_signal(*args, **kwargs):
print(f"⚠️⚠️⚠️ Celery signal received: {args=} {kwargs=}")

for signal_name in CELERY_SIGNAL_NAMES:
getattr(celery.signals, signal_name).connect(log_celery_signal, weak=False)

0 comments on commit 441698f

Please sign in to comment.