From 625e1b3608862f68295006edee00d0d0916787f2 Mon Sep 17 00:00:00 2001 From: Ivana Kellyerova Date: Tue, 27 Jun 2023 11:21:00 +0200 Subject: [PATCH] Do not overwrite existing baggage on outgoing requests (#2191) --- sentry_sdk/integrations/celery.py | 19 ++++++++++- sentry_sdk/integrations/httpx.py | 17 ++++++++-- tests/integrations/celery/test_celery.py | 42 +++++++++++++++++------- tests/integrations/httpx/test_httpx.py | 40 ++++++++++++++++++++++ 4 files changed, 103 insertions(+), 15 deletions(-) diff --git a/sentry_sdk/integrations/celery.py b/sentry_sdk/integrations/celery.py index 741a2c8bb7..443fcdad45 100644 --- a/sentry_sdk/integrations/celery.py +++ b/sentry_sdk/integrations/celery.py @@ -11,7 +11,7 @@ from sentry_sdk.hub import Hub from sentry_sdk.integrations import Integration, DidNotEnable from sentry_sdk.integrations.logging import ignore_logger -from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK +from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, TRANSACTION_SOURCE_TASK from sentry_sdk._types import TYPE_CHECKING from sentry_sdk.utils import ( capture_internal_exceptions, @@ -158,7 +158,20 @@ def apply_async(*args, **kwargs): # Note: kwargs can contain headers=None, so no setdefault! # Unsure which backend though. kwarg_headers = kwargs.get("headers") or {} + + existing_baggage = kwarg_headers.get(BAGGAGE_HEADER_NAME) + sentry_baggage = headers.get(BAGGAGE_HEADER_NAME) + + combined_baggage = sentry_baggage or existing_baggage + if sentry_baggage and existing_baggage: + combined_baggage = "{},{}".format( + existing_baggage, + sentry_baggage, + ) + kwarg_headers.update(headers) + if combined_baggage: + kwarg_headers[BAGGAGE_HEADER_NAME] = combined_baggage # https://github.com/celery/celery/issues/4875 # @@ -166,6 +179,10 @@ def apply_async(*args, **kwargs): # tracing tools (dd-trace-py) also employ this exact # workaround and we don't want to break them. kwarg_headers.setdefault("headers", {}).update(headers) + if combined_baggage: + kwarg_headers["headers"][ + BAGGAGE_HEADER_NAME + ] = combined_baggage # Add the Sentry options potentially added in `sentry_apply_entry` # to the headers (done when auto-instrumenting Celery Beat tasks) diff --git a/sentry_sdk/integrations/httpx.py b/sentry_sdk/integrations/httpx.py index e84a28d165..04db5047b4 100644 --- a/sentry_sdk/integrations/httpx.py +++ b/sentry_sdk/integrations/httpx.py @@ -1,6 +1,7 @@ from sentry_sdk import Hub from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import Integration, DidNotEnable +from sentry_sdk.tracing import BAGGAGE_HEADER_NAME from sentry_sdk.tracing_utils import should_propagate_trace from sentry_sdk.utils import ( SENSITIVE_DATA_SUBSTITUTE, @@ -72,7 +73,13 @@ def send(self, request, **kwargs): key=key, value=value, url=request.url ) ) - request.headers[key] = value + if key == BAGGAGE_HEADER_NAME and request.headers.get( + BAGGAGE_HEADER_NAME + ): + # do not overwrite any existing baggage, just append to it + request.headers[key] += "," + value + else: + request.headers[key] = value rv = real_send(self, request, **kwargs) @@ -119,7 +126,13 @@ async def send(self, request, **kwargs): key=key, value=value, url=request.url ) ) - request.headers[key] = value + if key == BAGGAGE_HEADER_NAME and request.headers.get( + BAGGAGE_HEADER_NAME + ): + # do not overwrite any existing baggage, just append to it + request.headers[key] += "," + value + else: + request.headers[key] = value rv = await real_send(self, request, **kwargs) diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index d120d34a12..304f6c2f04 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -11,7 +11,6 @@ from celery import Celery, VERSION from celery.bin import worker -from celery.signals import task_success try: from unittest import mock # python 3.3 and above @@ -360,7 +359,7 @@ def dummy_task(self): # TODO: This test is hanging when running test with `tox --parallel auto`. Find out why and fix it! @pytest.mark.skip @pytest.mark.forked -def test_redis_backend_trace_propagation(init_celery, capture_events_forksafe, tmpdir): +def test_redis_backend_trace_propagation(init_celery, capture_events_forksafe): celery = init_celery(traces_sample_rate=1.0, backend="redis", debug=True) events = capture_events_forksafe() @@ -493,17 +492,36 @@ def test_task_headers(celery): "sentry-monitor-check-in-id": "123abc", } - @celery.task(name="dummy_task") - def dummy_task(x, y): - return x + y - - def crons_task_success(sender, **kwargs): - headers = _get_headers(sender) - assert headers == sentry_crons_setup - - task_success.connect(crons_task_success) + @celery.task(name="dummy_task", bind=True) + def dummy_task(self, x, y): + return _get_headers(self) # This is how the Celery Beat auto-instrumentation starts a task # in the monkey patched version of `apply_async` # in `sentry_sdk/integrations/celery.py::_wrap_apply_async()` - dummy_task.apply_async(args=(1, 0), headers=sentry_crons_setup) + result = dummy_task.apply_async(args=(1, 0), headers=sentry_crons_setup) + assert result.get() == sentry_crons_setup + + +def test_baggage_propagation(init_celery): + celery = init_celery(traces_sample_rate=1.0, release="abcdef") + + @celery.task(name="dummy_task", bind=True) + def dummy_task(self, x, y): + return _get_headers(self) + + with start_transaction() as transaction: + result = dummy_task.apply_async( + args=(1, 0), + headers={"baggage": "custom=value"}, + ).get() + + assert sorted(result["baggage"].split(",")) == sorted( + [ + "sentry-release=abcdef", + "sentry-trace_id={}".format(transaction.trace_id), + "sentry-environment=production", + "sentry-sample_rate=1.0", + "custom=value", + ] + ) diff --git a/tests/integrations/httpx/test_httpx.py b/tests/integrations/httpx/test_httpx.py index 72188a23e3..9b7842fbb7 100644 --- a/tests/integrations/httpx/test_httpx.py +++ b/tests/integrations/httpx/test_httpx.py @@ -89,6 +89,46 @@ def test_outgoing_trace_headers(sentry_init, httpx_client): ) +@pytest.mark.parametrize( + "httpx_client", + (httpx.Client(), httpx.AsyncClient()), +) +def test_outgoing_trace_headers_append_to_baggage(sentry_init, httpx_client): + sentry_init( + traces_sample_rate=1.0, + integrations=[HttpxIntegration()], + release="d08ebdb9309e1b004c6f52202de58a09c2268e42", + ) + + url = "http://example.com/" + responses.add(responses.GET, url, status=200) + + with start_transaction( + name="/interactions/other-dogs/new-dog", + op="greeting.sniff", + trace_id="01234567890123456789012345678901", + ) as transaction: + if asyncio.iscoroutinefunction(httpx_client.get): + response = asyncio.get_event_loop().run_until_complete( + httpx_client.get(url, headers={"baGGage": "custom=data"}) + ) + else: + response = httpx_client.get(url, headers={"baGGage": "custom=data"}) + + request_span = transaction._span_recorder.spans[-1] + assert response.request.headers[ + "sentry-trace" + ] == "{trace_id}-{parent_span_id}-{sampled}".format( + trace_id=transaction.trace_id, + parent_span_id=request_span.span_id, + sampled=1, + ) + assert ( + response.request.headers["baggage"] + == "custom=data,sentry-trace_id=01234567890123456789012345678901,sentry-environment=production,sentry-release=d08ebdb9309e1b004c6f52202de58a09c2268e42,sentry-transaction=/interactions/other-dogs/new-dog,sentry-sample_rate=1.0" + ) + + @pytest.mark.parametrize( "httpx_client,trace_propagation_targets,url,trace_propagated", [