From a8045672763fdbfedb1251629d47b4984f6e3f1d Mon Sep 17 00:00:00 2001 From: "Gabriele N. Tornetta" Date: Fri, 4 Oct 2024 11:43:52 +0100 Subject: [PATCH] chore(di): trigger probes We implement trigger probes. These allow triggering the capture of debug information along a trace, ensuring all the relevant probes are also triggered. --- ddtrace/debugging/_origin/span.py | 109 ++++++++++++++++------------ ddtrace/debugging/_session.py | 30 ++++++++ ddtrace/debugging/_uploader.py | 1 + tests/debugging/origin/test_span.py | 53 +++++++++++++- 4 files changed, 145 insertions(+), 48 deletions(-) create mode 100644 ddtrace/debugging/_session.py diff --git a/ddtrace/debugging/_origin/span.py b/ddtrace/debugging/_origin/span.py index a373608b43a..8a3e5246dba 100644 --- a/ddtrace/debugging/_origin/span.py +++ b/ddtrace/debugging/_origin/span.py @@ -1,28 +1,27 @@ from dataclasses import dataclass +from functools import partial from itertools import count from pathlib import Path import sys - -# from threading import current_thread +from threading import current_thread from types import FrameType from types import FunctionType import typing as t import uuid import ddtrace - -# from ddtrace import config from ddtrace._trace.processor import SpanProcessor - -# from ddtrace.debugging._debugger import Debugger from ddtrace.debugging._probe.model import DEFAULT_CAPTURE_LIMITS from ddtrace.debugging._probe.model import LiteralTemplateSegment from ddtrace.debugging._probe.model import LogFunctionProbe from ddtrace.debugging._probe.model import LogLineProbe from ddtrace.debugging._probe.model import ProbeEvaluateTimingForMethod +from ddtrace.debugging._session import Session +from ddtrace.debugging._signal.collector import SignalCollector from ddtrace.debugging._signal.collector import SignalContext - -# from ddtrace.debugging._signal.snapshot import Snapshot +from ddtrace.debugging._signal.snapshot import Snapshot +from ddtrace.debugging._uploader import LogsIntakeUploaderV1 +from 
ddtrace.debugging._uploader import UploaderProduct from ddtrace.ext import EXIT_SPAN_TYPES from ddtrace.internal import compat from ddtrace.internal import core @@ -40,13 +39,13 @@ def frame_stack(frame: FrameType) -> t.Iterator[FrameType]: _frame = _frame.f_back -def wrap_entrypoint(f: t.Callable) -> None: +def wrap_entrypoint(collector: SignalCollector, f: t.Callable) -> None: if not _isinstance(f, FunctionType): return _f = t.cast(FunctionType, f) if not EntrySpanWrappingContext.is_wrapped(_f): - EntrySpanWrappingContext(_f).wrap() + EntrySpanWrappingContext(collector, _f).wrap() @dataclass @@ -120,9 +119,11 @@ class EntrySpanLocation: class EntrySpanWrappingContext(WrappingContext): - def __init__(self, f): + def __init__(self, collector: SignalCollector, f: FunctionType) -> None: super().__init__(f) + self.collector = collector + filename = str(Path(f.__code__.co_filename).resolve()) name = f.__qualname__ module = f.__module__ @@ -152,25 +153,26 @@ def __enter__(self): s.set_tag_str("_dd.code_origin.frames.0.type", location.module) s.set_tag_str("_dd.code_origin.frames.0.method", location.name) - # TODO[gab]: This will be enabled as part of the live debugger/distributed debugging - # if ld_config.enabled: - # # Create a snapshot - # snapshot = Snapshot( - # probe=location.probe, - # frame=self.__frame__, - # thread=current_thread(), - # trace_context=root, - # ) + # Check if we have a debugging session running for the current trace + session = Session.get_for_trace() + if session is not None and session.level >= 2: + # Create a snapshot + snapshot = Snapshot( + probe=location.probe, + frame=self.__frame__, + thread=current_thread(), + trace_context=root, + ) - # # Capture on entry - # context = Debugger.get_collector().attach(snapshot) + # Capture on entry + context = self.collector.attach(snapshot) - # # Correlate the snapshot with the span - # root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid) - # 
span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid) + # Correlate the snapshot with the span + root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid) + span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid) - # self.set("context", context) - # self.set("start_time", compat.monotonic_ns()) + self.set("context", context) + self.set("start_time", compat.monotonic_ns()) return self @@ -194,6 +196,8 @@ def __exit__(self, exc_type, exc_value, traceback): @dataclass class SpanCodeOriginProcessor(SpanProcessor): + __uploader__ = LogsIntakeUploaderV1 + _instance: t.Optional["SpanCodeOriginProcessor"] = None def on_span_start(self, span: Span) -> None: @@ -219,24 +223,25 @@ def on_span_start(self, span: Span) -> None: # DEV: Without a function object we cannot infer the function # and any potential class name. - # TODO[gab]: This will be enabled as part of the live debugger/distributed debugging - # if ld_config.enabled: - # # Create a snapshot - # snapshot = Snapshot( - # probe=ExitSpanProbe.from_frame(frame), - # frame=frame, - # thread=current_thread(), - # trace_context=span, - # ) + # Check if we have a debugging session running for the current trace + session = Session.get_for_trace() + if session is not None and session.level >= 2: + # Create a snapshot + snapshot = Snapshot( + probe=ExitSpanProbe.from_frame(frame), + frame=frame, + thread=current_thread(), + trace_context=span, + ) - # # Capture on entry - # snapshot.line() + # Capture on entry + snapshot.line() - # # Collect - # Debugger.get_collector().push(snapshot) + # Collect + self.__uploader__.get_collector().push(snapshot) - # # Correlate the snapshot with the span - # span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid) + # Correlate the snapshot with the span + span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid) def on_span_finish(self, span: Span) -> None: pass @@ -246,17 +251,29 @@ def enable(cls): if 
cls._instance is not None: return - core.on("service_entrypoint.patch", wrap_entrypoint) - instance = cls._instance = cls() + + # Register code origin for span with the snapshot uploader + cls.__uploader__.register(UploaderProduct.CODE_ORIGIN_SPAN) + + # Register the processor for exit spans instance.register() + # Register the entrypoint wrapping for entry spans + core.on("service_entrypoint.patch", partial(wrap_entrypoint, cls.__uploader__.get_collector())) + @classmethod def disable(cls): if cls._instance is None: return + # Unregister the entrypoint wrapping for entry spans + core.reset_listeners("service_entrypoint.patch", wrap_entrypoint) + + # Unregister the processor for exit spans cls._instance.unregister() - cls._instance = None - core.reset_listeners("service_entrypoint.patch", wrap_entrypoint) + # Unregister code origin for span with the snapshot uploader + cls.__uploader__.unregister(UploaderProduct.CODE_ORIGIN_SPAN) + + cls._instance = None diff --git a/ddtrace/debugging/_session.py b/ddtrace/debugging/_session.py new file mode 100644 index 00000000000..c6a9cab1595 --- /dev/null +++ b/ddtrace/debugging/_session.py @@ -0,0 +1,30 @@ +from dataclasses import dataclass +import typing as t +from weakref import WeakKeyDictionary as wkdict + +from ddtrace import tracer + + +@dataclass +class Session: + ident: str + level: int + + def link_to_trace(self): + SessionManager.link_session_to_trace(self) + + @classmethod + def get_for_trace(cls) -> t.Optional["Session"]: + return SessionManager.get_session_for_trace() + + +class SessionManager: + _session_trace_map: wkdict = wkdict() # Span to Session mapping + + @classmethod + def link_session_to_trace(cls, session) -> None: + cls._session_trace_map[tracer.current_root_span()] = session + + @classmethod + def get_session_for_trace(cls) -> t.Optional[Session]: + return cls._session_trace_map.get(tracer.current_root_span()) diff --git a/ddtrace/debugging/_uploader.py b/ddtrace/debugging/_uploader.py index 
f2a3dae8d68..7f6226f7192 100644 --- a/ddtrace/debugging/_uploader.py +++ b/ddtrace/debugging/_uploader.py @@ -26,6 +26,7 @@ class UploaderProduct(str, Enum): DEBUGGER = "dynamic_instrumentation" EXCEPTION_REPLAY = "exception_replay" + CODE_ORIGIN_SPAN = "code_origin.span" class LogsIntakeUploaderV1(ForksafeAwakeablePeriodicService): diff --git a/tests/debugging/origin/test_span.py b/tests/debugging/origin/test_span.py index b443f784d2a..67570f97710 100644 --- a/tests/debugging/origin/test_span.py +++ b/tests/debugging/origin/test_span.py @@ -1,25 +1,36 @@ from pathlib import Path +import typing as t import ddtrace from ddtrace.debugging._origin.span import SpanCodeOriginProcessor +from ddtrace.debugging._session import Session from ddtrace.ext import SpanTypes from ddtrace.internal import core +from tests.debugging.mocking import MockLogsIntakeUploaderV1 from tests.utils import TracerTestCase +class MockSpanCodeOriginProcessor(SpanCodeOriginProcessor): + __uploader__ = MockLogsIntakeUploaderV1 + + @classmethod + def get_uploader(cls) -> MockLogsIntakeUploaderV1: + return t.cast(MockLogsIntakeUploaderV1, cls.__uploader__._instance) + + class SpanProbeTestCase(TracerTestCase): def setUp(self): super(SpanProbeTestCase, self).setUp() self.backup_tracer = ddtrace.tracer ddtrace.tracer = self.tracer - SpanCodeOriginProcessor.enable() + MockSpanCodeOriginProcessor.enable() def tearDown(self): ddtrace.tracer = self.backup_tracer super(SpanProbeTestCase, self).tearDown() - SpanCodeOriginProcessor.disable() + MockSpanCodeOriginProcessor.disable() core.reset_listeners(event_id="service_entrypoint.patch") def test_span_origin(self): @@ -54,3 +65,41 @@ def entry_call(): assert _exit.get_tag("_dd.code_origin.type") == "exit" assert _exit.get_tag("_dd.code_origin.frames.2.file") == str(Path(__file__).resolve()) assert _exit.get_tag("_dd.code_origin.frames.2.line") == str(self.test_span_origin.__code__.co_firstlineno) + + def test_span_origin_session(self): + def entry_call(): + 
pass + + core.dispatch("service_entrypoint.patch", (entry_call,)) + + with self.tracer.trace("entry"): + # Emulate a trigger probe + Session(ident="test", level=2).link_to_trace() + entry_call() + with self.tracer.trace("middle"): + with self.tracer.trace("exit", span_type=SpanTypes.HTTP): + pass + + self.assert_span_count(3) + entry, middle, _exit = self.get_spans() + payloads = MockSpanCodeOriginProcessor.get_uploader().wait_for_payloads() + snapshot_ids = {p["debugger"]["snapshot"]["id"] for p in payloads} + + assert len(payloads) == len(snapshot_ids) + + entry_snapshot_id = entry.get_tag("_dd.code_origin.frames.0.snapshot_id") + assert entry.get_tag("_dd.code_origin.type") == "entry" + assert entry_snapshot_id in snapshot_ids + + # Check that we don't have span location tags on the middle span + assert middle.get_tag("_dd.code_origin.frames.0.snapshot_id") is None + + # Check that we have all the snapshots for the exit span + assert _exit.get_tag("_dd.code_origin.type") == "exit" + snapshot_ids_from_span_tags = {_exit.get_tag(f"_dd.code_origin.frames.{_}.snapshot_id") for _ in range(8)} + snapshot_ids_from_span_tags.discard(None) + assert snapshot_ids_from_span_tags < snapshot_ids + + # Check that we have complete data + snapshot_ids_from_span_tags.add(entry_snapshot_id) + assert snapshot_ids_from_span_tags == snapshot_ids