Skip to content

Commit

Permalink
chore(di): trigger probes
Browse files Browse the repository at this point in the history
We implement trigger probes. These allows triggering the capture of
debug information along a trace, ensuring all the relevant probes are
also triggered.
  • Loading branch information
P403n1x87 committed Oct 4, 2024
1 parent dd0891e commit a804567
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 48 deletions.
109 changes: 63 additions & 46 deletions ddtrace/debugging/_origin/span.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,27 @@
from dataclasses import dataclass
from functools import partial
from itertools import count
from pathlib import Path
import sys

# from threading import current_thread
from threading import current_thread
from types import FrameType
from types import FunctionType
import typing as t
import uuid

import ddtrace

# from ddtrace import config
from ddtrace._trace.processor import SpanProcessor

# from ddtrace.debugging._debugger import Debugger
from ddtrace.debugging._probe.model import DEFAULT_CAPTURE_LIMITS
from ddtrace.debugging._probe.model import LiteralTemplateSegment
from ddtrace.debugging._probe.model import LogFunctionProbe
from ddtrace.debugging._probe.model import LogLineProbe
from ddtrace.debugging._probe.model import ProbeEvaluateTimingForMethod
from ddtrace.debugging._session import Session
from ddtrace.debugging._signal.collector import SignalCollector
from ddtrace.debugging._signal.collector import SignalContext

# from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._uploader import LogsIntakeUploaderV1
from ddtrace.debugging._uploader import UploaderProduct
from ddtrace.ext import EXIT_SPAN_TYPES
from ddtrace.internal import compat
from ddtrace.internal import core
Expand All @@ -40,13 +39,13 @@ def frame_stack(frame: FrameType) -> t.Iterator[FrameType]:
_frame = _frame.f_back


def wrap_entrypoint(f: t.Callable) -> None:
def wrap_entrypoint(collector: SignalCollector, f: t.Callable) -> None:
if not _isinstance(f, FunctionType):
return

_f = t.cast(FunctionType, f)
if not EntrySpanWrappingContext.is_wrapped(_f):
EntrySpanWrappingContext(_f).wrap()
EntrySpanWrappingContext(collector, _f).wrap()


@dataclass
Expand Down Expand Up @@ -120,9 +119,11 @@ class EntrySpanLocation:


class EntrySpanWrappingContext(WrappingContext):
def __init__(self, f):
def __init__(self, collector: SignalCollector, f: FunctionType) -> None:
super().__init__(f)

self.collector = collector

filename = str(Path(f.__code__.co_filename).resolve())
name = f.__qualname__
module = f.__module__
Expand Down Expand Up @@ -152,25 +153,26 @@ def __enter__(self):
s.set_tag_str("_dd.code_origin.frames.0.type", location.module)
s.set_tag_str("_dd.code_origin.frames.0.method", location.name)

# TODO[gab]: This will be enabled as part of the live debugger/distributed debugging
# if ld_config.enabled:
# # Create a snapshot
# snapshot = Snapshot(
# probe=location.probe,
# frame=self.__frame__,
# thread=current_thread(),
# trace_context=root,
# )
# Check if we have a debugging session running for the current trace
session = Session.get_for_trace()
if session is not None and session.level >= 2:
# Create a snapshot
snapshot = Snapshot(
probe=location.probe,
frame=self.__frame__,
thread=current_thread(),
trace_context=root,
)

# # Capture on entry
# context = Debugger.get_collector().attach(snapshot)
# Capture on entry
context = self.collector.attach(snapshot)

# # Correlate the snapshot with the span
# root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
# span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
# Correlate the snapshot with the span
root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)

# self.set("context", context)
# self.set("start_time", compat.monotonic_ns())
self.set("context", context)
self.set("start_time", compat.monotonic_ns())

return self

Expand All @@ -194,6 +196,8 @@ def __exit__(self, exc_type, exc_value, traceback):

@dataclass
class SpanCodeOriginProcessor(SpanProcessor):
__uploader__ = LogsIntakeUploaderV1

_instance: t.Optional["SpanCodeOriginProcessor"] = None

def on_span_start(self, span: Span) -> None:
Expand All @@ -219,24 +223,25 @@ def on_span_start(self, span: Span) -> None:
# DEV: Without a function object we cannot infer the function
# and any potential class name.

# TODO[gab]: This will be enabled as part of the live debugger/distributed debugging
# if ld_config.enabled:
# # Create a snapshot
# snapshot = Snapshot(
# probe=ExitSpanProbe.from_frame(frame),
# frame=frame,
# thread=current_thread(),
# trace_context=span,
# )
# Check if we have a debugging session running for the current trace
session = Session.get_for_trace()
if session is not None and session.level >= 2:
# Create a snapshot
snapshot = Snapshot(
probe=ExitSpanProbe.from_frame(frame),
frame=frame,
thread=current_thread(),
trace_context=span,
)

# # Capture on entry
# snapshot.line()
# Capture on entry
snapshot.line()

# # Collect
# Debugger.get_collector().push(snapshot)
# Collect
self.__uploader__.get_collector().push(snapshot)

# # Correlate the snapshot with the span
# span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid)
# Correlate the snapshot with the span
span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid)

def on_span_finish(self, span: Span) -> None:
pass
Expand All @@ -246,17 +251,29 @@ def enable(cls):
if cls._instance is not None:
return

core.on("service_entrypoint.patch", wrap_entrypoint)

instance = cls._instance = cls()

# Register code origin for span with the snapshot uploader
cls.__uploader__.register(UploaderProduct.CODE_ORIGIN_SPAN)

# Register the processor for exit spans
instance.register()

# Register the entrypoint wrapping for entry spans
core.on("service_entrypoint.patch", partial(wrap_entrypoint, cls.__uploader__.get_collector()))

@classmethod
def disable(cls):
if cls._instance is None:
return

# Unregister the entrypoint wrapping for entry spans
core.reset_listeners("service_entrypoint.patch", wrap_entrypoint)

# Unregister the processor for exit spans
cls._instance.unregister()
cls._instance = None

core.reset_listeners("service_entrypoint.patch", wrap_entrypoint)
# Unregister code origin for span with the snapshot uploader
cls.__uploader__.unregister(UploaderProduct.CODE_ORIGIN_SPAN)

cls._instance = None
30 changes: 30 additions & 0 deletions ddtrace/debugging/_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from dataclasses import dataclass
import typing as t
from weakref import WeakKeyDictionary as wkdict

from ddtrace import tracer


@dataclass
class Session:
ident: str
level: int

def link_to_trace(self):
SessionManager.link_session_to_trace(self)

@classmethod
def get_for_trace(cls) -> t.Optional["Session"]:
return SessionManager.get_session_for_trace()


class SessionManager:
_session_trace_map: wkdict = wkdict() # Span to Session mapping

@classmethod
def link_session_to_trace(cls, session) -> None:
cls._session_trace_map[tracer.current_root_span()] = session

@classmethod
def get_session_for_trace(cls) -> t.Optional[Session]:
return cls._session_trace_map.get(tracer.current_root_span())
1 change: 1 addition & 0 deletions ddtrace/debugging/_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class UploaderProduct(str, Enum):

DEBUGGER = "dynamic_instrumentation"
EXCEPTION_REPLAY = "exception_replay"
CODE_ORIGIN_SPAN = "code_origin.span"


class LogsIntakeUploaderV1(ForksafeAwakeablePeriodicService):
Expand Down
53 changes: 51 additions & 2 deletions tests/debugging/origin/test_span.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,36 @@
from pathlib import Path
import typing as t

import ddtrace
from ddtrace.debugging._origin.span import SpanCodeOriginProcessor
from ddtrace.debugging._session import Session
from ddtrace.ext import SpanTypes
from ddtrace.internal import core
from tests.debugging.mocking import MockLogsIntakeUploaderV1
from tests.utils import TracerTestCase


class MockSpanCodeOriginProcessor(SpanCodeOriginProcessor):
__uploader__ = MockLogsIntakeUploaderV1

@classmethod
def get_uploader(cls) -> MockLogsIntakeUploaderV1:
return t.cast(MockLogsIntakeUploaderV1, cls.__uploader__._instance)


class SpanProbeTestCase(TracerTestCase):
def setUp(self):
super(SpanProbeTestCase, self).setUp()
self.backup_tracer = ddtrace.tracer
ddtrace.tracer = self.tracer

SpanCodeOriginProcessor.enable()
MockSpanCodeOriginProcessor.enable()

def tearDown(self):
ddtrace.tracer = self.backup_tracer
super(SpanProbeTestCase, self).tearDown()

SpanCodeOriginProcessor.disable()
MockSpanCodeOriginProcessor.disable()
core.reset_listeners(event_id="service_entrypoint.patch")

def test_span_origin(self):
Expand Down Expand Up @@ -54,3 +65,41 @@ def entry_call():
assert _exit.get_tag("_dd.code_origin.type") == "exit"
assert _exit.get_tag("_dd.code_origin.frames.2.file") == str(Path(__file__).resolve())
assert _exit.get_tag("_dd.code_origin.frames.2.line") == str(self.test_span_origin.__code__.co_firstlineno)

def test_span_origin_session(self):
def entry_call():
pass

core.dispatch("service_entrypoint.patch", (entry_call,))

with self.tracer.trace("entry"):
# Emulate a trigger probe
Session(ident="test", level=2).link_to_trace()
entry_call()
with self.tracer.trace("middle"):
with self.tracer.trace("exit", span_type=SpanTypes.HTTP):
pass

self.assert_span_count(3)
entry, middle, _exit = self.get_spans()
payloads = MockSpanCodeOriginProcessor.get_uploader().wait_for_payloads()
snapshot_ids = {p["debugger"]["snapshot"]["id"] for p in payloads}

assert len(payloads) == len(snapshot_ids)

entry_snapshot_id = entry.get_tag("_dd.code_origin.frames.0.snapshot_id")
assert entry.get_tag("_dd.code_origin.type") == "entry"
assert entry_snapshot_id in snapshot_ids

# Check that we don't have span location tags on the middle span
assert middle.get_tag("_dd.code_origin.frames.0.snapshot_id") is None

# Check that we have all the snapshots for the exit span
assert _exit.get_tag("_dd.code_origin.type") == "exit"
snapshot_ids_from_span_tags = {_exit.get_tag(f"_dd.code_origin.frames.{_}.snapshot_id") for _ in range(8)}
snapshot_ids_from_span_tags.discard(None)
assert snapshot_ids_from_span_tags < snapshot_ids

# Check that we have complete data
snapshot_ids_from_span_tags.add(entry_snapshot_id)
assert snapshot_ids_from_span_tags == snapshot_ids

0 comments on commit a804567

Please sign in to comment.