
fix(celery): stop closing prerun_span too soon to account for Celery chains scenario [backport 2.16] #11804

fix(celery): stop closing prerun_span too soon to account for Celery chains scenario (#11498)

We've made a few changes recently to how Celery context is handled, including #10676.

In particular, the goal of #10676 was to handle a scenario where a long-running task hits an exception that prevents its span from closing.

Unfortunately, that fix did not account for cases where tasks are chained, so their spans may legitimately not close until later.

See: #11479 and
#11624

With this PR, the sample app from #11479 attaches the Celery-specific span back to the root span.
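
For reference, a minimal sketch of the chains scenario (it mirrors the tasks this PR adds under `tests/contrib/celery/`; a running broker is assumed):

```python
from celery import Celery

app = Celery("tasks")


@app.task
def fn_a():
    return "a"


@app.task
def fn_b():
    return "b"


# The chain dispatches fn_b from inside the worker, via apply_async,
# while fn_a's prerun span is still open -- the old cleanup in
# apply_async's finally block closed that span prematurely.
(fn_a.si() | fn_b.si()).delay()
```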

I also need to add tests for the chains scenario.

Related to AIDM-494

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

(cherry picked from commit e8aab65)
wantsui authored and github-actions[bot] committed Dec 19, 2024
commit c8e7e0b2a84251952aebf205c3e71f1292597bfc
11 changes: 0 additions & 11 deletions ddtrace/contrib/internal/celery/app.py
@@ -133,10 +133,6 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
             if task_span:
                 task_span.set_exc_info(*sys.exc_info())
 
-            prerun_span = core.get_item("prerun_span")
-            if prerun_span:
-                prerun_span.set_exc_info(*sys.exc_info())
-
             raise
         finally:
             task_span = core.get_item("task_span")
@@ -147,11 +143,4 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
                 )
                 task_span.finish()
 
-            prerun_span = core.get_item("prerun_span")
-            if prerun_span:
-                log.debug(
-                    "The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint()
-                )
-                prerun_span.finish()
-
     return _traced_apply_async_inner
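
To spell out why this cleanup was unsafe for chains (my reading of the linked issues, not code from this PR): when a chained task succeeds, the worker dispatches the next task via `apply_async` before the current task's `task_postrun` signal fires, so the `finally` block above finished the current task's prerun span while it was still in use. A sketch of the signal ordering using plain Celery signal handlers, assuming the `fn_a | fn_b` chain from the tests:

```python
# Hypothetical illustration only -- not part of this PR.
from celery.signals import task_postrun, task_prerun


@task_prerun.connect
def on_prerun(task=None, **kwargs):
    print(f"prerun  {task.name}")  # ddtrace opens the worker span here


@task_postrun.connect
def on_postrun(task=None, **kwargs):
    print(f"postrun {task.name}")  # ...and should only close it here


# Expected order for (fn_a.si() | fn_b.si()).delay():
#   prerun fn_a
#   (fn_a succeeds; the chain calls apply_async for fn_b)
#   postrun fn_a   <- the removed code closed fn_a's span one step early,
#   prerun fn_b       inside apply_async, before this signal fired
#   postrun fn_b
```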
3 changes: 0 additions & 3 deletions ddtrace/contrib/internal/celery/signals.py
@@ -49,9 +49,6 @@ def trace_prerun(*args, **kwargs):
     service = config.celery["worker_service_name"]
     span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER)
 
-    # Store an item called "prerun span" in case task_postrun doesn't get called
-    core.set_item("prerun_span", span)
-
     # set span.kind to the type of request being performed
     span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)

@@ -0,0 +1,4 @@
+---
+fixes:
+  - |
+    tracing(celery): Fixes an issue where ``celery.apply`` spans from Celery prerun got closed too soon, leading to missing span tags.
5 changes: 5 additions & 0 deletions tests/contrib/celery/run_tasks.py
@@ -0,0 +1,5 @@
+from tasks import fn_a
+from tasks import fn_b
+
+
+(fn_a.si() | fn_b.si()).delay()
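
For anyone unfamiliar with the chain syntax: `si()` builds an immutable signature (the task ignores the previous task's result), and `|` chains the signatures so `fn_b` is dispatched only after `fn_a` completes. The test below runs this script under `ddtrace-run`, so the producer side of the chain is traced as well.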
14 changes: 14 additions & 0 deletions tests/contrib/celery/tasks.py
@@ -0,0 +1,14 @@
+from celery import Celery
+
+
+app = Celery("tasks")
+
+
+@app.task(name="tests.contrib.celery.tasks.fn_a")
+def fn_a():
+    return "a"
+
+
+@app.task(name="tests.contrib.celery.tasks.fn_b")
+def fn_b():
+    return "b"
62 changes: 62 additions & 0 deletions tests/contrib/celery/test_chained_task.py
@@ -0,0 +1,62 @@
+import os
+import re
+import subprocess
+import time
+
+from celery import Celery
+
+
+# Ensure that when we call Celery chains, the root span has Celery-specific span tags.
+# The test_integration.py setup doesn't perfectly mimic the conditions of a running worker process.
+# This test runs the worker on the side so we can check the tracer logs afterwards for the expected spans.
+# See https://github.com/DataDog/dd-trace-py/issues/11479
+def test_task_chain_task_call_task():
+    app = Celery("tasks")
+
+    celery_worker_cmd = "ddtrace-run celery -A tasks worker -c 1 -l DEBUG -n uniquename1 -P solo"
+    celery_task_runner_cmd = "ddtrace-run python run_tasks.py"
+
+    # The commands need to run from the directory where this test file lives
+    current_directory = str(os.path.dirname(__file__))
+
+    worker_process = subprocess.Popen(
+        celery_worker_cmd.split(),
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        preexec_fn=os.setsid,
+        close_fds=True,
+        cwd=current_directory,
+    )
+
+    max_wait_time = 10
+    waited_so_far = 0
+    # app.control.inspect().active() returns {'celery@uniquename1': []} once the worker is running
+    while app.control.inspect().active() is None and waited_so_far < max_wait_time:
+        time.sleep(1)
+        waited_so_far += 1
+
+    # The task should only run after the Celery worker has had sufficient time to start up
+    task_runner_process = subprocess.Popen(
+        celery_task_runner_cmd.split(),
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        preexec_fn=os.setsid,
+        close_fds=True,
+        cwd=current_directory,
+    )
+
+    task_runner_process.wait()
+    # Kill the process so it starts to send traces to the Trace Agent
+    worker_process.kill()
+    worker_logs = worker_process.stderr.read()
+
+    # Check that the root span was created with one of the Celery-specific tags, such as celery.correlation_id.
+    # Depending on the Python version, quotes in the captured log output may or may not be escaped:
+    old_pattern_match = r"resource=\\'tests.contrib.celery.tasks.fn_a\\' type=\\'worker\\' .* tags=.*correlation_id.*"
+    new_pattern_match = r"resource=\'tests.contrib.celery.tasks.fn_a\' type=\'worker\' .* tags=.*correlation_id.*"
+
+    pattern_exists = (
+        re.search(old_pattern_match, str(worker_logs)) is not None
+        or re.search(new_pattern_match, str(worker_logs)) is not None
+    )
+    assert pattern_exists
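
Note that this test presumably needs a reachable Celery broker for the `tasks` app, since it launches a real worker and task runner as subprocesses; the assertion then checks the worker's debug logs for a root span that still carries its Celery-specific tags.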