Skip to content

Commit

Permalink
celery
Browse files Browse the repository at this point in the history
  • Loading branch information
sentrivana committed Jun 23, 2023
1 parent 191b66a commit 0d2d15c
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 13 deletions.
19 changes: 17 additions & 2 deletions sentry_sdk/integrations/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from sentry_sdk.hub import Hub
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK
from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, TRANSACTION_SOURCE_TASK
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.utils import (
capture_internal_exceptions,
Expand Down Expand Up @@ -158,14 +158,29 @@ def apply_async(*args, **kwargs):
# Note: kwargs can contain headers=None, so no setdefault!
# Unsure which backend though.
kwarg_headers = kwargs.get("headers") or {}
existing_baggage = kwarg_headers.get(BAGGAGE_HEADER_NAME)
kwarg_headers.update(headers)
if existing_baggage:
kwarg_headers[BAGGAGE_HEADER_NAME] = "{},{}".format(
existing_baggage, headers[BAGGAGE_HEADER_NAME]
)

# https://github.com/celery/celery/issues/4875
#
# Need to setdefault the inner headers too since other
# tracing tools (dd-trace-py) also employ this exact
# workaround and we don't want to break them.
kwarg_headers.setdefault("headers", {}).update(headers)
kwarg_headers.setdefault("headers", {})
existing_baggage = kwarg_headers["headers"].get(
BAGGAGE_HEADER_NAME
)
kwarg_headers["headers"].update(headers)
if existing_baggage:
kwarg_headers["headers"][
BAGGAGE_HEADER_NAME
] = "{},{}".format(
existing_baggage, headers[BAGGAGE_HEADER_NAME]
)

# Add the Sentry options potentially added in `sentry_apply_entry`
# to the headers (done when auto-instrumenting Celery Beat tasks)
Expand Down
40 changes: 29 additions & 11 deletions tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from celery import Celery, VERSION
from celery.bin import worker
from celery.signals import task_success

try:
from unittest import mock # python 3.3 and above
Expand Down Expand Up @@ -493,17 +492,36 @@ def test_task_headers(celery):
"sentry-monitor-check-in-id": "123abc",
}

@celery.task(name="dummy_task")
def dummy_task(x, y):
return x + y

def crons_task_success(sender, **kwargs):
headers = _get_headers(sender)
assert headers == sentry_crons_setup

task_success.connect(crons_task_success)
@celery.task(name="dummy_task", bind=True)
def dummy_task(self, x, y):
return _get_headers(self)

# This is how the Celery Beat auto-instrumentation starts a task
# in the monkey patched version of `apply_async`
# in `sentry_sdk/integrations/celery.py::_wrap_apply_async()`
dummy_task.apply_async(args=(1, 0), headers=sentry_crons_setup)
result = dummy_task.apply_async(args=(1, 0), headers=sentry_crons_setup)
assert result.get() == sentry_crons_setup


def test_baggage_propagation(init_celery):
celery = init_celery(traces_sample_rate=1.0, release="abcdef")

@celery.task(name="dummy_task", bind=True)
def dummy_task(self, x, y):
return _get_headers(self)

with start_transaction() as transaction:
result = dummy_task.apply_async(
args=(1, 0),
headers={"baggage": "custom=value", "headers": {"baggage": "custom=value"}},
).get()

assert sorted(result["baggage"].split(",")) == sorted(
[
"sentry-release=abcdef",
"sentry-trace_id={}".format(transaction.trace_id),
"sentry-environment=production",
"sentry-sample_rate=1.0",
"custom=value",
]
)

0 comments on commit 0d2d15c

Please sign in to comment.