diff --git a/ddtrace/_trace/context.py b/ddtrace/_trace/context.py index 07bc3960b56..4f444aafd9d 100644 --- a/ddtrace/_trace/context.py +++ b/ddtrace/_trace/context.py @@ -48,7 +48,17 @@ class Context(object): boundaries. """ - __slots__ = ["trace_id", "span_id", "_lock", "_meta", "_metrics", "_span_links", "_baggage", "_is_remote"] + __slots__ = [ + "trace_id", + "span_id", + "_lock", + "_meta", + "_metrics", + "_span_links", + "_baggage", + "_is_remote", + "__weakref__", + ] def __init__( self, @@ -259,7 +269,6 @@ def __eq__(self, other: Any) -> bool: with self._lock: return ( self.trace_id == other.trace_id - and self.span_id == other.span_id and self._meta == other._meta and self._metrics == other._metrics and self._span_links == other._span_links @@ -279,4 +288,7 @@ def __repr__(self) -> str: self._is_remote, ) + def __hash__(self) -> int: + return hash(self.trace_id) + __str__ = __repr__ diff --git a/ddtrace/contrib/trace_utils.py b/ddtrace/contrib/trace_utils.py index db8509d8c35..c12ce466ffe 100644 --- a/ddtrace/contrib/trace_utils.py +++ b/ddtrace/contrib/trace_utils.py @@ -29,6 +29,7 @@ from ddtrace.internal.compat import ensure_text from ddtrace.internal.compat import ip_is_global from ddtrace.internal.compat import parse +from ddtrace.internal.core.event_hub import dispatch from ddtrace.internal.logger import get_logger from ddtrace.internal.utils.cache import cached from ddtrace.internal.utils.http import normalize_header_name @@ -589,6 +590,8 @@ def activate_distributed_headers(tracer, int_config=None, request_headers=None, # have a context with the same trace id active tracer.context_provider.activate(context) + dispatch("distributed_context.activated", (context,)) + def _flatten( obj, # type: Any diff --git a/ddtrace/debugging/_exception/replay.py b/ddtrace/debugging/_exception/replay.py index 080b4cbfc61..04c05b3ec77 100644 --- a/ddtrace/debugging/_exception/replay.py +++ b/ddtrace/debugging/_exception/replay.py @@ -11,6 +11,7 @@ from ddtrace._trace.span import Span from ddtrace.debugging._probe.model import LiteralTemplateSegment from ddtrace.debugging._probe.model import LogLineProbe +from ddtrace.debugging._session import Session from ddtrace.debugging._signal.snapshot import DEFAULT_CAPTURE_LIMITS from ddtrace.debugging._signal.snapshot import Snapshot from ddtrace.debugging._uploader import LogsIntakeUploaderV1 @@ -193,6 +194,9 @@ def can_capture(span: Span) -> bool: return True if info_captured is None: + if Session.from_trace(): + # If we are in a debug session we always capture + return True result = GLOBAL_RATE_LIMITER.limit() is not RateLimitExceeded root.set_tag_str(CAPTURE_TRACE_TAG, str(result).lower()) return result diff --git a/ddtrace/debugging/_live.py b/ddtrace/debugging/_live.py new file mode 100644 index 00000000000..31a3d315bc4 --- /dev/null +++ b/ddtrace/debugging/_live.py @@ -0,0 +1,10 @@ +from ddtrace.debugging._session import Session +from ddtrace.internal import core + + +def enable() -> None: + core.on("distributed_context.activated", Session.activate_distributed, "live_debugger") + + +def disable() -> None: + core.reset_listeners("distributed_context.activated", Session.activate_distributed) diff --git a/ddtrace/debugging/_origin/span.py b/ddtrace/debugging/_origin/span.py index 9b592df2bde..abd63dbb97d 100644 --- a/ddtrace/debugging/_origin/span.py +++ b/ddtrace/debugging/_origin/span.py @@ -1,29 +1,27 @@ from dataclasses import dataclass +from functools import partial from itertools import count from pathlib import Path import sys -import time - -# from threading import current_thread +from threading import current_thread +from time import monotonic_ns from types import FrameType from types import FunctionType import typing as t import uuid import ddtrace - -# from ddtrace import config from ddtrace._trace.processor import SpanProcessor - -# from ddtrace.debugging._debugger import Debugger from ddtrace.debugging._probe.model import DEFAULT_CAPTURE_LIMITS from ddtrace.debugging._probe.model import LiteralTemplateSegment from ddtrace.debugging._probe.model import LogFunctionProbe from ddtrace.debugging._probe.model import LogLineProbe from ddtrace.debugging._probe.model import ProbeEvalTiming - -# from ddtrace.debugging._signal.snapshot import Snapshot -from ddtrace.debugging._signal.model import Signal +from ddtrace.debugging._session import Session +from ddtrace.debugging._signal.collector import SignalCollector +from ddtrace.debugging._signal.snapshot import Snapshot +from ddtrace.debugging._uploader import LogsIntakeUploaderV1 +from ddtrace.debugging._uploader import UploaderProduct from ddtrace.ext import EXIT_SPAN_TYPES from ddtrace.internal import core from ddtrace.internal.packages import is_user_code @@ -40,13 +38,13 @@ def frame_stack(frame: FrameType) -> t.Iterator[FrameType]: _frame = _frame.f_back -def wrap_entrypoint(f: t.Callable) -> None: +def wrap_entrypoint(collector: SignalCollector, f: t.Callable) -> None: if not _isinstance(f, FunctionType): return _f = t.cast(FunctionType, f) if not EntrySpanWrappingContext.is_wrapped(_f): - EntrySpanWrappingContext(_f).wrap() + EntrySpanWrappingContext(collector, _f).wrap() @dataclass @@ -120,9 +118,13 @@ class EntrySpanLocation: class EntrySpanWrappingContext(WrappingContext): - def __init__(self, f): + __priority__ = 199 + + def __init__(self, collector: SignalCollector, f: FunctionType) -> None: super().__init__(f) + self.collector = collector + filename = str(Path(f.__code__.co_filename).resolve()) name = f.__qualname__ module = f.__module__ @@ -152,36 +154,37 @@ def __enter__(self): s.set_tag_str("_dd.code_origin.frames.0.type", location.module) s.set_tag_str("_dd.code_origin.frames.0.method", location.name) - # TODO[gab]: This will be enabled as part of the live debugger/distributed debugging - # if ld_config.enabled: - # # Create a snapshot - # snapshot = Snapshot( - # probe=location.probe, - # frame=self.__frame__, - # thread=current_thread(), - # trace_context=root, - # ) - - # # Capture on entry - # context = Debugger.get_collector().attach(snapshot) - - # # Correlate the snapshot with the span - # root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid) - # span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid) - - # self.set("context", context) - # self.set("start_time", time.monotonic_ns()) + self.set("start_time", monotonic_ns()) return self def _close_signal(self, retval=None, exc_info=(None, None, None)): - try: - signal: Signal = t.cast(Signal, self.get("signal")) - except KeyError: - # No snapshot was created + root = ddtrace.tracer.current_root_span() + span = ddtrace.tracer.current_span() + if root is None or span is None: return - signal.do_exit(retval, exc_info, time.monotonic_ns() - self.get("start_time")) + # Check if we have any level 2 debugging sessions running for the + # current trace + if any(s.level >= 2 for s in Session.from_trace()): + # Create a snapshot + snapshot = Snapshot( + probe=self.location.probe, + frame=self.__frame__, + thread=current_thread(), + trace_context=root, + ) + + # Capture on entry + snapshot.do_enter() + + # Correlate the snapshot with the span + root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid) + span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid) + + snapshot.do_exit(retval, exc_info, monotonic_ns() - self.get("start_time")) + + self.collector.push(snapshot) def __return__(self, retval): self._close_signal(retval=retval) @@ -194,7 +197,10 @@ def __exit__(self, exc_type, exc_value, traceback): @dataclass class SpanCodeOriginProcessor(SpanProcessor): + __uploader__ = LogsIntakeUploaderV1 + _instance: t.Optional["SpanCodeOriginProcessor"] = None + _handler: t.Optional[t.Callable] = None def on_span_start(self, span: Span) -> None: if span.span_type not in EXIT_SPAN_TYPES: @@ -219,24 +225,25 @@ def on_span_start(self, span: Span) -> None: # DEV: Without a function object we cannot infer the function # and any potential class name. - # TODO[gab]: This will be enabled as part of the live debugger/distributed debugging - # if ld_config.enabled: - # # Create a snapshot - # snapshot = Snapshot( - # probe=ExitSpanProbe.from_frame(frame), - # frame=frame, - # thread=current_thread(), - # trace_context=span, - # ) + # Check if we have any level 2 debugging sessions running for + # the current trace + if any(s.level >= 2 for s in Session.from_trace()): + # Create a snapshot + snapshot = Snapshot( + probe=ExitSpanProbe.from_frame(frame), + frame=frame, + thread=current_thread(), + trace_context=span, + ) - # # Capture on entry - # snapshot.line() + # Capture on entry + snapshot.do_line() - # # Collect - # Debugger.get_collector().push(snapshot) + # Collect + self.__uploader__.get_collector().push(snapshot) - # # Correlate the snapshot with the span - # span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid) + # Correlate the snapshot with the span + span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid) def on_span_finish(self, span: Span) -> None: pass @@ -246,17 +253,31 @@ def enable(cls): if cls._instance is not None: return - core.on("service_entrypoint.patch", wrap_entrypoint) - instance = cls._instance = cls() + + # Register code origin for span with the snapshot uploader + cls.__uploader__.register(UploaderProduct.CODE_ORIGIN_SPAN) + + # Register the processor for exit spans instance.register() + # Register the entrypoint wrapping for entry spans + cls._handler = handler = partial(wrap_entrypoint, cls.__uploader__.get_collector()) + core.on("service_entrypoint.patch", handler) + @classmethod def disable(cls): if cls._instance is None: return + # Unregister the entrypoint wrapping for entry spans + core.reset_listeners("service_entrypoint.patch", cls._handler) + cls._handler = None + + # Unregister the processor for exit spans cls._instance.unregister() - cls._instance = None - core.reset_listeners("service_entrypoint.patch", wrap_entrypoint) + # Unregister code origin for span with the snapshot uploader + cls.__uploader__.unregister(UploaderProduct.CODE_ORIGIN_SPAN) + + cls._instance = None diff --git a/ddtrace/debugging/_probe/model.py b/ddtrace/debugging/_probe/model.py index f832484ac6d..1b904288104 100644 --- a/ddtrace/debugging/_probe/model.py +++ b/ddtrace/debugging/_probe/model.py @@ -14,6 +14,7 @@ from typing import Union from ddtrace.debugging._expressions import DDExpression +from ddtrace.debugging._session import SessionId from ddtrace.internal.compat import maybe_stringify from ddtrace.internal.logger import get_logger from ddtrace.internal.module import _resolve @@ -26,6 +27,7 @@ DEFAULT_PROBE_RATE = 5000.0 DEFAULT_SNAPSHOT_PROBE_RATE = 1.0 +DEFAULT_TRIGGER_PROBE_RATE = 1.0 / 60.0 # 1 per minute DEFAULT_PROBE_CONDITION_ERROR_RATE = 1.0 / 60 / 5 @@ -301,8 +303,26 @@ class SpanDecorationFunctionProbe(Probe, FunctionLocationMixin, TimingMixin, Spa pass -LineProbe = Union[LogLineProbe, MetricLineProbe, SpanDecorationLineProbe] -FunctionProbe = Union[LogFunctionProbe, MetricFunctionProbe, SpanFunctionProbe, SpanDecorationFunctionProbe] +@dataclass +class SessionMixin: + session_id: SessionId + level: int + + +@dataclass +class TriggerLineProbe(Probe, LineLocationMixin, SessionMixin, ProbeConditionMixin, RateLimitMixin): + pass + + +@dataclass +class TriggerFunctionProbe(Probe, FunctionLocationMixin, SessionMixin, ProbeConditionMixin, RateLimitMixin): + pass + + +LineProbe = Union[LogLineProbe, MetricLineProbe, SpanDecorationLineProbe, TriggerLineProbe] +FunctionProbe = Union[ + LogFunctionProbe, MetricFunctionProbe, SpanFunctionProbe, SpanDecorationFunctionProbe, TriggerFunctionProbe +] class ProbeType(str, Enum): @@ -310,3 +330,4 @@ class ProbeType(str, Enum): METRIC_PROBE = "METRIC_PROBE" SPAN_PROBE = "SPAN_PROBE" SPAN_DECORATION_PROBE = "SPAN_DECORATION_PROBE" + TRIGGER_PROBE = "TRIGGER_PROBE" diff --git a/ddtrace/debugging/_probe/remoteconfig.py b/ddtrace/debugging/_probe/remoteconfig.py index b305e0fb5da..4db0a7fac78 100644 --- a/ddtrace/debugging/_probe/remoteconfig.py +++ b/ddtrace/debugging/_probe/remoteconfig.py @@ -14,6 +14,7 @@ from ddtrace.debugging._probe.model import DEFAULT_PROBE_CONDITION_ERROR_RATE from ddtrace.debugging._probe.model import DEFAULT_PROBE_RATE from ddtrace.debugging._probe.model import DEFAULT_SNAPSHOT_PROBE_RATE +from ddtrace.debugging._probe.model import DEFAULT_TRIGGER_PROBE_RATE from ddtrace.debugging._probe.model import CaptureLimits from ddtrace.debugging._probe.model import ExpressionTemplateSegment from ddtrace.debugging._probe.model import FunctionProbe @@ -34,6 +35,8 @@ from ddtrace.debugging._probe.model import StringTemplate from ddtrace.debugging._probe.model import TemplateSegment from ddtrace.debugging._probe.model import TimingMixin +from ddtrace.debugging._probe.model import TriggerFunctionProbe +from ddtrace.debugging._probe.model import TriggerLineProbe from ddtrace.debugging._probe.status import ProbeStatusLogger from ddtrace.debugging._redaction import DDRedactedExpression from ddtrace.internal.logger import get_logger @@ -202,10 +205,34 @@ def update_args(cls, args, attribs): ) +class TriggerProbeFactory(ProbeFactory): + __line_class__ = TriggerLineProbe + __function_class__ = TriggerFunctionProbe + + @classmethod + def update_args(cls, args, attribs): + args.update( + rate=attribs.get("sampling", {}).get("cooldownInSeconds", DEFAULT_TRIGGER_PROBE_RATE), + session_id=attribs["sessionId"], + level=int(attribs["level"]), + condition=DDRedactedExpression.compile(attribs["when"]) if "when" in attribs else None, + condition_error_rate=DEFAULT_PROBE_CONDITION_ERROR_RATE, + ) + + class InvalidProbeConfiguration(ValueError): pass +PROBE_FACTORY = { + ProbeType.LOG_PROBE: LogProbeFactory, + ProbeType.METRIC_PROBE: MetricProbeFactory, + ProbeType.SPAN_PROBE: SpanProbeFactory, + ProbeType.SPAN_DECORATION_PROBE: SpanDecorationProbeFactory, + ProbeType.TRIGGER_PROBE: TriggerProbeFactory, +} + + def build_probe(attribs: Dict[str, Any]) -> Probe: """ Create a new Probe instance. @@ -222,14 +249,9 @@ def build_probe(attribs: Dict[str, Any]) -> Probe: tags=dict(_.split(":", 1) for _ in attribs.get("tags", [])), ) - if _type == ProbeType.LOG_PROBE: - return LogProbeFactory.build(args, attribs) - if _type == ProbeType.METRIC_PROBE: - return MetricProbeFactory.build(args, attribs) - if _type == ProbeType.SPAN_PROBE: - return SpanProbeFactory.build(args, attribs) - if _type == ProbeType.SPAN_DECORATION_PROBE: - return SpanDecorationProbeFactory.build(args, attribs) + factory = PROBE_FACTORY.get(_type) + if factory is not None: + return factory.build(args, attribs) raise InvalidProbeConfiguration("Unsupported probe type: %s" % _type) diff --git a/ddtrace/debugging/_products/live_debugger.py b/ddtrace/debugging/_products/live_debugger.py new file mode 100644 index 00000000000..1417d22d320 --- /dev/null +++ b/ddtrace/debugging/_products/live_debugger.py @@ -0,0 +1,31 @@ +from ddtrace.settings.live_debugging import config + + +# TODO[gab]: Uncomment when the product is ready +# requires = ["tracer"] + + +def post_preload(): + pass + + +def start() -> None: + if config.enabled: + from ddtrace.debugging._live import enable + + enable() + + +def restart(join: bool = False) -> None: + pass + + +def stop(join: bool = False): + if config.enabled: + from ddtrace.debugging._live import disable + + disable() + + +def at_exit(join: bool = False): + stop(join=join) diff --git a/ddtrace/debugging/_session.py b/ddtrace/debugging/_session.py new file mode 100644 index 00000000000..d9f25157d2c --- /dev/null +++ b/ddtrace/debugging/_session.py @@ -0,0 +1,109 @@ +from dataclasses import dataclass +import typing as t +from weakref import WeakKeyDictionary as wkdict + +from ddtrace import tracer + + +SessionId = str + +DEFAULT_SESSION_LEVEL = 1 + + +def _sessions_from_debug_tag(debug_tag: str) -> t.Generator["Session", None, None]: + for session in debug_tag.split("."): + ident, _, level = session.partition(":") + yield Session(ident=ident, level=int(level or DEFAULT_SESSION_LEVEL)) + + +def _sessions_to_debug_tag(sessions: t.Iterable["Session"]) -> str: + # TODO: Validate tag length + return ".".join( + (f"{session.ident}:{session.level}" if session.level != DEFAULT_SESSION_LEVEL else session.ident) + for session in sessions + ) + + +@dataclass +class Session: + ident: SessionId + level: int + + @classmethod + def activate_distributed(cls, context: t.Any) -> None: + debug_tag = context._meta.get("_dd.p.debug") + if debug_tag is None: + return + + for session in _sessions_from_debug_tag(debug_tag): + session.link_to_trace(context) + + def propagate(self, context: t.Any) -> None: + debug_tag = context._meta.get("_dd.p.debug") + sessions = list(_sessions_from_debug_tag(debug_tag)) if debug_tag is not None else [] + for session in sessions: + if self.ident == session.ident: + # The session is already in the tags so we don't need to add it + if self.level > session.level: + # We only need to update the level if it's higher + session.level = self.level + break + else: + # The session is not in the tags so we need to add it + sessions.append(self) + + context._meta["_dd.p.debug"] = _sessions_to_debug_tag(sessions) + + def link_to_trace(self, trace_context: t.Optional[t.Any] = None): + SessionManager.link_session_to_trace(self, trace_context) + + def unlink_from_trace(self, trace_context: t.Optional[t.Any] = None): + SessionManager.unlink_session_from_trace(self, trace_context) + + @classmethod + def from_trace(cls) -> t.List["Session"]: + return SessionManager.get_sessions_for_trace() + + @classmethod + def lookup(cls, ident: SessionId) -> t.Optional["Session"]: + return SessionManager.lookup_session(ident) + + +class SessionManager: + _sessions_trace_map: t.MutableMapping[ + t.Any, t.Dict[SessionId, Session] + ] = wkdict() # Trace context to Sessions mapping + + @classmethod + def link_session_to_trace(cls, session, trace_context: t.Optional[t.Any] = None) -> None: + context = trace_context or tracer.current_trace_context() + if context is None: + # Nothing to link to + return + + cls._sessions_trace_map.setdefault(context, {})[session.ident] = session + + @classmethod + def unlink_session_from_trace(cls, session, trace_context: t.Optional[t.Any] = None) -> None: + context = trace_context or tracer.current_trace_context() + if context is None: + # Nothing to unlink from + return + + cls._sessions_trace_map.get(context, {}).pop(session.ident, None) + + @classmethod + def get_sessions_for_trace(cls) -> t.List[Session]: + context = tracer.current_trace_context() + if context is None: + return [] + + return list(cls._sessions_trace_map.get(context, {}).values()) + + @classmethod + def lookup_session(cls, ident: SessionId) -> t.Optional[Session]: + context = tracer.current_trace_context() + if context is None: + return None + + return cls._sessions_trace_map.get(context, {}).get(ident) diff --git a/ddtrace/debugging/_signal/__init__.py b/ddtrace/debugging/_signal/__init__.py index 6dea9afe965..494a46a9a91 100644 --- a/ddtrace/debugging/_signal/__init__.py +++ b/ddtrace/debugging/_signal/__init__.py @@ -3,3 +3,4 @@ from ddtrace.debugging._signal.snapshot import Snapshot # noqa from ddtrace.debugging._signal.tracing import DynamicSpan # noqa from ddtrace.debugging._signal.tracing import SpanDecoration # noqa +from ddtrace.debugging._signal.trigger import Trigger # noqa diff --git a/ddtrace/debugging/_signal/model.py b/ddtrace/debugging/_signal/model.py index 9c9448677c0..9354c56f0ff 100644 --- a/ddtrace/debugging/_signal/model.py +++ b/ddtrace/debugging/_signal/model.py @@ -27,6 +27,7 @@ from ddtrace.debugging._probe.model import RateLimitMixin from ddtrace.debugging._probe.model import TimingMixin from ddtrace.debugging._safety import get_args +from ddtrace.debugging._session import Session from ddtrace.internal.compat import ExcInfoType from ddtrace.internal.metrics import Metrics from ddtrace.internal.rate_limiter import BudgetRateLimiterWithJitter as RateLimiter @@ -121,12 +122,28 @@ def _rate_limit_exceeded(self) -> bool: # We don't have a rate limiter, so no rate was exceeded. return False - exceeded = probe.limiter.limit() is RateLimitExceeded + exceeded = self.session is None and probe.limiter.limit() is RateLimitExceeded if exceeded: self.state = SignalState.SKIP_RATE return exceeded + def _session_check(self) -> bool: + # Check that we emit signals from probes with a session ID only if the + # session is active. If the probe has no session ID, or the session ID + # is active, we can proceed with the signal emission. + session_id = self.probe.tags.get("sessionId") + if session_id is not None: + session = Session.lookup(session_id) + if session is None or session.level == 0: + return False + return True + + @property + def session(self): + session_id = self.probe.tags.get("sessionId") + return Session.lookup(session_id) if session_id is not None else None + @property def args(self): return dict(get_args(self.frame)) @@ -144,6 +161,9 @@ def line(self, scope: Mapping[str, Any]) -> None: pass def do_enter(self) -> None: + if not self._session_check(): + return + if self._timing is not ProbeEvalTiming.ENTRY: return @@ -157,6 +177,9 @@ def do_enter(self) -> None: self.enter(scope) def do_exit(self, retval: Any, exc_info: ExcInfoType, duration: int) -> None: + if not self._session_check(): + return + if self.state is not SignalState.NONE: # The signal has already been handled and move to a final state return @@ -186,6 +209,9 @@ def do_exit(self, retval: Any, exc_info: ExcInfoType, duration: int) -> None: self.state = SignalState.DONE def do_line(self, global_limiter: Optional[RateLimiter] = None) -> None: + if not self._session_check(): + return + frame = self.frame scope = ChainMap(frame.f_locals, frame.f_globals) diff --git a/ddtrace/debugging/_signal/trigger.py b/ddtrace/debugging/_signal/trigger.py new file mode 100644 index 00000000000..6f1eec74e30 --- /dev/null +++ b/ddtrace/debugging/_signal/trigger.py @@ -0,0 +1,64 @@ +from dataclasses import dataclass +import typing as t + +from ddtrace.debugging._probe.model import ProbeEvalTiming +from ddtrace.debugging._probe.model import SessionMixin +from ddtrace.debugging._probe.model import TriggerFunctionProbe +from ddtrace.debugging._probe.model import TriggerLineProbe +from ddtrace.debugging._session import Session +from ddtrace.debugging._signal.log import LogSignal +from ddtrace.debugging._signal.model import probe_to_signal +from ddtrace.internal.compat import ExcInfoType +from ddtrace.internal.logger import get_logger + + +log = get_logger(__name__) + + +@dataclass +class Trigger(LogSignal): + """Trigger a session creation.""" + + __default_timing__ = ProbeEvalTiming.ENTRY + + def _link_session(self) -> None: + probe = t.cast(SessionMixin, self.probe) + session = Session(probe.session_id, probe.level) + + # Link the session to the running trace + session.link_to_trace(self.trace_context) + + # Ensure that the new session information is included in the debug + # propagation tag for distributed debugging + session.propagate(self.trace_context) + + # DEV: Unfortunately we don't have an API for this :( + self.trace_context._meta[f"_dd.ld.probe_id.{self.probe.probe_id}"] = "true" # type: ignore[union-attr] + + def enter(self, scope: t.Mapping[str, t.Any]) -> None: + self._link_session() + + def exit(self, retval: t.Any, exc_info: ExcInfoType, duration: float, scope: t.Mapping[str, t.Any]) -> None: + # DEV: We do not unlink on exit explicitly here. We let the weak + # reference to the context object do the job. + pass + + def line(self, scope: t.Mapping[str, t.Any]): + self._link_session() + + @property + def message(self): + return f"Condition evaluation errors for probe {self.probe.probe_id}" if self.errors else None + + def has_message(self) -> bool: + return bool(self.errors) + + +@probe_to_signal.register +def _(probe: TriggerFunctionProbe, frame, thread, trace_context, meter): + return Trigger(probe=probe, frame=frame, thread=thread, trace_context=trace_context) + + +@probe_to_signal.register +def _(probe: TriggerLineProbe, frame, thread, trace_context, meter): + return Trigger(probe=probe, frame=frame, thread=thread, trace_context=trace_context) diff --git a/ddtrace/debugging/_uploader.py b/ddtrace/debugging/_uploader.py index f8f1a22a9d2..69d9104156d 100644 --- a/ddtrace/debugging/_uploader.py +++ b/ddtrace/debugging/_uploader.py @@ -25,6 +25,7 @@ class UploaderProduct(str, Enum): DEBUGGER = "dynamic_instrumentation" EXCEPTION_REPLAY = "exception_replay" + CODE_ORIGIN_SPAN = "code_origin.span" class LogsIntakeUploaderV1(ForksafeAwakeablePeriodicService): diff --git a/ddtrace/propagation/http.py b/ddtrace/propagation/http.py index a1664664ace..4261433f573 100644 --- a/ddtrace/propagation/http.py +++ b/ddtrace/propagation/http.py @@ -976,15 +976,15 @@ class HTTPPropagator(object): def _extract_configured_contexts_avail(normalized_headers): contexts = [] styles_w_ctx = [] - for prop_style in config._propagation_style_extract: - propagator = _PROP_STYLES[prop_style] - context = propagator._extract(normalized_headers) - # baggage is handled separately - if prop_style == _PROPAGATION_STYLE_BAGGAGE: - continue - if context: - contexts.append(context) - styles_w_ctx.append(prop_style) + if config._propagation_style_extract is not None: + for prop_style in config._propagation_style_extract: + if prop_style == _PROPAGATION_STYLE_BAGGAGE: + continue + propagator = _PROP_STYLES[prop_style] + context = propagator._extract(normalized_headers) + if context: + contexts.append(context) + styles_w_ctx.append(prop_style) return contexts, styles_w_ctx @staticmethod @@ -992,8 +992,8 @@ def _resolve_contexts(contexts, styles_w_ctx, normalized_headers): primary_context = contexts[0] links = [] - for context in contexts[1:]: - style_w_ctx = styles_w_ctx[contexts.index(context)] + for i, context in enumerate(contexts[1:], 1): + style_w_ctx = styles_w_ctx[i] # encoding expects at least trace_id and span_id if context.span_id and context.trace_id and context.trace_id != primary_context.trace_id: links.append( diff --git a/ddtrace/settings/live_debugging.py b/ddtrace/settings/live_debugging.py new file mode 100644 index 00000000000..b9183ae0a77 --- /dev/null +++ b/ddtrace/settings/live_debugging.py @@ -0,0 +1,16 @@ +from envier import En + + +class LiveDebuggerConfig(En): + __prefix__ = "dd.live_debugging" + + enabled = En.v( + bool, + "enabled", + default=False, + help_type="Boolean", + help="Enable the live debugger.", + ) + + +config = LiveDebuggerConfig() diff --git a/docs/configuration.rst b/docs/configuration.rst index 35bb63fac20..add19aa219c 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -918,3 +918,9 @@ Code Origin ----------- .. ddtrace-envier-configuration:: ddtrace.settings.code_origin:CodeOriginConfig + + +Live Debugging +-------------- + +.. ddtrace-envier-configuration:: ddtrace.settings.live_debugging:LiveDebuggerConfig diff --git a/pyproject.toml b/pyproject.toml index 3c83cfc5067..47604a4bc26 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,6 +63,7 @@ ddtrace = "ddtrace.contrib.pytest.plugin" "code-origin-for-spans" = "ddtrace.debugging._products.code_origin.span" "dynamic-instrumentation" = "ddtrace.debugging._products.dynamic_instrumentation" "exception-replay" = "ddtrace.debugging._products.exception_replay" +"live-debugger" = "ddtrace.debugging._products.live_debugger" "remote-configuration" = "ddtrace.internal.remoteconfig.product" "symbol-database" = "ddtrace.internal.symbol_db.product" diff --git a/tests/debugging/conftest.py b/tests/debugging/conftest.py index 71664983ea4..24fde8ebe86 100644 --- a/tests/debugging/conftest.py +++ b/tests/debugging/conftest.py @@ -5,9 +5,8 @@ @pytest.fixture def stuff(): - was_loaded = False - if "tests.submod.stuff" in sys.modules: - was_loaded = True + was_loaded = "tests.submod.stuff" in sys.modules + if was_loaded: del sys.modules["tests.submod.stuff"] __import__("tests.submod.stuff") diff --git a/tests/debugging/live/__init__.py b/tests/debugging/live/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/debugging/live/test_live_debugger.py b/tests/debugging/live/test_live_debugger.py new file mode 100644 index 00000000000..cb7872486e2 --- /dev/null +++ b/tests/debugging/live/test_live_debugger.py @@ -0,0 +1,68 @@ +from collections import Counter +import typing as t + +import ddtrace +from ddtrace.debugging._origin.span import SpanCodeOriginProcessor +from ddtrace.debugging._probe.model import ProbeEvalTiming +from ddtrace.internal import core +from tests.debugging.mocking import MockLogsIntakeUploaderV1 +from tests.debugging.mocking import debugger +from tests.debugging.utils import create_snapshot_function_probe +from tests.debugging.utils import create_trigger_function_probe +from tests.utils import TracerTestCase + + +class MockSpanCodeOriginProcessor(SpanCodeOriginProcessor): + __uploader__ = MockLogsIntakeUploaderV1 + + @classmethod + def get_uploader(cls) -> MockLogsIntakeUploaderV1: + return t.cast(MockLogsIntakeUploaderV1, cls.__uploader__._instance) + + +class SpanProbeTestCase(TracerTestCase): + def setUp(self): + super(SpanProbeTestCase, self).setUp() + self.backup_tracer = ddtrace.tracer + ddtrace.tracer = self.tracer + + MockSpanCodeOriginProcessor.enable() + + def tearDown(self): + ddtrace.tracer = self.backup_tracer + super(SpanProbeTestCase, self).tearDown() + + MockSpanCodeOriginProcessor.disable() + core.reset_listeners(event_id="service_entrypoint.patch") + + def test_live_debugger(self): + from tests.submod.traced_stuff import entrypoint + from tests.submod.traced_stuff import traced_entrypoint + + with debugger() as d: + d.add_probes( + create_trigger_function_probe( + probe_id="trigger-probe", + module="tests.submod.traced_stuff", + func_qname="entrypoint", + session_id="test-session-id", + level=2, + ), + create_snapshot_function_probe( + probe_id="snapshot-probe", + module="tests.submod.traced_stuff", + func_qname="middle", + evaluate_at=ProbeEvalTiming.EXIT, + tags={"sessionId": "test-session-id"}, + rate=0.0, + ), + ) + + core.dispatch("service_entrypoint.patch", (entrypoint,)) + + for _ in range(10): + traced_entrypoint(self.tracer) + + # Check that the function probe has been triggered always, regardless of + # the rate limit. + assert Counter(s.probe.probe_id for s in d.snapshots)["snapshot-probe"] == 10 diff --git a/tests/debugging/mocking.py b/tests/debugging/mocking.py index 4446bce559c..6c293988efc 100644 --- a/tests/debugging/mocking.py +++ b/tests/debugging/mocking.py @@ -6,6 +6,7 @@ from time import sleep from typing import Any from typing import Generator +from typing import List from envier import En @@ -16,6 +17,7 @@ from ddtrace.debugging._probe.remoteconfig import ProbePollerEvent from ddtrace.debugging._probe.remoteconfig import _filter_by_env_and_version from ddtrace.debugging._signal.collector import SignalCollector +from ddtrace.debugging._signal.snapshot import Snapshot from ddtrace.debugging._uploader import LogsIntakeUploaderV1 from tests.debugging.probe.test_status import DummyProbeStatusLogger @@ -115,6 +117,10 @@ def collector(self): def payloads(self): return [_ for data in self.queue for _ in json.loads(data)] + @property + def snapshots(self) -> List[Snapshot]: + return self.collector.queue + class TestDebugger(Debugger): __logger__ = MockProbeStatusLogger @@ -145,6 +151,10 @@ def uploader(self): def collector(self): return self.__uploader__.get_collector() + @property + def snapshots(self) -> List[Snapshot]: + return self.uploader.snapshots + @property def probe_status_logger(self): return self._probe_registry.logger diff --git a/tests/debugging/origin/test_span.py b/tests/debugging/origin/test_span.py index b443f784d2a..226bb8c7f79 100644 --- a/tests/debugging/origin/test_span.py +++ b/tests/debugging/origin/test_span.py @@ -1,25 +1,36 @@ from pathlib import Path +import typing as t import ddtrace from ddtrace.debugging._origin.span import SpanCodeOriginProcessor +from ddtrace.debugging._session import Session from ddtrace.ext import SpanTypes from ddtrace.internal import core +from tests.debugging.mocking import MockLogsIntakeUploaderV1 from tests.utils import TracerTestCase +class MockSpanCodeOriginProcessor(SpanCodeOriginProcessor): + __uploader__ = MockLogsIntakeUploaderV1 + + @classmethod + def get_uploader(cls) -> MockLogsIntakeUploaderV1: + return t.cast(MockLogsIntakeUploaderV1, cls.__uploader__._instance) + + class SpanProbeTestCase(TracerTestCase): def setUp(self): super(SpanProbeTestCase, self).setUp() self.backup_tracer = ddtrace.tracer ddtrace.tracer = self.tracer - SpanCodeOriginProcessor.enable() + MockSpanCodeOriginProcessor.enable() def tearDown(self): ddtrace.tracer = self.backup_tracer super(SpanProbeTestCase, self).tearDown() - SpanCodeOriginProcessor.disable() + MockSpanCodeOriginProcessor.disable() core.reset_listeners(event_id="service_entrypoint.patch") def test_span_origin(self): @@ -52,5 +63,43 @@ def entry_call(): # Check for the expected tags on the exit span assert _exit.get_tag("_dd.code_origin.type") == "exit" - assert _exit.get_tag("_dd.code_origin.frames.2.file") == str(Path(__file__).resolve()) - assert _exit.get_tag("_dd.code_origin.frames.2.line") == str(self.test_span_origin.__code__.co_firstlineno) + assert _exit.get_tag("_dd.code_origin.frames.0.file") == str(Path(__file__).resolve()) + assert _exit.get_tag("_dd.code_origin.frames.0.line") == str(self.test_span_origin.__code__.co_firstlineno) + + def test_span_origin_session(self): + def entry_call(): + pass + + core.dispatch("service_entrypoint.patch", (entry_call,)) + + with self.tracer.trace("entry"): + # Emulate a trigger probe + Session(ident="test", level=2).link_to_trace() + entry_call() + with self.tracer.trace("middle"): + with self.tracer.trace("exit", span_type=SpanTypes.HTTP): + pass + + self.assert_span_count(3) + entry, middle, _exit = self.get_spans() + payloads = MockSpanCodeOriginProcessor.get_uploader().wait_for_payloads() + snapshot_ids = {p["debugger"]["snapshot"]["id"] for p in payloads} + + assert len(payloads) == len(snapshot_ids) + + entry_snapshot_id = entry.get_tag("_dd.code_origin.frames.0.snapshot_id") + assert entry.get_tag("_dd.code_origin.type") == "entry" + assert entry_snapshot_id in snapshot_ids + + # Check that we don't have span location tags on the middle span + assert middle.get_tag("_dd.code_origin.frames.0.snapshot_id") is None + + # Check that we have all the snapshots for the exit span + assert _exit.get_tag("_dd.code_origin.type") == "exit" + snapshot_ids_from_span_tags = {_exit.get_tag(f"_dd.code_origin.frames.{_}.snapshot_id") for _ in range(8)} + snapshot_ids_from_span_tags.discard(None) + assert snapshot_ids_from_span_tags < snapshot_ids + + # Check that we have complete data + snapshot_ids_from_span_tags.add(entry_snapshot_id) + assert snapshot_ids_from_span_tags == snapshot_ids diff --git a/tests/debugging/signal/test_collector.py b/tests/debugging/signal/test_collector.py index 49e4f1aef2c..9bb4e72de8b 100644 --- a/tests/debugging/signal/test_collector.py +++ b/tests/debugging/signal/test_collector.py @@ -5,10 +5,12 @@ import mock +from ddtrace.debugging._probe.model import ProbeEvalTiming from ddtrace.debugging._signal.collector import SignalCollector from ddtrace.debugging._signal.log import LogSignal from ddtrace.debugging._signal.model import SignalState from ddtrace.debugging._signal.snapshot import Snapshot +from tests.debugging.utils import create_log_function_probe from tests.debugging.utils import create_snapshot_line_probe @@ -45,7 +47,18 @@ def has_message(self): c = 0 for i in range(10): - mocked_signal = MockLogSignal(mock.Mock(), sys._getframe(), threading.current_thread()) + mocked_signal = MockLogSignal( + create_log_function_probe( + probe_id="test", + template=None, + segments=[], + module="foo", + func_qname="bar", + evaluate_at=ProbeEvalTiming.ENTRY, + ), + sys._getframe(), + threading.current_thread(), + ) mocked_signal.do_enter() assert mocked_signal.enter_call_count == 1 diff --git a/tests/debugging/test_debugger_span_decoration.py b/tests/debugging/test_debugger_span_decoration.py index 5d7e51ee6a7..8c9eb3b42a5 100644 --- a/tests/debugging/test_debugger_span_decoration.py +++ b/tests/debugging/test_debugger_span_decoration.py @@ -126,7 +126,7 @@ def test_debugger_span_decoration_probe_in_inner_function_active_span(self): create_span_decoration_line_probe( probe_id="span-decoration", source_file="tests/submod/traced_stuff.py", - line=3, + line=6, target_span=SpanDecorationTargetSpan.ACTIVE, decorations=[ SpanDecoration( @@ -179,7 +179,7 @@ def test_debugger_span_decoration_probe_in_traced_function_active_span(self): create_span_decoration_line_probe( probe_id="span-decoration", source_file="tests/submod/traced_stuff.py", - line=7, + line=10, target_span=SpanDecorationTargetSpan.ACTIVE, decorations=[ SpanDecoration( @@ -205,7 +205,7 @@ def test_debugger_span_decoration_probe_in_traced_function_root_span(self): create_span_decoration_line_probe( probe_id="span-decoration", source_file="tests/submod/traced_stuff.py", - line=8, + line=11, target_span=SpanDecorationTargetSpan.ROOT, decorations=[ SpanDecoration( diff --git a/tests/debugging/test_session.py b/tests/debugging/test_session.py new file mode 100644 index 00000000000..ef71b062d70 --- /dev/null +++ b/tests/debugging/test_session.py @@ -0,0 +1,35 @@ +import pytest + +from ddtrace.debugging._session import DEFAULT_SESSION_LEVEL +from ddtrace.debugging._session import Session +from ddtrace.debugging._session import _sessions_from_debug_tag +from ddtrace.debugging._session import _sessions_to_debug_tag + + +@pytest.mark.parametrize( + "tag,sessions", + [ + ("session1", [Session("session1", DEFAULT_SESSION_LEVEL)]), + ("session1:2", [Session("session1", 2)]), + ("session1:1.session2:2", [Session("session1", 1), Session("session2", 2)]), + ("session1.session2", [Session("session1", DEFAULT_SESSION_LEVEL), Session("session2", DEFAULT_SESSION_LEVEL)]), + ], +) +def test_session_parse(tag, sessions): + assert list(_sessions_from_debug_tag(tag)) == sessions + + +@pytest.mark.parametrize( + "a,b", + [ + ("session1:1.session2:2", "session1.session2:2"), + ("session1:0.session2:2", "session1:0.session2:2"), + ("session1.session2", "session1.session2"), + ], +) +def test_tag_identity(a, b): + """ + Test that the combination of _sessions_from_debug_tag and + _sessions_to_debug_tag is the identity function. + """ + assert _sessions_to_debug_tag(_sessions_from_debug_tag(a)) == b diff --git a/tests/debugging/utils.py b/tests/debugging/utils.py index d4828be3cfd..4d25f908bb3 100644 --- a/tests/debugging/utils.py +++ b/tests/debugging/utils.py @@ -1,3 +1,5 @@ +import uuid + from ddtrace.debugging._expressions import DDExpression from ddtrace.debugging._expressions import dd_compile from ddtrace.debugging._probe.model import DEFAULT_CAPTURE_LIMITS @@ -16,6 +18,7 @@ from ddtrace.debugging._probe.model import SpanDecorationTargetSpan from ddtrace.debugging._probe.model import SpanFunctionProbe from ddtrace.debugging._probe.model import StringTemplate +from ddtrace.debugging._probe.model import TriggerFunctionProbe from ddtrace.debugging._redaction import DDRedactedExpression @@ -114,6 +117,16 @@ def _wrapper(*args, **kwargs): return _wrapper +def trigger_probe_defaults(f): + def _wrapper(*args, **kwargs): + kwargs.setdefault("session_id", str(uuid.uuid4)) + kwargs.setdefault("level", 0) + kwargs.setdefault("rate", DEFAULT_PROBE_RATE) + return f(*args, **kwargs) + + return _wrapper + + @create_probe_defaults @probe_conditional_defaults @snapshot_probe_defaults @@ -177,3 +190,10 @@ def create_span_decoration_line_probe(**kwargs): @span_decoration_probe_defaults def create_span_decoration_function_probe(**kwargs): return SpanDecorationFunctionProbe(**kwargs) + + +@create_probe_defaults +@probe_conditional_defaults +@trigger_probe_defaults +def create_trigger_function_probe(**kwargs): + return TriggerFunctionProbe(**kwargs) diff --git a/tests/submod/traced_stuff.py b/tests/submod/traced_stuff.py index a414e86b4e3..caa28dac5b4 100644 --- a/tests/submod/traced_stuff.py +++ b/tests/submod/traced_stuff.py @@ -1,4 +1,7 @@ # -*- encoding: utf-8 -*- +from ddtrace.ext import SpanTypes + + def inner(): return 42 @@ -6,3 +9,22 @@ def inner(): def traceme(): cake = "🍰" # noqa return 42 + inner() + + +def exit_call(tracer): + with tracer.trace("exit", span_type=SpanTypes.HTTP): + return 42 + + +def middle(tracer): + with tracer.trace("middle"): + return exit_call(tracer) + + +def entrypoint(tracer): + return middle(tracer) + + +def traced_entrypoint(tracer): + with tracer.trace("entry"): + return entrypoint(tracer) diff --git a/tests/tracer/test_context.py b/tests/tracer/test_context.py index 15b2f870fae..e8fdcfc6782 100644 --- a/tests/tracer/test_context.py +++ b/tests/tracer/test_context.py @@ -37,10 +37,6 @@ def test_eq(ctx1, ctx2): Context(trace_id=123, span_id=321, dd_origin="synthetics", sampling_priority=2), Context(trace_id=1234, span_id=321, dd_origin="synthetics", sampling_priority=2), ), - ( - Context(trace_id=123, span_id=321, dd_origin="synthetics", sampling_priority=2), - Context(trace_id=123, span_id=3210, dd_origin="synthetics", sampling_priority=2), - ), ( Context(trace_id=123, span_id=321, dd_origin="synthetics", sampling_priority=2), Context(trace_id=123, span_id=321, dd_origin="synthetics1", sampling_priority=2),