diff --git a/newrelic/hooks/messagebroker_confluentkafka.py b/newrelic/hooks/messagebroker_confluentkafka.py index 81d9fa59a..b7c70a129 100644 --- a/newrelic/hooks/messagebroker_confluentkafka.py +++ b/newrelic/hooks/messagebroker_confluentkafka.py @@ -65,10 +65,12 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs): destination_type="Topic", destination_name=topic or "Default", source=wrapped, - ) as trace: - dt_headers = {k: v.encode("utf-8") for k, v in trace.generate_request_headers(transaction)} + ): + dt_headers = {k: v.encode("utf-8") for k, v in MessageTrace.generate_request_headers(transaction)} # headers can be a list of tuples or a dict so convert to dict for consistency. - dt_headers.update(dict(headers) if headers else {}) + if headers: + dt_headers.update(dict(headers)) + try: return wrapped(topic, headers=dt_headers, *args, **kwargs) except Exception as error: diff --git a/newrelic/hooks/messagebroker_kafkapython.py b/newrelic/hooks/messagebroker_kafkapython.py index 9124a16dc..dff5e2c78 100644 --- a/newrelic/hooks/messagebroker_kafkapython.py +++ b/newrelic/hooks/messagebroker_kafkapython.py @@ -58,11 +58,16 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs): destination_name=topic or "Default", source=wrapped, terminal=False, - ) as trace: - dt_headers = [(k, v.encode("utf-8")) for k, v in trace.generate_request_headers(transaction)] - headers.extend(dt_headers) + ): + dt_headers = [(k, v.encode("utf-8")) for k, v in MessageTrace.generate_request_headers(transaction)] + # headers can be a list of tuples or a dict so convert to dict for consistency. + if headers: + dt_headers.extend(headers) + try: - return wrapped(topic, value=value, key=key, headers=headers, partition=partition, timestamp_ms=timestamp_ms) + return wrapped( + topic, value=value, key=key, headers=dt_headers, partition=partition, timestamp_ms=timestamp_ms + ) except Exception: notice_error() raise diff --git a/tests/messagebroker_confluentkafka/test_producer.py b/tests/messagebroker_confluentkafka/test_producer.py index 2b3e74e7a..fe33794fa 100644 --- a/tests/messagebroker_confluentkafka/test_producer.py +++ b/tests/messagebroker_confluentkafka/test_producer.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time import threading +import time import pytest from conftest import cache_kafka_producer_headers @@ -28,6 +28,7 @@ ) from newrelic.api.background_task import background_task +from newrelic.api.function_trace import FunctionTrace from newrelic.common.object_names import callable_name from newrelic.packages import six @@ -137,6 +138,25 @@ def test(): test() +def test_distributed_tracing_headers_under_terminal(topic, send_producer_message): + @validate_transaction_metrics( + "test_distributed_tracing_headers_under_terminal", + rollup_metrics=[ + ("Supportability/TraceContext/Create/Success", 1), + ("Supportability/DistributedTrace/CreatePayload/Success", 1), + ], + background_task=True, + ) + @background_task(name="test_distributed_tracing_headers_under_terminal") + @cache_kafka_producer_headers() + @validate_messagebroker_headers + def test(): + with FunctionTrace(name="terminal_trace", terminal=True): + send_producer_message() + + test() + + def test_producer_errors(topic, producer, monkeypatch): if hasattr(producer, "_value_serializer"): # Remove serializer to intentionally cause a type error in underlying producer implementation diff --git a/tests/messagebroker_kafkapython/test_producer.py b/tests/messagebroker_kafkapython/test_producer.py index 53a31dce5..418ea4b40 100644 --- a/tests/messagebroker_kafkapython/test_producer.py +++ b/tests/messagebroker_kafkapython/test_producer.py @@ -25,6 +25,7 @@ ) from newrelic.api.background_task import background_task +from newrelic.api.function_trace import FunctionTrace from newrelic.common.object_names import callable_name from newrelic.packages import six @@ -70,6 +71,25 @@ def test(): test() +def test_distributed_tracing_headers_under_terminal(topic, send_producer_message): + @validate_transaction_metrics( + "test_distributed_tracing_headers_under_terminal", + rollup_metrics=[ + ("Supportability/TraceContext/Create/Success", 1), + ("Supportability/DistributedTrace/CreatePayload/Success", 1), + ], + background_task=True, + ) + @background_task(name="test_distributed_tracing_headers_under_terminal") + @cache_kafka_producer_headers + @validate_messagebroker_headers + def test(): + with FunctionTrace(name="terminal_trace", terminal=True): + send_producer_message() + + test() + + def test_producer_errors(topic, producer, monkeypatch): monkeypatch.setitem(producer.config, "value_serializer", None) monkeypatch.setitem(producer.config, "key_serializer", None)