Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(di): trigger probes #10942

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
16 changes: 14 additions & 2 deletions ddtrace/_trace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,17 @@ class Context(object):
boundaries.
"""

__slots__ = ["trace_id", "span_id", "_lock", "_meta", "_metrics", "_span_links", "_baggage", "_is_remote"]
__slots__ = [
"trace_id",
"span_id",
"_lock",
"_meta",
"_metrics",
"_span_links",
"_baggage",
"_is_remote",
"__weakref__",
]

def __init__(
self,
Expand Down Expand Up @@ -259,7 +269,6 @@ def __eq__(self, other: Any) -> bool:
with self._lock:
return (
self.trace_id == other.trace_id
and self.span_id == other.span_id
and self._meta == other._meta
and self._metrics == other._metrics
and self._span_links == other._span_links
Expand All @@ -279,4 +288,7 @@ def __repr__(self) -> str:
self._is_remote,
)

def __hash__(self) -> int:
return hash(self.trace_id)

__str__ = __repr__
3 changes: 3 additions & 0 deletions ddtrace/contrib/trace_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ddtrace.internal.compat import ensure_text
from ddtrace.internal.compat import ip_is_global
from ddtrace.internal.compat import parse
from ddtrace.internal.core.event_hub import dispatch
from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils.cache import cached
from ddtrace.internal.utils.http import normalize_header_name
Expand Down Expand Up @@ -589,6 +590,8 @@ def activate_distributed_headers(tracer, int_config=None, request_headers=None,
# have a context with the same trace id active
tracer.context_provider.activate(context)

dispatch("distributed_context.activated", (context,))
P403n1x87 marked this conversation as resolved.
Show resolved Hide resolved


def _flatten(
obj, # type: Any
Expand Down
4 changes: 4 additions & 0 deletions ddtrace/debugging/_exception/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ddtrace._trace.span import Span
from ddtrace.debugging._probe.model import LiteralTemplateSegment
from ddtrace.debugging._probe.model import LogLineProbe
from ddtrace.debugging._session import Session
from ddtrace.debugging._signal.snapshot import DEFAULT_CAPTURE_LIMITS
from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._uploader import LogsIntakeUploaderV1
Expand Down Expand Up @@ -193,6 +194,9 @@ def can_capture(span: Span) -> bool:
return True

if info_captured is None:
if Session.from_trace():
# If we are in a debug session we always capture
return True
result = GLOBAL_RATE_LIMITER.limit() is not RateLimitExceeded
root.set_tag_str(CAPTURE_TRACE_TAG, str(result).lower())
return result
Expand Down
10 changes: 10 additions & 0 deletions ddtrace/debugging/_live.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from ddtrace.debugging._session import Session
from ddtrace.internal import core


def enable() -> None:
core.on("distributed_context.activated", Session.activate_distributed, "live_debugger")


def disable() -> None:
core.reset_listeners("distributed_context.activated", Session.activate_distributed)
133 changes: 77 additions & 56 deletions ddtrace/debugging/_origin/span.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
from dataclasses import dataclass
from functools import partial
from itertools import count
from pathlib import Path
import sys
import time

# from threading import current_thread
from threading import current_thread
from time import monotonic_ns
from types import FrameType
from types import FunctionType
import typing as t
import uuid

import ddtrace

# from ddtrace import config
from ddtrace._trace.processor import SpanProcessor

# from ddtrace.debugging._debugger import Debugger
from ddtrace.debugging._probe.model import DEFAULT_CAPTURE_LIMITS
from ddtrace.debugging._probe.model import LiteralTemplateSegment
from ddtrace.debugging._probe.model import LogFunctionProbe
from ddtrace.debugging._probe.model import LogLineProbe
from ddtrace.debugging._probe.model import ProbeEvalTiming

# from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._signal.model import Signal
from ddtrace.debugging._session import Session
from ddtrace.debugging._signal.collector import SignalCollector
from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._uploader import LogsIntakeUploaderV1
from ddtrace.debugging._uploader import UploaderProduct
from ddtrace.ext import EXIT_SPAN_TYPES
from ddtrace.internal import core
from ddtrace.internal.packages import is_user_code
Expand All @@ -40,13 +38,13 @@ def frame_stack(frame: FrameType) -> t.Iterator[FrameType]:
_frame = _frame.f_back


def wrap_entrypoint(f: t.Callable) -> None:
def wrap_entrypoint(collector: SignalCollector, f: t.Callable) -> None:
if not _isinstance(f, FunctionType):
return

_f = t.cast(FunctionType, f)
if not EntrySpanWrappingContext.is_wrapped(_f):
EntrySpanWrappingContext(_f).wrap()
EntrySpanWrappingContext(collector, _f).wrap()


@dataclass
Expand Down Expand Up @@ -120,9 +118,13 @@ class EntrySpanLocation:


class EntrySpanWrappingContext(WrappingContext):
def __init__(self, f):
__priority__ = 199

def __init__(self, collector: SignalCollector, f: FunctionType) -> None:
super().__init__(f)

self.collector = collector

filename = str(Path(f.__code__.co_filename).resolve())
name = f.__qualname__
module = f.__module__
Expand Down Expand Up @@ -152,36 +154,37 @@ def __enter__(self):
s.set_tag_str("_dd.code_origin.frames.0.type", location.module)
s.set_tag_str("_dd.code_origin.frames.0.method", location.name)

# TODO[gab]: This will be enabled as part of the live debugger/distributed debugging
# if ld_config.enabled:
# # Create a snapshot
# snapshot = Snapshot(
# probe=location.probe,
# frame=self.__frame__,
# thread=current_thread(),
# trace_context=root,
# )

# # Capture on entry
# context = Debugger.get_collector().attach(snapshot)

# # Correlate the snapshot with the span
# root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
# span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)

# self.set("context", context)
# self.set("start_time", time.monotonic_ns())
self.set("start_time", monotonic_ns())

return self

def _close_signal(self, retval=None, exc_info=(None, None, None)):
try:
signal: Signal = t.cast(Signal, self.get("signal"))
except KeyError:
# No snapshot was created
root = ddtrace.tracer.current_root_span()
span = ddtrace.tracer.current_span()
if root is None or span is None:
return

signal.do_exit(retval, exc_info, time.monotonic_ns() - self.get("start_time"))
# Check if we have any level 2 debugging sessions running for the
# current trace
if any(s.level >= 2 for s in Session.from_trace()):
P403n1x87 marked this conversation as resolved.
Show resolved Hide resolved
# Create a snapshot
snapshot = Snapshot(
probe=self.location.probe,
frame=self.__frame__,
thread=current_thread(),
trace_context=root,
)

# Capture on entry
snapshot.do_enter()

# Correlate the snapshot with the span
root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)

snapshot.do_exit(retval, exc_info, monotonic_ns() - self.get("start_time"))

self.collector.push(snapshot)

def __return__(self, retval):
self._close_signal(retval=retval)
Expand All @@ -194,7 +197,10 @@ def __exit__(self, exc_type, exc_value, traceback):

@dataclass
class SpanCodeOriginProcessor(SpanProcessor):
__uploader__ = LogsIntakeUploaderV1

_instance: t.Optional["SpanCodeOriginProcessor"] = None
_handler: t.Optional[t.Callable] = None

def on_span_start(self, span: Span) -> None:
if span.span_type not in EXIT_SPAN_TYPES:
Expand All @@ -219,24 +225,25 @@ def on_span_start(self, span: Span) -> None:
# DEV: Without a function object we cannot infer the function
# and any potential class name.

# TODO[gab]: This will be enabled as part of the live debugger/distributed debugging
# if ld_config.enabled:
# # Create a snapshot
# snapshot = Snapshot(
# probe=ExitSpanProbe.from_frame(frame),
# frame=frame,
# thread=current_thread(),
# trace_context=span,
# )
# Check if we have any level 2 debugging sessions running for
# the current trace
if any(s.level >= 2 for s in Session.from_trace()):
# Create a snapshot
snapshot = Snapshot(
probe=ExitSpanProbe.from_frame(frame),
frame=frame,
thread=current_thread(),
trace_context=span,
)

# # Capture on entry
# snapshot.line()
# Capture on entry
snapshot.do_line()

# # Collect
# Debugger.get_collector().push(snapshot)
# Collect
self.__uploader__.get_collector().push(snapshot)

# # Correlate the snapshot with the span
# span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid)
# Correlate the snapshot with the span
span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid)

def on_span_finish(self, span: Span) -> None:
pass
Expand All @@ -246,17 +253,31 @@ def enable(cls):
if cls._instance is not None:
return

core.on("service_entrypoint.patch", wrap_entrypoint)

instance = cls._instance = cls()

# Register code origin for span with the snapshot uploader
cls.__uploader__.register(UploaderProduct.CODE_ORIGIN_SPAN)

# Register the processor for exit spans
instance.register()

# Register the entrypoint wrapping for entry spans
cls._handler = handler = partial(wrap_entrypoint, cls.__uploader__.get_collector())
core.on("service_entrypoint.patch", handler)

@classmethod
def disable(cls):
if cls._instance is None:
return

# Unregister the entrypoint wrapping for entry spans
core.reset_listeners("service_entrypoint.patch", cls._handler)
cls._handler = None

# Unregister the processor for exit spans
cls._instance.unregister()
cls._instance = None

core.reset_listeners("service_entrypoint.patch", wrap_entrypoint)
# Unregister code origin for span with the snapshot uploader
cls.__uploader__.unregister(UploaderProduct.CODE_ORIGIN_SPAN)

cls._instance = None
25 changes: 23 additions & 2 deletions ddtrace/debugging/_probe/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing import Union

from ddtrace.debugging._expressions import DDExpression
from ddtrace.debugging._session import SessionId
from ddtrace.internal.compat import maybe_stringify
from ddtrace.internal.logger import get_logger
from ddtrace.internal.module import _resolve
Expand All @@ -26,6 +27,7 @@

DEFAULT_PROBE_RATE = 5000.0
DEFAULT_SNAPSHOT_PROBE_RATE = 1.0
DEFAULT_TRIGGER_PROBE_RATE = 1.0 / 60.0 # 1 per minute
DEFAULT_PROBE_CONDITION_ERROR_RATE = 1.0 / 60 / 5


Expand Down Expand Up @@ -301,12 +303,31 @@ class SpanDecorationFunctionProbe(Probe, FunctionLocationMixin, TimingMixin, Spa
pass


LineProbe = Union[LogLineProbe, MetricLineProbe, SpanDecorationLineProbe]
FunctionProbe = Union[LogFunctionProbe, MetricFunctionProbe, SpanFunctionProbe, SpanDecorationFunctionProbe]
@dataclass
class SessionMixin:
session_id: SessionId
level: int


@dataclass
class TriggerLineProbe(Probe, LineLocationMixin, SessionMixin, ProbeConditionMixin, RateLimitMixin):
pass


@dataclass
class TriggerFunctionProbe(Probe, FunctionLocationMixin, SessionMixin, ProbeConditionMixin, RateLimitMixin):
pass


LineProbe = Union[LogLineProbe, MetricLineProbe, SpanDecorationLineProbe, TriggerLineProbe]
FunctionProbe = Union[
LogFunctionProbe, MetricFunctionProbe, SpanFunctionProbe, SpanDecorationFunctionProbe, TriggerFunctionProbe
]


class ProbeType(str, Enum):
LOG_PROBE = "LOG_PROBE"
METRIC_PROBE = "METRIC_PROBE"
SPAN_PROBE = "SPAN_PROBE"
SPAN_DECORATION_PROBE = "SPAN_DECORATION_PROBE"
TRIGGER_PROBE = "TRIGGER_PROBE"
Loading
Loading