diff --git a/.gitignore b/.gitignore
index 2bd3cd4fef5..4a25e9c965c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,6 +18,8 @@ ddtrace/internal/_encoding.c
 ddtrace/internal/_rand.c
 ddtrace/internal/_tagset.c
 ddtrace/internal/telemetry/metrics_namespaces.c
+ddtrace/_trace/processor/__init__.c
+ddtrace/internal/peer_service/processor.c
 pygoat/*
 *.so
 *.dylib
diff --git a/ddtrace/_trace/filters.py b/ddtrace/_trace/filters.py
index 538a833d2c4..7170b994035 100644
--- a/ddtrace/_trace/filters.py
+++ b/ddtrace/_trace/filters.py
@@ -6,7 +6,7 @@
 from ddtrace._trace.span import Span


-class TraceFilter(TraceProcessor):
+class TraceFilter(TraceProcessor, metaclass=abc.ABCMeta):
     @abc.abstractmethod
     def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
         """Processes a trace.
diff --git a/ddtrace/_trace/processor/__init__.pxd b/ddtrace/_trace/processor/__init__.pxd
new file mode 100644
index 00000000000..05c047dcf9f
--- /dev/null
+++ b/ddtrace/_trace/processor/__init__.pxd
@@ -0,0 +1,2 @@
+cdef class TraceProcessor:
+    cpdef object process_trace(self, list trace)
diff --git a/ddtrace/_trace/processor/__init__.pyi b/ddtrace/_trace/processor/__init__.pyi
new file mode 100644
index 00000000000..065aef38cdd
--- /dev/null
+++ b/ddtrace/_trace/processor/__init__.pyi
@@ -0,0 +1,53 @@
+from threading import RLock
+from typing import Dict, List, Optional
+
+from ddtrace._trace.span import Span
+from ddtrace.internal.writer.writer import TraceWriter
+
+class TraceProcessor:
+    def process_trace(self, trace: List[Span]) -> Optional[List[Span]]: ...
+
+class SpanProcessor:
+    __processors__: List["SpanProcessor"]
+
+    def on_span_start(self, span: Span) -> None: ...
+    def on_span_finish(self, span: Span) -> None: ...
+    def shutdown(self, timeout: Optional[float]) -> None: ...
+    def register(self) -> None: ...
+    def unregister(self) -> None: ...
+
+class TraceSamplingProcessor(TraceProcessor): ...
+class TopLevelSpanProcessor(SpanProcessor): ...
+class ServiceNameProcessor(TraceProcessor): ...
+class TraceTagsProcessor(TraceProcessor): ...
+
+class SpanAggregator(SpanProcessor):
+    partial_flush_enabled: bool
+    partial_flush_min_spans: int
+    sampling_processor: TraceSamplingProcessor
+    tags_processor: TraceTagsProcessor
+    service_name_processor: ServiceNameProcessor
+    dd_processors: List[TraceProcessor]
+    user_processors: List[TraceProcessor]
+    writer: TraceWriter
+    _traces: Dict[int, List[Span]]
+    _spans_created: Dict[str, int]
+    _spans_finished: Dict[str, int]
+    _lock: RLock
+    _total_spans_finished: int
+
+    def __init__(
+        self,
+        partial_flush_enabled: bool,
+        partial_flush_min_spans: int,
+        dd_processors: Optional[List[TraceProcessor]] = None,
+        user_processors: Optional[List[TraceProcessor]] = None,
+    ) -> None: ...
+    def reset(
+        self,
+        user_processors: Optional[List[TraceProcessor]] = None,
+        compute_stats: Optional[bool] = None,
+        apm_opt_out: Optional[bool] = None,
+        appsec_enabled: Optional[bool] = None,
+        reset_buffer: bool = True,
+    ) -> None: ...
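Note: with `TraceProcessor` about to become a compiled `cdef class` (see the rename below), it can no longer carry `abc.ABCMeta`, so `TraceFilter` re-attaches the metaclass itself to keep `@abc.abstractmethod` enforced for user-defined filters; the new `.pxd` exposes the C-level API for `cimport`, and the `.pyi` stub preserves the typing surface that type checkers lose once the implementation lives in a `.pyx`. A runnable sketch of why the metaclass must be re-added (toy class names, not ddtrace APIs):

```python
import abc


class PlainBase:
    """Stand-in for the compiled TraceProcessor: an ordinary class, no ABCMeta."""

    def process_trace(self, trace):
        pass


# Without ABCMeta, @abc.abstractmethod is recorded but never enforced:
class Unenforced(PlainBase):
    @abc.abstractmethod
    def process_trace(self, trace): ...


Unenforced()  # instantiates fine; abstractness is silently ignored


# Re-adding the metaclass on the subclass restores the old contract,
# which is what the TraceFilter change above does:
class Enforced(PlainBase, metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def process_trace(self, trace): ...


class Incomplete(Enforced):
    pass


try:
    Incomplete()
except TypeError as e:
    print(e)  # can't instantiate abstract class with abstract method process_trace
```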
diff --git a/ddtrace/_trace/processor/__init__.py b/ddtrace/_trace/processor/__init__.pyx
similarity index 74%
rename from ddtrace/_trace/processor/__init__.py
rename to ddtrace/_trace/processor/__init__.pyx
index 7248695f3b5..83b2b153de2 100644
--- a/ddtrace/_trace/processor/__init__.py
+++ b/ddtrace/_trace/processor/__init__.pyx
@@ -1,10 +1,7 @@
-import abc
-from collections import defaultdict
+# cython: freethreading_compatible=True
 from itertools import chain
 import logging
 from threading import RLock
-from typing import DefaultDict
-from typing import Dict
 from typing import List
 from typing import Optional

@@ -36,15 +33,14 @@
 log = get_logger(__name__)


-class TraceProcessor(metaclass=abc.ABCMeta):
+cdef class TraceProcessor:
     def __init__(self) -> None:
         """Default post initializer which logs the representation of the
         TraceProcessor at the ``logging.DEBUG`` level.
         """
         pass

-    @abc.abstractmethod
-    def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
+    cpdef object process_trace(self, list trace):
         """Processes a trace.

         ``None`` can be returned to prevent the trace from being further
@@ -53,7 +49,7 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
         pass


-class SpanProcessor(metaclass=abc.ABCMeta):
+cdef class SpanProcessor:
     """A Processor is used to process spans as they are created and finished by a tracer."""

     __processors__: List["SpanProcessor"] = []
@@ -64,7 +60,6 @@ def __init__(self) -> None:
         """
         pass

-    @abc.abstractmethod
     def on_span_start(self, span: Span) -> None:
         """Called when a span is started.

@@ -75,7 +70,6 @@ def on_span_start(self, span: Span) -> None:
         """
         pass

-    @abc.abstractmethod
     def on_span_finish(self, span: Span) -> None:
         """Called with the result of any previous processors or initially with
         the finishing span when a span finishes.
@@ -104,7 +98,7 @@ def unregister(self) -> None:
             log.warning("Span processor %r not registered", self)


-class TraceSamplingProcessor(TraceProcessor):
+cdef class TraceSamplingProcessor(TraceProcessor):
     """Processor that runs both trace and span sampling rules.

     * Span sampling must be applied after trace sampling priority has been set.
@@ -115,6 +109,11 @@ class TraceSamplingProcessor(TraceProcessor):
       Agent even if the dropped trace is not (as is the case when trace stats computation is enabled).
     """

+    # TODO: Make these not public anymore
+    cdef public bint _compute_stats_enabled, _apm_opt_out
+    cdef public list single_span_rules
+    cdef public object sampler
+
     def __init__(
         self,
         compute_stats_enabled: bool,
@@ -125,7 +124,7 @@ def __init__(
         self._compute_stats_enabled = compute_stats_enabled
         self.single_span_rules = single_span_rules
         self.sampler = DatadogSampler()
-        self.apm_opt_out = apm_opt_out
+        self._apm_opt_out = apm_opt_out

     @property
     def apm_opt_out(self):
@@ -145,7 +144,11 @@ def apm_opt_out(self, value):
             self.sampler._rate_limit_always_on = False
         self._apm_opt_out = value

-    def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
+    cpdef object process_trace(self, list trace):
+        cdef object chunk_root, span
+        cdef list single_spans
+        cdef bint can_drop_trace
+
         if trace:
             chunk_root = trace[0]

@@ -187,7 +190,7 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
         return None


-class TopLevelSpanProcessor(SpanProcessor):
+cdef class TopLevelSpanProcessor(SpanProcessor):
     """Processor marks spans as top level

     A span is top level when it is the entrypoint method for a request to a service.
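Note: the conversion above drops `abc.ABCMeta` from the base classes (a `cdef class` cannot use a Python metaclass), which is also why the `test_no_impl` guarantee is deleted at the end of this diff. The duck-typed contract is unchanged: pure-Python subclasses still override `process_trace`, and returning `None` still drops the trace. A minimal pure-Python sketch of that contract (`TraceProcessorLike` and the string "spans" are stand-ins, not ddtrace types):

```python
from typing import List, Optional


class TraceProcessorLike:
    """Stand-in for the compiled TraceProcessor base class."""

    def process_trace(self, trace: List[str]) -> Optional[List[str]]:
        return trace  # default: pass the trace through unchanged


class DropHealthchecks(TraceProcessorLike):
    def process_trace(self, trace: List[str]) -> Optional[List[str]]:
        # Returning None prevents the trace from being processed further,
        # matching the documented contract of process_trace above.
        if trace and trace[0] == "healthcheck":
            return None
        return trace


def run_chain(processors, trace):
    for proc in processors:
        trace = proc.process_trace(trace)
        if trace is None:
            return None  # trace dropped
    return trace


print(run_chain([DropHealthchecks()], ["healthcheck"]))        # None
print(run_chain([DropHealthchecks()], ["web.request", "db"]))  # ['web.request', 'db']
```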
@@ -199,29 +202,31 @@ class TopLevelSpanProcessor(SpanProcessor):
     """

-    def on_span_start(self, _: Span) -> None:
+    cpdef void on_span_start(self, _: Span):
         pass

-    def on_span_finish(self, span: Span) -> None:
+    cpdef void on_span_finish(self, span: Span):
         # DEV: Update span after finished to avoid race condition
         if span._is_top_level:
             span._metrics["_dd.top_level"] = 1  # PERF: avoid setting via Span.set_metric


-class ServiceNameProcessor(TraceProcessor):
+cdef class ServiceNameProcessor(TraceProcessor):
     """Processor that adds the service name to the globalconfig."""

-    def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
+    cpdef object process_trace(self, list trace):
+        cdef object span
         for span in trace:
             if span.service:
                 config._add_extra_service(span.service)
         return trace


-class TraceTagsProcessor(TraceProcessor):
+cdef class TraceTagsProcessor(TraceProcessor):
     """Processor that applies trace-level tags to the trace."""

-    def _set_git_metadata(self, chunk_root):
+    cdef void _set_git_metadata(self, chunk_root: Span) except *:
+        cdef str repository_url, commit_sha, main_package
         repository_url, commit_sha, main_package = gitmetadata.get_git_tags()
         if repository_url:
             chunk_root.set_tag_str("_dd.git.repository_url", repository_url)
@@ -230,7 +235,11 @@ def _set_git_metadata(self, chunk_root):
         if main_package:
             chunk_root.set_tag_str("_dd.python_main_package", main_package)

-    def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
+    cpdef object process_trace(self, list trace):
+        cdef list spans_to_tag
+        cdef object span
+        cdef str trace_id_hob
+
         if not trace:
             return trace

@@ -262,13 +271,27 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
         return trace


-class _Trace:
-    def __init__(self, spans=None, num_finished=0):
-        self.spans = spans if spans is not None else []
-        self.num_finished = num_finished
+cdef class _Trace:
+    cdef list spans
+    cdef int num_finished

+    def __init__(self, spans: Optional[List[Span]] = None, num_finished: int = 0) -> None:
+        self.spans: List[Span] = spans if spans is not None else []
+        self.num_finished: int = num_finished

-class SpanAggregator(SpanProcessor):
+    cdef list remove_finished(self):
+        cdef object s
+        cdef list finished
+        # perf: Avoid Span.finished which is a computed property and has function call overhead
+        # so check Span.duration_ns manually.
+        finished = [s for s in self.spans if s.duration_ns is not None]
+        if finished:
+            self.spans[:] = [s for s in self.spans if s.duration_ns is None]
+            self.num_finished = 0
+        return finished


+cdef class SpanAggregator(SpanProcessor):
     """Processor that aggregates spans together by trace_id and writes the
     spans to the provided writer when:
         - The collection is assumed to be complete. A collection of spans is
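Note: `_Trace` grows a `remove_finished` helper that treats `Span.duration_ns` as the finished flag instead of calling the computed `Span.finished` property once per span, and it prunes the buffer in place (`self.spans[:] = ...`) so existing references to the list stay valid. A toy illustration of the same idea (`FakeSpan` is hypothetical, not the real Span):

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeSpan:
    name: str
    duration_ns: Optional[int] = None  # None until the span finishes


def remove_finished(spans: List[FakeSpan]) -> List[FakeSpan]:
    # Same shape as _Trace.remove_finished above: duration_ns doubles as the
    # "finished" flag, avoiding a property call per span.
    finished = [s for s in spans if s.duration_ns is not None]
    if finished:
        # In-place slice assignment keeps other aliases of the buffer valid.
        spans[:] = [s for s in spans if s.duration_ns is None]
    return finished


buf = [FakeSpan("a", 100), FakeSpan("b"), FakeSpan("c", 250)]
done = remove_finished(buf)
print([s.name for s in done])  # ['a', 'c']
print([s.name for s in buf])   # ['b']
```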
@@ -286,6 +309,16 @@ class SpanAggregator(SpanProcessor):
     SPAN_START_DEBUG_MESSAGE = "Starting span: %s, trace has %d spans in the span aggregator"

+    # TODO: Make these not public anymore
+    cdef public bint partial_flush_enabled
+    cdef public int partial_flush_min_spans
+    cdef public object sampling_processor, tags_processor, service_name_processor
+    cdef public list dd_processors, user_processors
+    cdef public object writer
+    cdef public dict _traces, _spans_created, _spans_finished
+    cdef public object _lock
+    cdef public int _total_spans_finished
+
     def __init__(
         self,
         partial_flush_enabled: bool,
@@ -306,14 +339,13 @@ def __init__(
         self.service_name_processor = ServiceNameProcessor()
         self.writer = create_trace_writer(response_callback=self._agent_response_callback)
         # Initialize the trace buffer and lock
-        self._traces: DefaultDict[int, _Trace] = defaultdict(lambda: _Trace())
+        self._traces = {}
         self._lock: RLock = RLock()
         # Track telemetry span metrics by span api
         # ex: otel api, opentracing api, datadog api
-        self._span_metrics: Dict[str, DefaultDict] = {
-            "spans_created": defaultdict(int),
-            "spans_finished": defaultdict(int),
-        }
+        self._spans_created = {}
+        self._spans_finished = {}
+        self._total_spans_finished: int = 0
         super(SpanAggregator, self).__init__()

     def __repr__(self) -> str:
@@ -326,55 +358,75 @@ def __repr__(self) -> str:
             f"{self.tags_processor},"
             f"{self.dd_processors}, "
             f"{self.user_processors}, "
-            f"{self._span_metrics}, "
+            f"{self._spans_finished}, "
             f"{self.writer})"
         )

-    def on_span_start(self, span: Span) -> None:
+    cdef void _emit_telemetry_metrics(self, bint force_flush = False) except *:
+        ns = TELEMETRY_NAMESPACE.TRACERS
+        add_count_metric = telemetry.telemetry_writer.add_count_metric
+
+        if self._total_spans_finished >= 100 or force_flush:
+            for tag_value, count in self._spans_created.items():
+                add_count_metric(ns, "spans_created", count, tags=(("integration_name", tag_value),))
+            self._spans_created.clear()
+
+            for tag_value, count in self._spans_finished.items():
+                add_count_metric(ns, "spans_finished", count, tags=(("integration_name", tag_value),))
+
+            self._spans_finished.clear()
+            self._total_spans_finished = 0
+
+    cpdef void on_span_start(self, span: Span) except *:
+        cdef _Trace trace
+        cdef str integration_name
+
         with self._lock:
-            trace = self._traces[span.trace_id]
+            trace = self._traces.setdefault(span.trace_id, _Trace())
             trace.spans.append(span)
-            integration_name = span._meta.get(COMPONENT, span._span_api)
+            if config._telemetry_enabled:
+                integration_name = span._meta.get(COMPONENT, span._span_api)
+                self._spans_created.setdefault(integration_name, 0)
+                self._spans_created[integration_name] += 1
+
+        # perf: Avoid computed arguments unless we are actually going to log
+        if log.isEnabledFor(logging.DEBUG):
+            log.debug(self.SPAN_START_DEBUG_MESSAGE, span, len(trace.spans))
+
+    cpdef void on_span_finish(self, span: Span) except *:
+        cdef _Trace trace
+        cdef str integration_name
+        cdef int num_buffered, num_finished
+        cdef bint is_trace_complete, should_partial_flush
+        cdef list finished, spans

-            self._span_metrics["spans_created"][integration_name] += 1
-            self._queue_span_count_metrics("spans_created", "integration_name")
-            log.debug(self.SPAN_START_DEBUG_MESSAGE, span, len(trace.spans))
-
-    def on_span_finish(self, span: Span) -> None:
         # Acquire lock to get finished and update trace.spans
         with self._lock:
-            integration_name = span._meta.get(COMPONENT, span._span_api)
-            self._span_metrics["spans_finished"][integration_name] += 1
+            if config._telemetry_enabled:
+                self._total_spans_finished += 1
+                integration_name = span._meta.get(COMPONENT, span._span_api)
+                self._spans_finished.setdefault(integration_name, 0)
+                self._spans_finished[integration_name] += 1
+                self._emit_telemetry_metrics()

             if span.trace_id not in self._traces:
                 return

             trace = self._traces[span.trace_id]
-            num_buffered = len(trace.spans)
             trace.num_finished += 1
-            should_partial_flush = self.partial_flush_enabled and trace.num_finished >= self.partial_flush_min_spans
-            is_trace_complete = trace.num_finished >= len(trace.spans)
-            if not is_trace_complete and not should_partial_flush:
-                return
-
-            if not is_trace_complete:
-                finished = [s for s in trace.spans if s.finished]
-                if not finished:
-                    return
-                trace.spans[:] = [s for s in trace.spans if not s.finished]  # In-place update
-                trace.num_finished = 0
-            else:
+            num_buffered = len(trace.spans)
+            is_trace_complete = trace.num_finished >= num_buffered
+            num_finished = trace.num_finished
+            should_partial_flush = False
+            if is_trace_complete:
                 finished = trace.spans
                 del self._traces[span.trace_id]
-                # perf: Flush span finish metrics to the telemetry writer after the trace is complete
-                self._queue_span_count_metrics("spans_finished", "integration_name")
-
-        num_finished = len(finished)
-
-        if should_partial_flush:
-            # FIXME(munir): should_partial_flush should return false if all the spans in the trace are finished.
-            # For example if partial flushing min spans is 10 and the trace has 10 spans, the trace should
-            # not have a partial flush metric. This trace was processed in its entirety.
-            finished[0].set_metric("_dd.py.partial_flush", num_finished)
+            elif self.partial_flush_enabled and num_finished >= self.partial_flush_min_spans:
+                should_partial_flush = True
+                finished = trace.remove_finished()
+                finished[0].set_metric("_dd.py.partial_flush", num_finished)
+            else:
+                return

         # perf: Process spans outside of the span aggregator lock
         spans = finished
@@ -396,21 +448,23 @@ def on_span_finish(self, span: Span) -> None:
             sampling_priority = root_span.context.sampling_priority
             sampling_mechanism = root_span.context._meta.get(SAMPLING_DECISION_TRACE_TAG_KEY, "None")

-        log.debug(
-            self.SPAN_FINISH_DEBUG_MESSAGE,
-            len(spans),
-            num_buffered,
-            num_finished - len(spans),
-            num_buffered - num_finished,
-            spans[0].trace_id,
-            spans[0].name,
-            sampling_priority,
-            sampling_mechanism,
-            should_partial_flush,
-        )
+        # Avoid computed arguments unless we are actually going to log
+        if log.isEnabledFor(logging.DEBUG):
+            log.debug(
+                self.SPAN_FINISH_DEBUG_MESSAGE,
+                len(spans),
+                num_buffered,
+                num_finished - len(spans),
+                num_buffered - num_finished,
+                spans[0].trace_id,
+                spans[0].name,
+                sampling_priority,
+                sampling_mechanism,
+                should_partial_flush,
+            )
         self.writer.write(spans)

-    def _agent_response_callback(self, resp: AgentResponse) -> None:
+    cdef void _agent_response_callback(self, resp: AgentResponse) except *:
         """Handle the response from the agent.

         The agent can return updated sample rates for the priority sampler.
@@ -423,7 +477,7 @@ def _agent_response_callback(self, resp: AgentResponse) -> None:
         except ValueError as e:
             log.error("Failed to set agent service sample rates: %s", str(e))

-    def shutdown(self, timeout: Optional[float]) -> None:
+    cpdef void shutdown(self, timeout: Optional[float]) except *:
         """
         This will stop the background writer/worker and flush any finished traces in the buffer. The tracer cannot
         be used for tracing after this method has been called. A new tracer instance is required to continue tracing.
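Note: the per-span `_queue_span_count_metrics` calls are replaced by `_emit_telemetry_metrics`, which accumulates per-integration counts in plain dicts and flushes them once 100 spans have finished (or unconditionally on shutdown via `force_flush`). A self-contained sketch of that batching pattern, with a hypothetical `sink` callable standing in for the telemetry writer:

```python
class BatchedCounter:
    """Sketch of the _emit_telemetry_metrics batching, under assumed names."""

    def __init__(self, sink, batch_size: int = 100):
        self._sink = sink  # callable(metric_name, count, tags)
        self._batch_size = batch_size
        self._created = {}
        self._finished = {}
        self._total_finished = 0

    def span_created(self, integration: str) -> None:
        self._created[integration] = self._created.get(integration, 0) + 1

    def span_finished(self, integration: str) -> None:
        self._total_finished += 1
        self._finished[integration] = self._finished.get(integration, 0) + 1
        self.emit()  # cheap no-op until the batch threshold is reached

    def emit(self, force_flush: bool = False) -> None:
        if self._total_finished >= self._batch_size or force_flush:
            for name, count in self._created.items():
                self._sink("spans_created", count, (("integration_name", name),))
            self._created.clear()
            for name, count in self._finished.items():
                self._sink("spans_finished", count, (("integration_name", name),))
            self._finished.clear()
            self._total_finished = 0


calls = []
bc = BatchedCounter(lambda m, c, t: calls.append((m, c, t)), batch_size=2)
bc.span_created("flask")
bc.span_finished("flask")  # total 1: below threshold, nothing emitted
bc.span_created("flask")
bc.span_finished("flask")  # total 2: flushes both counters
print(calls)  # [('spans_created', 2, ...), ('spans_finished', 2, ...)]
```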
@@ -432,18 +486,17 @@ def shutdown(self, timeout: Optional[float]) -> None:
         :param timeout: How long in seconds to wait for the background worker to flush traces
             before exiting or :obj:`None` to block until flushing has successfully completed (default: :obj:`None`)
         :type timeout: :obj:`int` | :obj:`float` | :obj:`None`
         """
+        cdef _Trace trace
+
         # on_span_start queue span created counts in batches of 100. This ensures all remaining counts are sent
         # before the tracer is shutdown.
-        self._queue_span_count_metrics("spans_created", "integration_name", 1)
-        # on_span_finish(...) queues span finish metrics in batches of 100.
-        # This ensures all remaining counts are sent before the tracer is shutdown.
-        self._queue_span_count_metrics("spans_finished", "integration_name", 1)
+        self._emit_telemetry_metrics(force_flush=True)
         # Log a warning if the tracer is shutdown before spans are finished
         if log.isEnabledFor(logging.WARNING):
             unfinished_spans = [
                 f"trace_id={s.trace_id} parent_id={s.parent_id} span_id={s.span_id} name={s.name} resource={s.resource} started={s.start} sampling_priority={s.context.sampling_priority}"  # noqa: E501
-                for t in self._traces.values()
-                for s in t.spans
+                for trace in self._traces.values()
+                for s in trace.spans
                 if not s.finished
             ]
             if unfinished_spans:
@@ -460,25 +513,14 @@ def shutdown(self, timeout: Optional[float]) -> None:
             # It's possible the writer never got started in the first place :(
             pass

-    def _queue_span_count_metrics(self, metric_name: str, tag_name: str, min_count: int = 100) -> None:
-        """Queues a telemetry count metric for span created and span finished"""
-        # perf: telemetry_metrics_writer.add_count_metric(...) is an expensive operation.
-        # We should avoid calling this method on every invocation of span finish and span start.
-        if config._telemetry_enabled and sum(self._span_metrics[metric_name].values()) >= min_count:
-            for tag_value, count in self._span_metrics[metric_name].items():
-                telemetry.telemetry_writer.add_count_metric(
-                    TELEMETRY_NAMESPACE.TRACERS, metric_name, count, tags=((tag_name, tag_value),)
-                )
-            self._span_metrics[metric_name] = defaultdict(int)
-
-    def reset(
+    cpdef void reset(
         self,
         user_processors: Optional[List[TraceProcessor]] = None,
         compute_stats: Optional[bool] = None,
         apm_opt_out: Optional[bool] = None,
         appsec_enabled: Optional[bool] = None,
         reset_buffer: bool = True,
-    ) -> None:
+    ) except *:
         """
         Resets the internal state of the SpanAggregator, including the writer, sampling processor,
         user-defined processors, and optionally the trace buffer and span metrics.
@@ -505,8 +547,7 @@ def reset(
         # Reset the trace buffer and span metrics.
         # Useful when forking to prevent sending duplicate spans from parent and child processes.
         if reset_buffer:
-            self._traces = defaultdict(lambda: _Trace())
-            self._span_metrics = {
-                "spans_created": defaultdict(int),
-                "spans_finished": defaultdict(int),
-            }
+            self._traces.clear()
+            self._spans_created.clear()
+            self._spans_finished.clear()
+            self._total_spans_finished = 0
diff --git a/ddtrace/_trace/provider.py b/ddtrace/_trace/provider.py
index c498f3cfe3c..3b53271cded 100644
--- a/ddtrace/_trace/provider.py
+++ b/ddtrace/_trace/provider.py
@@ -71,7 +71,10 @@ def activate(self, ctx: Optional[ActiveTrace]) -> None:
     def active(self) -> Optional[ActiveTrace]:
         """Returns the active span or context for the current execution."""
         item = _DD_CONTEXTVAR.get()
-        if isinstance(item, Span):
+
+        # perf: type(item) == Span is about the same perf as isinstance(item, Span)
+        # when item is a Span, but slower when item is a Context
+        if type(item) == Span:
             return self._update_active(item)
         return item

@@ -82,8 +85,9 @@ def _update_active(self, span: Span) -> Optional[ActiveTrace]:
         If no parent exists and the context is reactivatable, that context is restored.
         """
         new_active: Optional[Span] = span
-        # PERF: Avoid calling `Span.finished` more than once per span. This is a computed property.
-        while new_active and new_active.finished:
+        # PERF: Avoid checking if the span is finished more than once per span.
+        # PERF: By-pass Span.finished which is a computed property to avoid the function call overhead
+        while new_active and new_active.duration_ns is not None:
             if new_active._parent is None and new_active._parent_context and new_active._parent_context._reactivate:
                 self.activate(new_active._parent_context)
                 return new_active._parent_context
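Note: both provider changes trade a little generality for hot-path speed: an exact `type(...) == Span` test instead of `isinstance`, and a direct `duration_ns` attribute read instead of the computed `Span.finished` property. A micro-benchmark sketch with toy classes (not the real `Span`/`Context`; absolute numbers will vary by interpreter and machine):

```python
import timeit


class Context:
    ...


class Span:
    __slots__ = ("duration_ns",)

    def __init__(self):
        self.duration_ns = None

    @property
    def finished(self) -> bool:
        return self.duration_ns is not None


span, ctx = Span(), Context()

# Exact-type check vs isinstance, mirroring the DefaultContextProvider.active change.
print(timeit.timeit(lambda: type(span) == Span, number=1_000_000))
print(timeit.timeit(lambda: isinstance(span, Span), number=1_000_000))
print(timeit.timeit(lambda: type(ctx) == Span, number=1_000_000))  # the slower Context case

# Attribute read vs computed property, mirroring the _update_active change.
print(timeit.timeit(lambda: span.duration_ns is not None, number=1_000_000))
print(timeit.timeit(lambda: span.finished, number=1_000_000))
```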
""" - if not (span_id is None or isinstance(span_id, int)): - if config._raise: - raise TypeError("span_id must be an integer") - return - if not (trace_id is None or isinstance(trace_id, int)): - if config._raise: - raise TypeError("trace_id must be an integer") - return - if not (parent_id is None or isinstance(parent_id, int)): - if config._raise: - raise TypeError("parent_id must be an integer") - return self.name = name self.service = service self._resource = [resource or name] diff --git a/ddtrace/_trace/tracer.py b/ddtrace/_trace/tracer.py index d2d6201636e..12cdabc60ca 100644 --- a/ddtrace/_trace/tracer.py +++ b/ddtrace/_trace/tracer.py @@ -24,7 +24,6 @@ from ddtrace._trace.provider import BaseContextProvider from ddtrace._trace.provider import DefaultContextProvider from ddtrace._trace.span import Span -from ddtrace.appsec._constants import APPSEC from ddtrace.constants import _HOSTNAME_KEY from ddtrace.constants import ENV_KEY from ddtrace.constants import PID @@ -42,7 +41,6 @@ from ddtrace.internal.constants import LOG_ATTR_VALUE_EMPTY from ddtrace.internal.constants import LOG_ATTR_VALUE_ZERO from ddtrace.internal.constants import LOG_ATTR_VERSION -from ddtrace.internal.constants import SAMPLING_DECISION_TRACE_TAG_KEY from ddtrace.internal.constants import SPAN_API_DATADOG from ddtrace.internal.hostname import get_hostname from ddtrace.internal.logger import get_logger @@ -52,7 +50,6 @@ from ddtrace.internal.processor.endpoint_call_counter import EndpointCallCounterProcessor from ddtrace.internal.runtime import get_runtime_id from ddtrace.internal.schema.processor import BaseServiceProcessor -from ddtrace.internal.utils import _get_metas_to_propagate from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning from ddtrace.internal.utils.formats import format_trace_id from ddtrace.internal.writer import AgentWriterInterface @@ -107,7 +104,7 @@ class Tracer(object): trace = tracer.trace('app.request', 'web-server').finish() """ - SHUTDOWN_TIMEOUT = 5 + SHUTDOWN_TIMEOUT = 5.0 _instance = None def __init__(self) -> None: @@ -530,11 +527,6 @@ def _start_span( if span._parent.service == service: span._service_entry_span = parent._service_entry_span - for k, v in _get_metas_to_propagate(context): - # We do not want to propagate AppSec propagation headers - # to children spans, only across distributed spans - if k not in (SAMPLING_DECISION_TRACE_TAG_KEY, APPSEC.PROPAGATION_HEADER): - span._meta[k] = v else: # this is the root span of a new trace span = Span( @@ -589,14 +581,18 @@ def _on_span_finish(self, span: Span) -> None: active = self.current_span() # Debug check: if the finishing span has a parent and its parent # is not the next active span then this is an error in synchronous tracing. 
-        if span._parent is not None and active is not span._parent:
-            log.debug("span %r closing after its parent %r, this is an error when not using async", span, span._parent)
+        if log.isEnabledFor(logging.DEBUG):
+            if span._parent is not None and active is not span._parent:
+                log.debug(
+                    "span %r closing after its parent %r, this is an error when not using async",
+                    span,
+                    span._parent,
+                )

         # Only call span processors if the tracer is enabled (even if APM opted out)
         if self.enabled or asm_config._apm_opt_out:
             for p in chain(self._span_processors, SpanProcessor.__processors__, [self._span_aggregator]):
-                if p:
-                    p.on_span_finish(span)
+                p.on_span_finish(span)

         core.dispatch("trace.span_finish", (span,))
         log.debug("finishing span - %r (enabled:%s)", span, self.enabled)
diff --git a/ddtrace/internal/peer_service/processor.py b/ddtrace/internal/peer_service/processor.py
deleted file mode 100644
index eae78f9b541..00000000000
--- a/ddtrace/internal/peer_service/processor.py
+++ /dev/null
@@ -1,42 +0,0 @@
-from ddtrace._trace.processor import TraceProcessor
-from ddtrace.constants import SPAN_KIND
-
-
-class PeerServiceProcessor(TraceProcessor):
-    def __init__(self, peer_service_config):
-        self._config = peer_service_config
-        self._set_defaults_enabled = self._config.set_defaults_enabled
-        self._mapping = self._config.peer_service_mapping
-
-    def process_trace(self, trace):
-        if not trace:
-            return
-
-        traces_to_process = []
-        if not self._set_defaults_enabled:
-            traces_to_process = filter(lambda x: x.get_tag(self._config.tag_name), trace)
-        else:
-            traces_to_process = filter(
-                lambda x: x.get_tag(self._config.tag_name) or x.get_tag(SPAN_KIND) in self._config.enabled_span_kinds,
-                trace,
-            )
-        any(map(lambda x: self._update_peer_service_tags(x), traces_to_process))
-
-        return trace
-
-    def _update_peer_service_tags(self, span):
-        tag = span.get_tag(self._config.tag_name)
-
-        if tag:  # If the tag already exists, assume it is user generated
-            span.set_tag_str(self._config.source_tag_name, self._config.tag_name)
-        else:
-            for data_source in self._config.prioritized_data_sources:
-                tag = span.get_tag(data_source)
-                if tag:
-                    span.set_tag_str(self._config.tag_name, tag)
-                    span.set_tag_str(self._config.source_tag_name, data_source)
-                    break
-
-        if tag in self._mapping:
-            span.set_tag_str(self._config.remap_tag_name, tag)
-            span.set_tag_str(self._config.tag_name, self._config.peer_service_mapping[tag])
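Note: the deleted module drove its side effects through `filter` plus `any(map(...))`; since `_update_peer_service_tags` returns `None`, `any()` never short-circuits and every matching span happens to be visited, but only by accident. The Cython rewrite that follows uses explicit loops, hoists the `tag_name` lookup out of the loop, and tightens the remap guard to `tag and tag in self._mapping` so a `None` tag can no longer be looked up in the mapping. A toy comparison of the two iteration styles:

```python
spans = [{"peer.service": "db1"}, {"peer.service": None}, {"peer.service": "db2"}]
visited = []


def update(span):
    visited.append(span["peer.service"])  # side effect only, returns None


# Old style: any() consumes the lazy map purely for its side effects. It only
# visits every element because update() returns None (falsy) each time.
any(map(update, filter(lambda s: s["peer.service"], spans)))
print(visited)  # ['db1', 'db2']

visited.clear()

# New style: intent is explicit, no lambda allocated per element.
for s in spans:
    if s["peer.service"]:
        update(s)
print(visited)  # ['db1', 'db2']
```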
diff --git a/ddtrace/internal/peer_service/processor.pyx b/ddtrace/internal/peer_service/processor.pyx
new file mode 100644
index 00000000000..abd1b025f27
--- /dev/null
+++ b/ddtrace/internal/peer_service/processor.pyx
@@ -0,0 +1,58 @@
+# cython: freethreading_compatible=True
+from typing import List
+from typing import Optional
+
+from ddtrace._trace.processor cimport TraceProcessor
+from ddtrace._trace.span import Span
+from ddtrace.constants import SPAN_KIND
+
+
+cdef class PeerServiceProcessor(TraceProcessor):
+    cdef object _config
+    cdef bint _set_defaults_enabled
+    cdef dict _mapping
+
+    def __init__(self, peer_service_config):
+        self._config = peer_service_config
+        self._set_defaults_enabled = self._config.set_defaults_enabled
+        self._mapping = self._config.peer_service_mapping
+
+    cpdef process_trace(self, trace: List[Span]):
+        cdef str tag_name
+        cdef object span, tag
+        cdef set enabled_span_kinds
+
+        if not trace:
+            return
+
+        tag_name = self._config.tag_name
+
+        if not self._set_defaults_enabled:
+            for span in trace:
+                tag = span.get_tag(tag_name)
+                if tag:
+                    self._update_peer_service_tags(span, tag)
+        else:
+            enabled_span_kinds = self._config.enabled_span_kinds
+            for span in trace:
+                tag = span.get_tag(tag_name)
+                if tag or span.get_tag(SPAN_KIND) in enabled_span_kinds:
+                    self._update_peer_service_tags(span, tag)
+        return trace
+
+    cdef inline void _update_peer_service_tags(self, span: Span, tag: Optional[str]):
+        cdef str data_source
+
+        if tag:  # If the tag already exists, assume it is user generated
+            span.set_tag_str(self._config.source_tag_name, self._config.tag_name)
+        else:
+            for data_source in self._config.prioritized_data_sources:
+                tag = span.get_tag(data_source)
+                if tag:
+                    span.set_tag_str(self._config.tag_name, tag)
+                    span.set_tag_str(self._config.source_tag_name, data_source)
+                    break
+
+        if tag and tag in self._mapping:
+            span.set_tag_str(self._config.remap_tag_name, tag)
+            span.set_tag_str(self._config.tag_name, self._config.peer_service_mapping[tag])
diff --git a/ddtrace/internal/schema/processor.py b/ddtrace/internal/schema/processor.py
index 1cdf204279b..8b0f90b2da7 100644
--- a/ddtrace/internal/schema/processor.py
+++ b/ddtrace/internal/schema/processor.py
@@ -1,4 +1,7 @@
+from typing import List
+
 from ddtrace._trace.processor import TraceProcessor
+from ddtrace._trace.span import Span
 from ddtrace.constants import _BASE_SERVICE_KEY
 from ddtrace.internal.serverless import in_aws_lambda
 from ddtrace.settings._config import config
@@ -8,22 +11,20 @@

 class BaseServiceProcessor(TraceProcessor):
     def __init__(self):
-        self._global_service = schematize_service_name((config.service or "").lower())
+        self._global_service = str(schematize_service_name((config.service or "").lower()))
         self._in_aws_lambda = in_aws_lambda()

-    def process_trace(self, trace):
+    def process_trace(self, trace: List[Span]):
         # AWS Lambda spans receive unhelpful base_service value of runtime
         # Remove base_service to prevent service overrides in Lambda spans
-        if not trace or self._in_aws_lambda:
+        if self._in_aws_lambda:
             return trace

-        traces_to_process = filter(
-            lambda x: x.service and x.service.lower() != self._global_service,
-            trace,
-        )
-        any(map(lambda x: self._update_dd_base_service(x), traces_to_process))
+        for span in trace:
+            if not span.service or span.service == self._global_service:
+                continue

-        return trace
+            if span.service.lower() != self._global_service:
+                span.set_tag_str(_BASE_SERVICE_KEY, self._global_service)

-    def _update_dd_base_service(self, span):
-        span.set_tag_str(key=_BASE_SERVICE_KEY, value=self._global_service)
+        return trace
diff --git a/pyproject.toml b/pyproject.toml
index bf2080b78d8..ef782734c40 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -114,10 +114,12 @@ exclude = '''
   | .venv*
   | \.sg
   | \.riot/
+  | ddtrace/_trace/processor/__init__.pyx$
   | ddtrace/appsec/_ddwaf.pyx$
   | ddtrace/internal/_encoding.pyx$
   | ddtrace/internal/_rand.pyx$
   | ddtrace/internal/_tagset.pyx$
+  | ddtrace/internal/peer_service/processor.pyx$
   | ddtrace/internal/telemetry/metrics_namespaces.pyx$
   | ddtrace/profiling/collector/_traceback.pyx$
   | ddtrace/profiling/collector/_task.pyx$
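Note: `BaseServiceProcessor.process_trace` now uses a plain loop with an exact-equality fast path: a span whose service is exactly the (pre-lowercased) global service skips the `str.lower()` call and its allocation entirely, and only case-variant names fall through to the case-insensitive comparison. A sketch of that check in isolation (the `needs_base_service` helper is hypothetical):

```python
GLOBAL_SERVICE = "web"  # already lowercased at construction time, as above


def needs_base_service(service):
    # Fast path: identity/equality check, no new string allocated.
    if not service or service == GLOBAL_SERVICE:
        return False
    # Slow path: only reached for names that differ, possibly just by case.
    return service.lower() != GLOBAL_SERVICE


for svc in (None, "web", "WEB", "db"):
    print(svc, needs_base_service(svc))
# None False / web False / WEB False / db True
```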
diff --git a/setup.py b/setup.py
index 9efe3a7505f..06138fdfa9a 100644
--- a/setup.py
+++ b/setup.py
@@ -1027,6 +1027,11 @@ def get_exts_for(name):
     ext_modules=ext_modules
     + cythonize(
         [
+            Cython.Distutils.Extension(
+                "ddtrace._trace.processor.__init__",
+                sources=["ddtrace/_trace/processor/__init__.pyx"],
+                language="c",
+            ),
             Cython.Distutils.Extension(
                 "ddtrace.internal._rand",
                 sources=["ddtrace/internal/_rand.pyx"],
@@ -1044,6 +1049,11 @@ def get_exts_for(name):
                 libraries=encoding_libraries,
                 define_macros=[(f"__{sys.byteorder.upper()}_ENDIAN__", "1")],
             ),
+            Cython.Distutils.Extension(
+                "ddtrace.internal.peer_service.processor",
+                sources=["ddtrace/internal/peer_service/processor.pyx"],
+                language="c",
+            ),
             Extension(
                 "ddtrace.internal.telemetry.metrics_namespaces",
                 ["ddtrace/internal/telemetry/metrics_namespaces.pyx"],
diff --git a/tests/tracer/test_processors.py b/tests/tracer/test_processors.py
index f91d911b3bd..31aac2f79a4 100644
--- a/tests/tracer/test_processors.py
+++ b/tests/tracer/test_processors.py
@@ -38,14 +38,6 @@ def process_trace(self, trace):
         return trace


-def test_no_impl():
-    class BadProcessor(SpanProcessor):
-        pass
-
-    with pytest.raises(TypeError):
-        BadProcessor()
-
-
 def test_aggregator_single_span():
     class Proc(TraceProcessor):
         def process_trace(self, trace):
@@ -110,25 +102,26 @@ def test_aggregator_reset_default_args():
     Test that on reset, the aggregator recreates trace writer but not the sampling processor (by default).
     Processors and trace buffers should be reset not reset.
     """
-    dd_proc = DummyProcessor()
-    user_proc = DummyProcessor()
-    aggr = SpanAggregator(
-        partial_flush_enabled=False,
-        partial_flush_min_spans=1,
-        dd_processors=[dd_proc],
-        user_processors=[user_proc],
-    )
-    sampling_proc = aggr.sampling_processor
-    dm_writer = DummyWriter()
-    aggr.writer = dm_writer
-    # Generate a span to init _traces and _span_metrics
-    span = Span("span", on_finish=[aggr.on_span_finish])
-    aggr.on_span_start(span)
+    with override_global_config(dict(_telemetry_enabled=True)):
+        dd_proc = DummyProcessor()
+        user_proc = DummyProcessor()
+        aggr = SpanAggregator(
+            partial_flush_enabled=False,
+            partial_flush_min_spans=1,
+            dd_processors=[dd_proc],
+            user_processors=[user_proc],
+        )
+        sampling_proc = aggr.sampling_processor
+        dm_writer = DummyWriter()
+        aggr.writer = dm_writer
+        # Generate a span to init _traces and _span_metrics
+        span = Span("span", on_finish=[aggr.on_span_finish])
+        aggr.on_span_start(span)
     # Expect SpanAggregator to have the processors and span in _traces
     assert dd_proc in aggr.dd_processors
     assert user_proc in aggr.user_processors
     assert span.trace_id in aggr._traces
-    assert len(aggr._span_metrics["spans_created"]) == 1
+    assert len(aggr._spans_created) == 1
     # Expect TraceWriter to be recreated and trace buffers to be reset but not the trace processors
     aggr.reset()
     assert dd_proc in aggr.dd_processors
@@ -136,7 +129,7 @@ def test_aggregator_reset_default_args():
     assert aggr.writer is not dm_writer
     assert sampling_proc is aggr.sampling_processor
     assert not aggr._traces
-    assert len(aggr._span_metrics["spans_created"]) == 0
+    assert len(aggr._spans_created) == 0


 def test_aggregator_reset_apm_opt_out_preserves_sampling():
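Note: the tests above now wrap their setup in `override_global_config(dict(_telemetry_enabled=True))` because, after this change, `on_span_start`/`on_span_finish` only populate `_spans_created`/`_spans_finished` when telemetry is enabled; without the override, the `len(aggr._spans_created) == 1` assertions would see an empty dict. A toy illustration of the gate (hypothetical `Config` stand-in):

```python
class Config:
    """Stand-in for ddtrace's global config object."""
    _telemetry_enabled = False


config = Config()
spans_created = {}


def on_span_start(integration: str) -> None:
    # Mirrors the new gate in SpanAggregator.on_span_start.
    if config._telemetry_enabled:
        spans_created.setdefault(integration, 0)
        spans_created[integration] += 1


on_span_start("datadog")
print(len(spans_created))  # 0 -- metric not recorded while telemetry is off

config._telemetry_enabled = True
on_span_start("datadog")
print(len(spans_created))  # 1
```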
@@ -181,24 +174,25 @@ def test_aggregator_reset_with_args(writer_class):
     user processors/filters and trace api version (when ASM is enabled)
     """
-    dd_proc = DummyProcessor()
-    user_proc = DummyProcessor()
-    aggr = SpanAggregator(
-        partial_flush_enabled=False,
-        partial_flush_min_spans=1,
-        dd_processors=[dd_proc],
-        user_processors=[user_proc],
-    )
-
-    aggr.writer = writer_class("http://localhost:8126", api_version="v0.5")
-    span = Span("span", on_finish=[aggr.on_span_finish])
-    aggr.on_span_start(span)
+    with override_global_config(dict(_telemetry_enabled=True)):
+        dd_proc = DummyProcessor()
+        user_proc = DummyProcessor()
+        aggr = SpanAggregator(
+            partial_flush_enabled=False,
+            partial_flush_min_spans=1,
+            dd_processors=[dd_proc],
+            user_processors=[user_proc],
+        )
+
+        aggr.writer = writer_class("http://localhost:8126", api_version="v0.5")
+        span = Span("span", on_finish=[aggr.on_span_finish])
+        aggr.on_span_start(span)
     # Expect SpanAggregator to have the expected processors, api_version and span in _traces
     assert dd_proc in aggr.dd_processors
     assert user_proc in aggr.user_processors
     assert span.trace_id in aggr._traces
-    assert len(aggr._span_metrics["spans_created"]) == 1
+    assert len(aggr._spans_created) == 1
     assert aggr.writer._api_version == "v0.5"
     # Expect the default value of apm_opt_out and compute_stats to be False
     assert aggr.sampling_processor.apm_opt_out is False
@@ -211,7 +205,7 @@ def test_aggregator_reset_with_args(writer_class):
     assert aggr.sampling_processor._compute_stats_enabled is True
     assert aggr.writer._api_version == "v0.4"
     assert span.trace_id in aggr._traces
-    assert len(aggr._span_metrics["spans_created"]) == 1
+    assert len(aggr._spans_created) == 1


 def test_aggregator_bad_processor():
@@ -315,7 +309,8 @@ def test_aggregator_partial_flush_0_spans():
     assert parent.get_metric("_dd.py.partial_flush") == 1
     child.finish()
     assert writer.pop() == [child]
-    assert child.get_metric("_dd.py.partial_flush") == 1
+    # Not a partial flush since the trace size at this point is 1 and we finished the last span
+    assert child.get_metric("_dd.py.partial_flush") is None


 def test_aggregator_partial_flush_2_spans():
@@ -375,7 +370,9 @@ def test_aggregator_partial_flush_2_spans():
     assert parent.get_metric("_dd.py.partial_flush") is None


-@pytest.mark.subprocess(env={"DD_TRACE_PARTIAL_FLUSH_ENABLED": "true", "DD_TRACE_PARTIAL_FLUSH_MIN_SPANS": "2"})
+@pytest.mark.subprocess(
+    env={"DD_TRACE_PARTIAL_FLUSH_ENABLED": "true", "DD_TRACE_PARTIAL_FLUSH_MIN_SPANS": "2"}, check_logs=False
+)
 def test_trace_top_level_span_processor_partial_flushing():
     """Parent span and child span have the same service name"""
     from ddtrace import tracer
diff --git a/tests/tracer/test_span.py b/tests/tracer/test_span.py
index ee5245c8600..56f79105a20 100644
--- a/tests/tracer/test_span.py
+++ b/tests/tracer/test_span.py
@@ -859,13 +859,6 @@ def test_on_finish_multi_callback():
     m2.assert_called_once_with(s)


-@pytest.mark.parametrize("arg", ["span_id", "trace_id", "parent_id"])
-def test_span_preconditions(arg):
-    Span("test", **{arg: None})
-    with pytest.raises(TypeError):
-        Span("test", **{arg: "foo"})
-
-
 def test_span_pprint():
     root = Span("test.span", service="s", resource="r", span_type=SpanTypes.WEB, context=Context(trace_id=1, span_id=2))
     root.set_tag("t", "v")
diff --git a/tests/tracer/test_tracer.py b/tests/tracer/test_tracer.py
index bd5b7909f87..e7237942f73 100644
--- a/tests/tracer/test_tracer.py
+++ b/tests/tracer/test_tracer.py
@@ -712,7 +712,7 @@ def test_tracer_shutdown_timeout():
     with t.trace("something"):
         pass

-    t.shutdown(timeout=2)
+    t.shutdown(timeout=2.0)

     mock_stop.assert_called_once_with(2)
@@ -1257,9 +1257,11 @@ def process_trace(self, trace):
 def test_early_exit(tracer, test_spans):
     s1 = tracer.trace("1")
     s2 = tracer.trace("2")
-    with mock.patch.object(logging.Logger, "debug") as mock_logger:
-        s1.finish()
-        s2.finish()
+    with mock.patch.object(logging.Logger, "isEnabledFor") as mock_enabled_for:
+        mock_enabled_for.return_value = True
+        with mock.patch.object(logging.Logger, "debug") as mock_logger:
+            s1.finish()
+            s2.finish()

     calls = [
         mock.call("span %r closing after its parent %r, this is an error when not using async", s2, s1),
diff --git a/tests/tracer/test_writer.py b/tests/tracer/test_writer.py
index 75de6dba81f..51ca5cc7047 100644
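Note: because the tracer now guards its debug logging with `log.isEnabledFor(logging.DEBUG)`, `test_early_exit` has to force that guard open before patching `Logger.debug`, and the subprocess tests pass `check_logs=False` since the guarded calls no longer emit the same output. A minimal reproduction of the pattern:

```python
import logging
from unittest import mock

log = logging.getLogger("demo")


def guarded_debug():
    # Mirrors the production change: skip building log arguments unless
    # debug logging is actually enabled.
    if log.isEnabledFor(logging.DEBUG):
        log.debug("expensive %s", "message")


with mock.patch.object(logging.Logger, "debug") as mock_debug:
    guarded_debug()
print(mock_debug.called)  # False -- guard short-circuits at the default WARNING level

with mock.patch.object(logging.Logger, "isEnabledFor", return_value=True):
    with mock.patch.object(logging.Logger, "debug") as mock_debug:
        guarded_debug()
print(mock_debug.called)  # True -- the guard was forced open, debug was reached
```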
--- a/tests/tracer/test_writer.py
+++ b/tests/tracer/test_writer.py
@@ -1126,7 +1126,7 @@ def test_writer_reuse_connections_false(writer_class):
     assert writer._conn is conn


-@pytest.mark.subprocess(env=dict(DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED="true"))
+@pytest.mark.subprocess(env=dict(DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED="true"), check_logs=False)
 def test_trace_with_128bit_trace_ids():
     """Ensure 128bit trace ids are correctly encoded"""
     from ddtrace.internal.constants import HIGHER_ORDER_TRACE_ID_BITS