diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index e36fcfa3320..a0184960616 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -1,18 +1,17 @@ # -*- coding: utf-8 -*- -import http.client as httplib # noqa: E402 +import http.client as httplib import itertools import os import sys import time import traceback -from typing import TYPE_CHECKING # noqa:F401 -from typing import Any # noqa:F401 -from typing import Dict # noqa:F401 -from typing import List # noqa:F401 -from typing import Optional # noqa:F401 -from typing import Set # noqa:F401 -from typing import Tuple # noqa:F401 -from typing import Union # noqa:F401 +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import Set +from typing import Tuple +from typing import Union import urllib.parse as parse from ddtrace.internal.endpoints import endpoint_collection @@ -31,10 +30,8 @@ from ..utils.version import version as tracer_version from . 
import modules from .constants import TELEMETRY_APM_PRODUCT -from .constants import TELEMETRY_LOG_LEVEL # noqa:F401 +from .constants import TELEMETRY_LOG_LEVEL from .constants import TELEMETRY_NAMESPACE -from .constants import TELEMETRY_TYPE_DISTRIBUTION -from .constants import TELEMETRY_TYPE_GENERATE_METRICS from .constants import TELEMETRY_TYPE_LOGS from .data import get_application from .data import get_host_info @@ -66,8 +63,7 @@ class _TelemetryClient: AGENT_ENDPOINT = "telemetry/proxy/api/v2/apmtelemetry" AGENTLESS_ENDPOINT_V2 = "api/v2/apmtelemetry" - def __init__(self, agentless): - # type: (bool) -> None + def __init__(self, agentless: bool) -> None: self._telemetry_url = self.get_host(config.SITE, agentless) self._endpoint = self.get_endpoint(agentless) self._encoder = JSONEncoderV2() @@ -86,7 +82,7 @@ def __init__(self, agentless): def url(self): return parse.urljoin(self._telemetry_url, self._endpoint) - def send_event(self, request: Dict) -> Optional[httplib.HTTPResponse]: + def send_event(self, request: Dict, payload_type: str) -> Optional[httplib.HTTPResponse]: """Sends a telemetry request to the trace agent""" resp = None conn = None @@ -99,24 +95,33 @@ def send_event(self, request: Dict) -> Optional[httplib.HTTPResponse]: resp = conn.getresponse() if resp.status < 300: log.debug( - "Instrumentation Telemetry sent %d bytes in %.5fs to %s. Event: %s. Response: %s", + "Instrumentation Telemetry sent %d bytes in %.5fs to %s. Event(s): %s. Response: %s", len(rb_json), sw.elapsed(), self.url, - request["request_type"], + payload_type, resp.status, ) else: - log.debug("Failed to send Instrumentation Telemetry to %s. response: %s", self.url, resp.status) - except Exception as e: - log.debug("Failed to send Instrumentation Telemetry to %s. Error: %s", self.url, str(e)) + log.debug( + "Failed to send Instrumentation Telemetry to %s. Event(s): %s. 
Response: %s", + self.url, + payload_type, + resp.status, + ) + except Exception: + log.debug( + "Failed to send Instrumentation Telemetry to %s. Event(s): %s", + self.url, + payload_type, + exc_info=True, + ) finally: if conn is not None: conn.close() return resp - def get_headers(self, request): - # type: (Dict) -> Dict + def get_headers(self, request: Dict) -> Dict: """Get all telemetry api v2 request headers""" headers = self._headers.copy() headers["DD-Telemetry-Debug-Enabled"] = request["debug"] @@ -151,8 +156,7 @@ class TelemetryWriter(PeriodicService): _ORIGINAL_EXCEPTHOOK = staticmethod(sys.excepthook) CWD = os.getcwd() - def __init__(self, is_periodic=True, agentless=None): - # type: (bool, Optional[bool]) -> None + def __init__(self, is_periodic: bool = True, agentless: Optional[bool] = None) -> None: super(TelemetryWriter, self).__init__(interval=min(config.HEARTBEAT_INTERVAL, 10)) # Decouples the aggregation and sending of the telemetry events @@ -161,16 +165,16 @@ def __init__(self, is_periodic=True, agentless=None): self._periodic_threshold = int(config.HEARTBEAT_INTERVAL // self.interval) - 1 self._periodic_count = 0 self._is_periodic = is_periodic - self._integrations_queue = dict() # type: Dict[str, Dict] + self._integrations_queue: Dict[str, Dict] = dict() # Currently telemetry only supports reporting a single error. 
# If we'd like to report multiple errors in the future # we could hack it in by xor-ing error codes and concatenating strings - self._error = (0, "") # type: Tuple[int, str] + self._error: Tuple[int, str] = (0, "") self._namespace = MetricNamespace() - self._logs = set() # type: Set[Dict[str, Any]] - self._forked = False # type: bool - self._events_queue = [] # type: List[Dict] - self._configuration_queue = [] # type: List[Dict] + self._logs: Set[Dict[str, Any]] = set() + self._forked: bool = False + self._events_queue: List[Dict] = [] + self._configuration_queue: List[Dict] = [] self._imported_dependencies: Dict[str, str] = dict() self._modules_already_imported: Set[str] = set() self._product_enablement = {product.value: False for product in TELEMETRY_APM_PRODUCT} @@ -207,13 +211,11 @@ def __init__(self, is_periodic=True, agentless=None): # This will occur when the agent writer starts. self.enable() # Force app started for unit tests - if config.FORCE_START: - self._app_started() - # Send logged error to telemetry + if config.FORCE_START and (app_started := self._app_started()): + self._events_queue.append(app_started) get_logger("ddtrace").addHandler(DDTelemetryErrorHandler(self)) - def enable(self): - # type: () -> bool + def enable(self) -> bool: """ Enable the instrumentation telemetry collection service. If the service has already been activated before, this method does nothing. Use ``disable`` to turn off the telemetry collection service. @@ -232,8 +234,7 @@ def enable(self): self.status = ServiceStatus.RUNNING return True - def disable(self): - # type: () -> None + def disable(self) -> None: """ Disable the telemetry collection service and drop the existing integrations and events Once disabled, telemetry collection can not be re-enabled. 
@@ -241,21 +242,17 @@ def disable(self): self._enabled = False self.reset_queues() - def enable_agentless_client(self, enabled=True): - # type: (bool) -> None - + def enable_agentless_client(self, enabled: bool = True) -> None: if self._client._agentless == enabled: return self._client = _TelemetryClient(enabled) - def _is_running(self): - # type: () -> bool + def _is_running(self) -> bool: """Returns True when the telemetry writer worker thread is running""" return self._is_periodic and self._worker is not None and self.status is ServiceStatus.RUNNING - def add_event(self, payload, payload_type): - # type: (Union[Dict[str, Any], List[Any]], str) -> None + def add_event(self, payload: Union[Dict[str, Any], List[Any]], payload_type: str) -> None: """ Adds a Telemetry event to the TelemetryWriter event buffer @@ -264,27 +261,34 @@ def add_event(self, payload, payload_type): Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change """ if self.enable(): - event = { - "tracer_time": int(time.time()), - "runtime_id": get_runtime_id(), - "api_version": "v2", - "seq_id": next(self._sequence_payloads), - "debug": self._debug, - "application": get_application(config.SERVICE, config.VERSION, config.ENV), - "host": get_host_info(), - "payload": payload, - "request_type": payload_type, - } - self._events_queue.append(event) + with self._service_lock: + self._events_queue.append({"payload": payload, "request_type": payload_type}) - def add_integration(self, integration_name, patched, auto_patched=None, error_msg=None, version=""): - # type: (str, bool, Optional[bool], Optional[str], Optional[str]) -> None + def add_events(self, events: List[Dict[str, Any]]) -> None: + """ + Adds a list of Telemetry events to the TelemetryWriter event buffer + """ + if self.enable(): + with self._service_lock: + self._events_queue.extend(events) + + def add_integration( + self, + integration_name: str, + patched: bool, + auto_patched: Optional[bool] = None, 
+ error_msg: Optional[str] = None, + version: str = "", + ) -> None: """ Creates and queues the names and settings of a patched module :param str integration_name: name of patched module :param bool auto_enabled: True if module is enabled in _monkey.PATCH_MODULES """ + if not self.enable(): + return + # Integrations can be patched before the telemetry writer is enabled. with self._service_lock: if integration_name not in self._integrations_queue: @@ -300,8 +304,7 @@ def add_integration(self, integration_name, patched, auto_patched=None, error_ms self._integrations_queue[integration_name]["compatible"] = error_msg == "" self._integrations_queue[integration_name]["error"] = error_msg - def add_error(self, code, msg, filename, line_number): - # type: (int, str, Optional[str], Optional[int]) -> None + def add_error(self, code: int, msg: str, filename: Optional[str], line_number: Optional[int]) -> None: """Add an error to be submitted with an event. Note that this overwrites any previously set errors. 
""" @@ -309,12 +312,11 @@ def add_error(self, code, msg, filename, line_number): msg = "%s:%s: %s" % (filename, line_number, msg) self._error = (code, msg) - def _app_started(self, register_app_shutdown=True): - # type: (bool) -> None + def _app_started(self, register_app_shutdown: bool = True) -> Optional[Dict[str, Any]]: """Sent when TelemetryWriter is enabled or forks""" if self._forked or self.started: # app-started events should only be sent by the main process - return + return None # List of configurations to be collected self.started = True @@ -334,7 +336,7 @@ def _app_started(self, register_app_shutdown=True): "message": self._error[1], }, "products": products, - } # type: Dict[str, Union[Dict[str, Any], List[Any]]] + } # Add time to value telemetry metrics for single step instrumentation if config.INSTALL_ID or config.INSTALL_TYPE or config.INSTALL_TIME: payload["install_signature"] = { @@ -345,10 +347,9 @@ def _app_started(self, register_app_shutdown=True): # Reset the error after it has been reported. 
self._error = (0, "") - self.add_event(payload, "app-started") + return {"payload": payload, "request_type": "app-started"} - def _app_heartbeat_event(self): - # type: () -> None + def _app_heartbeat_event(self) -> Dict[str, Any]: if config.DEPENDENCY_COLLECTION and time.monotonic() - self._extended_time > self._extended_heartbeat_interval: self._extended_time += self._extended_heartbeat_interval self._app_dependencies_loaded_event() @@ -357,70 +358,67 @@ def _app_heartbeat_event(self): {"name": name, "version": version} for name, version in self._imported_dependencies.items() ] } - self.add_event(payload, "app-extended-heartbeat") + request_type = "app-extended-heartbeat" else: - self.add_event({}, "app-heartbeat") + payload = {} + request_type = "app-heartbeat" + return {"payload": payload, "request_type": request_type} - def _app_closing_event(self): - # type: () -> None + def _app_closing_event(self) -> Optional[Dict[str, Any]]: """Adds a Telemetry event which notifies the agent that an application instance has terminated""" if self._forked: # app-closing event should only be sent by the main process - return - payload = {} # type: Dict - self.add_event(payload, "app-closing") + return None + return {"payload": {}, "request_type": "app-closing"} - def _app_integrations_changed_event(self, integrations): - # type: (List[Dict]) -> None + def _app_integrations_changed_event(self, integrations: List[Dict]) -> Dict: """Adds a Telemetry event which sends a list of configured integrations to the agent""" - payload = { - "integrations": integrations, + return { + "payload": { + "integrations": integrations, + }, + "request_type": "app-integrations-change", } - self.add_event(payload, "app-integrations-change") - def _flush_integrations_queue(self): - # type: () -> List[Dict] + def _flush_integrations_queue(self) -> List[Dict]: """Flushes and returns a list of all queued integrations""" with self._service_lock: integrations = list(self._integrations_queue.values()) 
self._integrations_queue = dict() return integrations - def _flush_configuration_queue(self): - # type: () -> List[Dict] + def _flush_configuration_queue(self) -> List[Dict]: """Flushes and returns a list of all queued configurations""" with self._service_lock: configurations = self._configuration_queue self._configuration_queue = [] return configurations - def _app_client_configuration_changed_event(self, configurations): - # type: (List[Dict]) -> None + def _app_client_configuration_changed_event(self, configurations: List[Dict]) -> Dict[str, Any]: """Adds a Telemetry event which sends list of modified configurations to the agent""" - payload = { - "configuration": configurations, + return { + "payload": { + "configuration": configurations, + }, + "request_type": "app-client-configuration-change", } - self.add_event(payload, "app-client-configuration-change") - def _app_dependencies_loaded_event(self): + def _app_dependencies_loaded_event(self) -> Optional[Dict[str, Any]]: """Adds events to report imports done since the last periodic run""" - if not config.DEPENDENCY_COLLECTION or not self._enabled: - return + return None with self._service_lock: newly_imported_deps = modules.get_newly_imported_modules(self._modules_already_imported) if not newly_imported_deps: - return + return None with self._service_lock: - packages = update_imported_dependencies(self._imported_dependencies, newly_imported_deps) - - if packages: - payload = {"dependencies": packages} - self.add_event(payload, "app-dependencies-loaded") + if packages := update_imported_dependencies(self._imported_dependencies, newly_imported_deps): + return {"payload": {"dependencies": packages}, "request_type": "app-dependencies-loaded"} + return None - def _add_endpoints_event(self): + def _flush_app_endpoints(self): """Adds a Telemetry event which sends the list of HTTP endpoints found at startup to the agent""" import ddtrace.settings.asm as asm_config_module @@ -432,27 +430,26 @@ def 
_add_endpoints_event(self): with self._service_lock: payload = endpoint_collection.flush(asm_config_module.config._api_security_endpoint_collection_limit) + return {"payload": payload, "request_type": "app-endpoints"} - self.add_event(payload, "app-endpoints") - - def _app_product_change(self): - # type: () -> None + def _app_product_change(self) -> Optional[Dict[str, Any]]: """Adds a Telemetry event which reports the enablement of an APM product""" if not self._send_product_change_updates: - return + return None - payload = { - "products": { - product: {"version": tracer_version, "enabled": status} - for product, status in self._product_enablement.items() - } - } - self.add_event(payload, "app-product-change") self._send_product_change_updates = False + return { + "payload": { + "products": { + product: {"version": tracer_version, "enabled": status} + for product, status in self._product_enablement.items() + } + }, + "request_type": "app-product-change", + } - def product_activated(self, product, enabled): - # type: (str, bool) -> None + def product_activated(self, product: str, enabled: bool) -> None: """Updates the product enablement dict""" if self._product_enablement.get(product, False) is enabled: @@ -464,8 +461,13 @@ def product_activated(self, product, enabled): if self.started: self._send_product_change_updates = True - def add_configuration(self, configuration_name, configuration_value, origin="unknown", config_id=None): - # type: (str, Any, str, Optional[str]) -> None + def add_configuration( + self, + configuration_name: str, + configuration_value: Any, + origin: str = "unknown", + config_id: Optional[str] = None, + ) -> None: """Creates and queues the name, origin, value of a configuration""" if isinstance(configuration_value, dict): configuration_value = ",".join(":".join((k, str(v))) for k, v in configuration_value.items()) @@ -502,7 +504,7 @@ def add_configurations(self, configuration_list: List[Tuple[str, str, str]]): def add_log(self, level, 
message, stack_trace="", tags=None): """ - Queues log. This event is meant to send library logs to Datadog’s backend through the Telemetry intake. + Queues log. This event is meant to send library logs to Datadog's backend through the Telemetry intake. This will make support cycles easier and ensure we know about potentially silent issues in libraries. """ if tags is None: @@ -620,31 +622,32 @@ def add_distribution_metric( tags, ) - def _flush_log_metrics(self): - # type () -> Set[Metric] + def _flush_log_metrics(self) -> Set[Dict[str, Any]]: with self._service_lock: log_metrics = self._logs self._logs = set() return log_metrics - def _generate_metrics_event(self, namespace_metrics) -> None: + def _generate_metrics_event( + self, namespace_metrics: Dict[str, Dict[str, List[Dict[str, Any]]]] + ) -> Dict[str, Any]: for payload_type, namespaces in namespace_metrics.items(): for namespace, metrics in namespaces.items(): if metrics: - payload = { - "namespace": namespace, - "series": metrics, - } - log.debug("%s request payload, namespace %s", payload_type, namespace) - if payload_type == TELEMETRY_TYPE_DISTRIBUTION: - self.add_event(payload, TELEMETRY_TYPE_DISTRIBUTION) - elif payload_type == TELEMETRY_TYPE_GENERATE_METRICS: - self.add_event(payload, TELEMETRY_TYPE_GENERATE_METRICS) - - def _generate_logs_event(self, logs): - # type: (Set[Dict[str, str]]) -> None + metric_payloads.append( + { + "payload": { + "namespace": namespace, + "series": metrics, + }, + "request_type": payload_type, + } + ) + return metric_payloads + + def _generate_logs_event(self, logs: Set[Dict[str, str]]) -> Dict[str, Any]: log.debug("%s request payload", TELEMETRY_TYPE_LOGS) - self.add_event({"logs": list(logs)}, TELEMETRY_TYPE_LOGS) + return {"payload": {"logs": list(logs)}, "request_type": TELEMETRY_TYPE_LOGS} def _dispatch(self): # moved core here to avoid circular import @@ -653,55 +656,104 @@ def _dispatch(self): core.dispatch("telemetry.periodic") def periodic(self, force_flush=False, 
shutting_down=False): - # ensure app_started is called at least once in case traces weren't flushed - self._app_started() - self._app_product_change() + """Process and send telemetry events in batches. + + This method handles the periodic collection and sending of telemetry data with two main timing intervals: + 1. Metrics collection interval (10 seconds by default): Collects metrics and logs + 2. Heartbeat interval (60 seconds by default): Sends all collected data to the telemetry endpoint + + The method follows this flow: + 1. Collects metrics and logs that have accumulated since last collection + 2. If not at heartbeat interval and not force_flush: + - Queues the metrics and logs for future sending + - Returns early + 3. At heartbeat interval or force_flush: + - Collects app status (started, product changes) + - Collects integration changes + - Collects configuration changes + - Collects dependency changes + - Collects stored events (ex: metrics and logs) + - Sends everything as a single batch + + Args: + force_flush: If True, bypasses the heartbeat interval check and sends immediately + shutting_down: If True, includes app-closing event in the batch + + Note: + - Metrics are collected every 10 seconds to ensure accurate time-based data + - All data is sent in a single batch every 60 seconds to minimize network overhead + - A heartbeat event is always included to keep RC connections alive + """ self._dispatch() + # Collect metrics and logs that have accumulated since last batch + events = [] + if namespace_metrics := self._namespace.flush(float(self.interval)): + if metrics_events := self._generate_metrics_events(namespace_metrics): + events.extend(metrics_events) - namespace_metrics = self._namespace.flush(float(self.interval)) - if namespace_metrics: - self._generate_metrics_event(namespace_metrics) - - logs_metrics = self._flush_log_metrics() - if logs_metrics: - self._generate_logs_event(logs_metrics) + if logs_metrics := self._flush_log_metrics(): + 
events.append(self._generate_logs_event(logs_metrics)) - # Telemetry metrics and logs should be aggregated into payloads every time periodic is called. - # This ensures metrics and logs are submitted in 10 second time buckets. + # Queue metrics if not at heartbeat interval if self._is_periodic and force_flush is False: if self._periodic_count < self._periodic_threshold: self._periodic_count += 1 + if events: + self.add_events(events) return self._periodic_count = 0 - integrations = self._flush_integrations_queue() - if integrations: - self._app_integrations_changed_event(integrations) + # At heartbeat interval, collect and send all telemetry data + if app_started := self._app_started(): + # app-started should be the first event in the batch + events = [app_started] + events - configurations = self._flush_configuration_queue() - if configurations: - self._app_client_configuration_changed_event(configurations) + if app_product_change := self._app_product_change(): + events.append(app_product_change) - self._app_dependencies_loaded_event() - self._add_endpoints_event() + if integrations := self._flush_integrations_queue(): + events.append(self._app_integrations_changed_event(integrations)) - if shutting_down: - self._app_closing_event() + if endpoints_payload := self._flush_app_endpoints(): + events.append(endpoints_payload) - # Send a heartbeat event to the agent, this is required to keep RC connections alive - self._app_heartbeat_event() + if configurations := self._flush_configuration_queue(): + events.append(self._app_client_configuration_changed_event(configurations)) - telemetry_events = self._flush_events_queue() - for telemetry_event in telemetry_events: - self._client.send_event(telemetry_event) + if app_dependencies_loaded := self._app_dependencies_loaded_event(): + events.append(app_dependencies_loaded) + + if shutting_down and (app_closing := self._app_closing_event()): + events.append(app_closing) + + # Always include a heartbeat to keep RC connections 
alive + events.append(self._app_heartbeat_event()) + + # Get any queued events and combine with current batch + if queued_events := self._flush_events_queue(): + events.extend(queued_events) + + payload_types = ", ".join([event["request_type"] for event in events]) + # Prepare and send the final batch + batch_event = { + "tracer_time": int(time.time()), + "runtime_id": get_runtime_id(), + "api_version": "v2", + "seq_id": next(self._sequence_payloads), + "debug": self._debug, + "application": get_application(config.SERVICE, config.VERSION, config.ENV), + "host": get_host_info(), + "payload": events, + "request_type": "message-batch", + } + self._client.send_event(batch_event, payload_types) def app_shutdown(self): if self.started: self.periodic(force_flush=True, shutting_down=True) self.disable() - def reset_queues(self): - # type: () -> None + def reset_queues(self) -> None: self._events_queue = [] self._integrations_queue = dict() self._namespace.flush() @@ -709,16 +761,14 @@ def reset_queues(self): self._imported_dependencies = {} self._configuration_queue = [] - def _flush_events_queue(self): - # type: () -> List[Dict] + def _flush_events_queue(self) -> List[Dict]: """Flushes and returns a list of all telemtery event""" with self._service_lock: events = self._events_queue self._events_queue = [] return events - def _fork_writer(self): - # type: () -> None + def _fork_writer(self) -> None: self._forked = True # Avoid sending duplicate events. # Queued events should be sent in the main process. @@ -736,8 +786,7 @@ def _restart_sequence(self): self._sequence_payloads = itertools.count(1) self._sequence_configurations = itertools.count(1) - def _stop_service(self, join=True, *args, **kwargs): - # type: (...) 
-> None + def _stop_service(self, join: bool = True, *args, **kwargs) -> None: super(TelemetryWriter, self)._stop_service(*args, **kwargs) if join: self.join(timeout=2) @@ -776,8 +825,8 @@ def _telemetry_excepthook(self, tp, value, root_traceback): error_msg = "{}:{} {}".format(filename, lineno, str(value)) self.add_integration(integration_name, True, error_msg=error_msg) - if self._enabled and not self.started: - self._app_started(False) + if app_started := self._app_started(False): + self._events_queue.append(app_started) self.app_shutdown() diff --git a/tests/appsec/architectures/mini.py b/tests/appsec/architectures/mini.py index f0981167a55..687ba77ee3b 100644 --- a/tests/appsec/architectures/mini.py +++ b/tests/appsec/architectures/mini.py @@ -19,25 +19,27 @@ _TELEMETRY_DEPENDENCIES = [] # intercept telemetry events -from ddtrace.internal.telemetry.writer import TelemetryWriter # noqa: E402 +from ddtrace.internal.telemetry.writer import _TelemetryClient # noqa: E402 -_flush_events = TelemetryWriter._flush_events_queue +_send_event = _TelemetryClient.send_event -def _flush_events_wrapper(self): +def _send_event_wrapper(self, event, payload_type): global _TELEMETRY_DEPENDENCIES - res = _flush_events(self) - if res: - dependencies = [v.get("payload", {}).get("dependencies", {}) for v in res] - dependencies = [d for d in dependencies if d] + print(f"Captured telemetry event: {event}", flush=True) + if event: + if event.get("request_type") == "message-batch": + dependencies = [v.get("payload", {}).get("dependencies", []) for v in event.get("payload", [])] + else: + dependencies = event.get("payload", {}).get("dependencies", []) for lst in dependencies: _TELEMETRY_DEPENDENCIES.extend(lst) - print(f"flushed events {dependencies}", flush=True) - return res + print(f"Captured dependencies: {dependencies}", flush=True) + return _send_event(self, event, payload_type) -TelemetryWriter._flush_events_queue = _flush_events_wrapper +_TelemetryClient.send_event = 
_send_event_wrapper @app.route("/") diff --git a/tests/conftest.py b/tests/conftest.py index 48ef8521ddb..9742fc2ef81 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,7 @@ import ast import base64 import contextlib +import copy import functools import http.client as httplib import importlib @@ -19,7 +20,9 @@ from tempfile import gettempdir import time from typing import Any # noqa:F401 +from typing import Dict from typing import Generator # noqa:F401 +from typing import List from typing import Tuple # noqa:F401 from unittest import mock from urllib import parse @@ -32,7 +35,6 @@ from ddtrace.internal.core import crashtracking from ddtrace.internal.remoteconfig.client import RemoteConfigClient from ddtrace.internal.remoteconfig.worker import remoteconfig_poller -from ddtrace.internal.runtime import get_runtime_id from ddtrace.internal.service import ServiceStatus from ddtrace.internal.service import ServiceStatusError from ddtrace.internal.telemetry import TelemetryWriter @@ -580,7 +582,7 @@ def clear(self): pytest.fail("Failed to clear session: %s" % self.token) return True - def get_requests(self, request_type=None, filter_heartbeats=True): + def get_requests(self, filter_heartbeats=True): """Get a list of the requests sent to the test agent Results are in reverse order by ``seq_id`` @@ -595,25 +597,42 @@ def get_requests(self, request_type=None, filter_heartbeats=True): # /test/session/requests captures non telemetry payloads, ignore these requests continue req["body"] = json.loads(base64.b64decode(req["body"])) - # filter heartbeat requests to reduce noise + if req["body"]["request_type"] == "app-heartbeat" and filter_heartbeats: continue - if request_type is None or req["body"]["request_type"] == request_type: - requests.append(req) + requests.append(req) return sorted(requests, key=lambda r: r["body"]["seq_id"], reverse=True) - def get_events(self, event_type=None, filter_heartbeats=True, subprocess=False): + def get_events(self, 
event_type=None, filter_heartbeats=True): """Get a list of the event payloads sent to the test agent Results are in reverse order by ``seq_id`` """ - requests = self.get_requests(event_type, filter_heartbeats) - if subprocess: - # Use get_runtime_id to filter telemetry events generated in the current process - runtime_id = get_runtime_id() - requests = [req for req in requests if req["body"]["runtime_id"] != runtime_id] - return [req["body"] for req in requests] + requests = self.get_requests() + events = [] + for req in requests: + for req_body in self._get_request_bodies(req): + if filter_heartbeats and req_body["request_type"] == "app-heartbeat": + # filter heartbeat events to reduce noise + continue + if event_type is None or req_body["request_type"] == event_type: + events.append(req_body) + return events + + def _get_request_bodies(self, req: Dict[str, Any]) -> List[Dict[str, Any]]: + if req["body"]["request_type"] == "message-batch": + payloads = req["body"]["payload"] + else: + payloads = [{"payload": req["body"]["payload"], "request_type": req["body"]["request_type"]}] + + requests = [] + for payload in payloads: + req_body = copy.deepcopy(req["body"]) + req_body["request_type"] = payload["request_type"] + req_body["payload"] = payload["payload"] + requests.append(req_body) + return requests def get_metrics(self, name=None): metrics = [] diff --git a/tests/integration/test_settings.py b/tests/integration/test_settings.py index c2f1291b419..8d5aa263d9e 100644 --- a/tests/integration/test_settings.py +++ b/tests/integration/test_settings.py @@ -59,6 +59,7 @@ def test_setting_origin_code(test_agent_session, run_python_code_in_subprocess): "DD_TAGS": "team:apm,component:web", "DD_TRACE_ENABLED": "true", "DD_CIVISIBILITY_AGENTLESS_ENABLED": "false", + "_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED": "true", } ) out, err, status, _ = run_python_code_in_subprocess( @@ -69,10 +70,6 @@ def test_setting_origin_code(test_agent_session, 
run_python_code_in_subprocess): config._trace_http_header_tags = {"header": "value"} config.tags = {"header": "value"} config._tracing_enabled = False - -from ddtrace.internal.telemetry import telemetry_writer -# simulate app start event, this occurs when the first span is sent to the datadog agent -telemetry_writer._app_started() """, env=env, ) diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py index 623a23c47d1..dab5b2ee06f 100644 --- a/tests/telemetry/test_telemetry.py +++ b/tests/telemetry/test_telemetry.py @@ -55,12 +55,12 @@ def test_enable_fork(test_agent_session, run_python_code_in_subprocess): runtime_id = stdout.strip().decode("utf-8") # Validate that one app-closing event was sent and it was queued in the parent process - app_closing = test_agent_session.get_events("app-closing", subprocess=True) + app_closing = test_agent_session.get_events("app-closing") assert len(app_closing) == 1 assert app_closing[0]["runtime_id"] == runtime_id # Validate that one app-started event was sent and it was queued in the parent process - app_started = test_agent_session.get_events("app-started", subprocess=True) + app_started = test_agent_session.get_events("app-started") assert len(app_started) == 1 assert app_started[0]["runtime_id"] == runtime_id @@ -93,7 +93,7 @@ def test_enable_fork_heartbeat(test_agent_session, run_python_code_in_subprocess assert stderr == b"", stderr # Allow test agent session to capture all heartbeat events - app_heartbeats = test_agent_session.get_events("app-heartbeat", filter_heartbeats=False, subprocess=True) + app_heartbeats = test_agent_session.get_events("app-heartbeat", filter_heartbeats=False) assert len(app_heartbeats) > 1 @@ -161,13 +161,13 @@ def process_trace(self, trace): # force app_started event (instead of waiting for 10 seconds) from ddtrace.internal.telemetry import telemetry_writer -telemetry_writer._app_started() +telemetry_writer.periodic(force_flush=True) """ _, stderr, status, _ = 
run_python_code_in_subprocess(code) assert status == 0, stderr assert b"Exception raised in trace filter" in stderr - events = test_agent_session.get_events("app-started", subprocess=True) + events = test_agent_session.get_events("app-started") assert len(events) == 1 @@ -209,7 +209,7 @@ def pre_ddtrace_exc_hook(exctype, value, traceback): # Regression test for invalid number of arguments in wrapped exception hook assert b"3 positional arguments but 4 were given" not in stderr - app_starteds = test_agent_session.get_events("app-started", subprocess=True) + app_starteds = test_agent_session.get_events("app-started") assert len(app_starteds) == 1 # app-started captures unhandled exceptions raised in application code assert app_starteds[0]["payload"]["error"]["code"] == 1 @@ -239,7 +239,7 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro assert status == 0, stderr assert b"failed to enable ddtrace support for sqlite3" in stderr - integrations_events = test_agent_session.get_events("app-integrations-change", subprocess=True) + integrations_events = test_agent_session.get_events("app-integrations-change") assert len(integrations_events) == 1 assert ( integrations_events[0]["payload"]["integrations"][0]["error"] == "module 'sqlite3' has no attribute 'connect'" @@ -269,13 +269,13 @@ def test_unhandled_integration_error(test_agent_session, ddtrace_run_python_code assert b"not enough values to unpack (expected 2, got 0)" in stderr, stderr - app_started_event = test_agent_session.get_events("app-started", subprocess=True) + app_started_event = test_agent_session.get_events("app-started") assert len(app_started_event) == 1 assert app_started_event[0]["payload"]["error"]["code"] == 1 assert "ddtrace/contrib/internal/flask/patch.py" in app_started_event[0]["payload"]["error"]["message"] assert "not enough values to unpack (expected 2, got 0)" in app_started_event[0]["payload"]["error"]["message"] - integration_events = 
test_agent_session.get_events("app-integrations-change", subprocess=True) + integration_events = test_agent_session.get_events("app-integrations-change") integrations = integration_events[0]["payload"]["integrations"] (flask_integration,) = [integration for integration in integrations if integration["name"] == "flask"] @@ -308,7 +308,7 @@ def test_app_started_with_install_metrics(test_agent_session, run_python_code_in _, stderr, status, _ = run_python_code_in_subprocess("import ddtrace", env=env) assert status == 0, stderr - app_started_event = test_agent_session.get_events("app-started", subprocess=True) + app_started_event = test_agent_session.get_events("app-started") assert len(app_started_event) == 1 assert app_started_event[0]["payload"]["install_signature"] == { "install_id": "68e75c48-57ca-4a12-adfc-575c4b05fcbe", @@ -331,7 +331,7 @@ def test_instrumentation_telemetry_disabled(test_agent_session, run_python_code_ """ _, stderr, status, _ = run_python_code_in_subprocess(code, env=env) - events = test_agent_session.get_events(subprocess=True) + events = test_agent_session.get_events() assert len(events) == 0 assert status == 0, stderr diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index 74458d3edbe..455425aa772 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -4,6 +4,7 @@ import time from typing import Any # noqa:F401 from typing import Dict # noqa:F401 +from typing import Optional # noqa:F401 import httpretty import mock @@ -18,7 +19,6 @@ from ddtrace.internal.telemetry.writer import TelemetryWriter from ddtrace.internal.telemetry.writer import get_runtime_id from ddtrace.internal.utils.version import _pep440_to_semver -from ddtrace.settings._config import DD_TRACE_OBFUSCATION_QUERY_STRING_REGEXP_DEFAULT from ddtrace.settings._telemetry import config as telemetry_config from tests.conftest import DEFAULT_DDTRACE_SUBPROCESS_TEST_SERVICE_NAME from tests.utils import call_program @@ -34,15 +34,18 
@@ def test_add_event(telemetry_writer, test_agent_session, mock_time): # send request to the agent telemetry_writer.periodic(force_flush=True) - requests = test_agent_session.get_requests(payload_type) + requests = test_agent_session.get_requests() assert len(requests) == 1 assert requests[0]["headers"]["Content-Type"] == "application/json" assert requests[0]["headers"]["DD-Client-Library-Language"] == "python" assert requests[0]["headers"]["DD-Client-Library-Version"] == _pep440_to_semver() - assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == payload_type + assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == "message-batch" assert requests[0]["headers"]["DD-Telemetry-API-Version"] == "v2" assert requests[0]["headers"]["DD-Telemetry-Debug-Enabled"] == "False" - assert requests[0]["body"] == _get_request_body(payload, payload_type) + + events = test_agent_session.get_events(payload_type) + assert len(events) == 1 + validate_request_body(events[0], payload, payload_type) def test_add_event_disabled_writer(telemetry_writer, test_agent_session): @@ -54,7 +57,7 @@ def test_add_event_disabled_writer(telemetry_writer, test_agent_session): # ensure no request were sent telemetry_writer.periodic(force_flush=True) - assert len(test_agent_session.get_requests(payload_type)) == 1 + assert len(test_agent_session.get_events(payload_type)) == 1 @pytest.mark.parametrize( @@ -88,129 +91,22 @@ def test_app_started_event(telemetry_writer, test_agent_session, mock_time): """asserts that app_started() queues a valid telemetry request which is then sent by periodic()""" with override_global_config(dict(_telemetry_dependency_collection=False)): # queue an app started event - telemetry_writer._app_started() + event = telemetry_writer._app_started() + assert event is not None, "app_started() did not return an event" + telemetry_writer.add_event(event["payload"], "app-started") # force a flush telemetry_writer.periodic(force_flush=True) - requests = 
test_agent_session.get_requests("app-started") + requests = test_agent_session.get_requests() assert len(requests) == 1 - assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == "app-started" - - payload = { - "configuration": sorted( - [ - {"name": "DD_AGENT_HOST", "origin": "unknown", "value": None}, - {"name": "DD_AGENT_PORT", "origin": "unknown", "value": None}, - {"name": "DD_DOGSTATSD_PORT", "origin": "unknown", "value": None}, - {"name": "DD_DOGSTATSD_URL", "origin": "unknown", "value": None}, - {"name": "DD_DYNAMIC_INSTRUMENTATION_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_EXCEPTION_REPLAY_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_FASTAPI_ASYNC_BODY_TIMEOUT_SECONDS", "origin": "default", "value": 0.1}, - {"name": "DD_INSTRUMENTATION_TELEMETRY_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_STACK_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_MEMORY_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_HEAP_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_LOCK_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_CAPTURE_PCT", "origin": "unknown", "value": 1.0}, - {"name": "DD_PROFILING_UPLOAD_INTERVAL", "origin": "unknown", "value": 60.0}, - {"name": "DD_PROFILING_MAX_FRAMES", "origin": "unknown", "value": 64}, - {"name": "DD_REMOTE_CONFIGURATION_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "origin": "unknown", "value": 5.0}, - {"name": "DD_RUNTIME_METRICS_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_SERVICE_MAPPING", "origin": "unknown", "value": ""}, - {"name": "DD_SPAN_SAMPLING_RULES", "origin": "unknown", "value": None}, - {"name": "DD_SPAN_SAMPLING_RULES_FILE", "origin": "unknown", "value": None}, - {"name": "DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_TRACE_AGENT_HOSTNAME", 
"origin": "default", "value": None}, - {"name": "DD_TRACE_AGENT_PORT", "origin": "default", "value": None}, - {"name": "DD_TRACE_AGENT_TIMEOUT_SECONDS", "origin": "unknown", "value": 2.0}, - {"name": "DD_TRACE_API_VERSION", "origin": "unknown", "value": None}, - {"name": "DD_TRACE_CLIENT_IP_ENABLED", "origin": "unknown", "value": None}, - {"name": "DD_TRACE_COMPUTE_STATS", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_DEBUG", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_HEALTH_METRICS_ENABLED", "origin": "unknown", "value": False}, - { - "name": "DD_TRACE_OBFUSCATION_QUERY_STRING_REGEXP", - "origin": "unknown", - "value": DD_TRACE_OBFUSCATION_QUERY_STRING_REGEXP_DEFAULT, - }, - {"name": "DD_TRACE_OTEL_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_PARTIAL_FLUSH_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_TRACE_PARTIAL_FLUSH_MIN_SPANS", "origin": "unknown", "value": 300}, - { - "name": "DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED", - "origin": "default", - "value": False, - }, - { - "name": "DD_TRACE_PEER_SERVICE_MAPPING", - "origin": "env_var", - "value": "default_service:remapped_service", - }, - {"name": "DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_PEER_SERVICE_MAPPING", "origin": "unknown", "value": ""}, - { - "name": "DD_TRACE_PROPAGATION_STYLE_EXTRACT", - "origin": "unknown", - "value": "datadog,tracecontext", - }, - {"name": "DD_TRACE_PROPAGATION_STYLE_INJECT", "origin": "unknown", "value": "datadog,tracecontext"}, - {"name": "DD_TRACE_RATE_LIMIT", "origin": "unknown", "value": 100}, - {"name": "DD_TRACE_REMOVE_INTEGRATION_SERVICE_NAMES_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_SPAN_ATTRIBUTE_SCHEMA", "origin": "unknown", "value": "v0"}, - {"name": "DD_TRACE_STARTUP_LOGS", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_WRITER_BUFFER_SIZE_BYTES", "origin": "unknown", "value": 20 << 20}, - {"name": 
"DD_TRACE_WRITER_INTERVAL_SECONDS", "origin": "unknown", "value": 1.0}, - {"name": "DD_TRACE_WRITER_MAX_PAYLOAD_SIZE_BYTES", "origin": "unknown", "value": 20 << 20}, - {"name": "DD_TRACE_WRITER_REUSE_CONNECTIONS", "origin": "unknown", "value": False}, - {"name": "instrumentation_source", "origin": "code", "value": "manual"}, - {"name": "profiling_enabled", "origin": "default", "value": "false"}, - {"name": "data_streams_enabled", "origin": "default", "value": "false"}, - {"name": "appsec_enabled", "origin": "default", "value": "false"}, - {"name": "crashtracking_create_alt_stack", "origin": "unknown", "value": True}, - {"name": "crashtracking_use_alt_stack", "origin": "unknown", "value": True}, - {"name": "crashtracking_available", "origin": "unknown", "value": sys.platform == "linux"}, - {"name": "crashtracking_debug_url", "origin": "unknown", "value": None}, - {"name": "crashtracking_enabled", "origin": "unknown", "value": sys.platform == "linux"}, - {"name": "crashtracking_stacktrace_resolver", "origin": "unknown", "value": "full"}, - {"name": "crashtracking_started", "origin": "unknown", "value": False}, - {"name": "crashtracking_stderr_filename", "origin": "unknown", "value": None}, - {"name": "crashtracking_stdout_filename", "origin": "unknown", "value": None}, - { - "name": "python_build_gnu_type", - "origin": "unknown", - "value": sysconfig.get_config_var("BUILD_GNU_TYPE"), - }, - { - "name": "python_host_gnu_type", - "origin": "unknown", - "value": sysconfig.get_config_var("HOST_GNU_TYPE"), - }, - {"name": "python_soabi", "origin": "unknown", "value": sysconfig.get_config_var("SOABI")}, - {"name": "trace_sample_rate", "origin": "default", "value": "1.0"}, - {"name": "trace_sampling_rules", "origin": "default", "value": ""}, - {"name": "trace_header_tags", "origin": "default", "value": ""}, - {"name": "logs_injection_enabled", "origin": "default", "value": True}, - {"name": "trace_tags", "origin": "default", "value": ""}, - {"name": "trace_enabled", 
"origin": "default", "value": "true"}, - {"name": "instrumentation_config_id", "origin": "default", "value": ""}, - {"name": "DD_INJECT_FORCE", "origin": "unknown", "value": True}, - {"name": "DD_LIB_INJECTED", "origin": "unknown", "value": False}, - {"name": "DD_LIB_INJECTION_ATTEMPTED", "origin": "unknown", "value": False}, - ], - key=lambda x: x["name"], - ), - "error": { - "code": 0, - "message": "", - }, - } - requests[0]["body"]["payload"]["configuration"].sort(key=lambda c: c["name"]) - result = _get_request_body(payload, "app-started") - result["payload"]["configuration"] = [ - a for a in result["payload"]["configuration"] if a["name"] != "DD_TRACE_AGENT_URL" - ] - assert payload == result["payload"] + assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == "message-batch" + app_started_events = test_agent_session.get_events("app-started") + assert len(app_started_events) == 1 + validate_request_body(app_started_events[0], None, "app-started") + assert len(app_started_events[0]["payload"]) == 3 + assert app_started_events[0]["payload"].get("configuration") + assert app_started_events[0]["payload"].get("products") + assert app_started_events[0]["payload"].get("error") == {"code": 0, "message": ""} def test_app_started_event_configuration_override(test_agent_session, run_python_code_in_subprocess, tmpdir): @@ -288,9 +184,8 @@ def test_app_started_event_configuration_override(test_agent_session, run_python env["DD_INJECT_FORCE"] = "true" env["DD_INJECTION_ENABLED"] = "tracer" - # By default telemetry collection is enabled after 10 seconds, so we either need to - # to sleep for 10 seconds or manually call _app_started() to generate the app started event. - # This delay allows us to collect start up errors and dynamic configurations + # Ensures app-started event is queued immediately after ddtrace is imported + # instead of waiting for 10 seconds. 
env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true" _, stderr, status, _ = run_python_code_in_subprocess(code, env=env) @@ -706,7 +601,7 @@ def test_update_dependencies_event_when_disabled(test_agent_session, ddtrace_run # Import httppretty after ddtrace is imported, this ensures that the module is sent in a dependencies event # Imports httpretty twice and ensures only one dependency entry is sent _, stderr, status, _ = ddtrace_run_python_code_in_subprocess("import xmltodict", env=env) - events = test_agent_session.get_events("app-dependencies-loaded", subprocess=True) + events = test_agent_session.get_events("app-dependencies-loaded") assert len(events) == 0, events @@ -734,17 +629,17 @@ def test_update_dependencies_event_not_stdlib(test_agent_session, ddtrace_run_py def test_app_closing_event(telemetry_writer, test_agent_session, mock_time): """asserts that app_shutdown() queues and sends an app-closing telemetry request""" - # app started event must be queued before any other telemetry event - telemetry_writer._app_started(register_app_shutdown=False) - assert telemetry_writer.started + # Telemetry writer must start before app-closing event is queued + telemetry_writer.started = True # send app closed event telemetry_writer.app_shutdown() - requests = test_agent_session.get_requests("app-closing") - assert len(requests) == 1 + num_requests = len(test_agent_session.get_requests()) + assert num_requests == 1 # ensure a valid request body was sent - totel_events = len(test_agent_session.get_events()) - assert requests[0]["body"] == _get_request_body({}, "app-closing", totel_events) + events = test_agent_session.get_events("app-closing") + assert len(events) == 1 + validate_request_body(events[0], {}, "app-closing", num_requests) def test_add_integration(telemetry_writer, test_agent_session, mock_time): @@ -756,12 +651,11 @@ def test_add_integration(telemetry_writer, test_agent_session, mock_time): # send integrations to the agent 
telemetry_writer.periodic(force_flush=True) - requests = test_agent_session.get_requests("app-integrations-change") + events = test_agent_session.get_events("app-integrations-change") # assert integration change telemetry request was sent - assert len(requests) == 1 - + assert len(events) == 1 # assert that the request had a valid request body - requests[0]["body"]["payload"]["integrations"].sort(key=lambda x: x["name"]) + events[0]["payload"]["integrations"].sort(key=lambda x: x["name"]) expected_payload = { "integrations": [ { @@ -782,7 +676,7 @@ def test_add_integration(telemetry_writer, test_agent_session, mock_time): }, ] } - assert requests[0]["body"] == _get_request_body(expected_payload, "app-integrations-change", seq_id=2) + validate_request_body(events[0], expected_payload, "app-integrations-change") def test_app_client_configuration_changed_event(telemetry_writer, test_agent_session, mock_time): @@ -822,7 +716,7 @@ def test_add_integration_disabled_writer(telemetry_writer, test_agent_session): telemetry_writer.add_integration("integration-name", True, False, "") telemetry_writer.periodic(force_flush=True) - assert len(test_agent_session.get_requests("app-integrations-change")) == 0 + assert len(test_agent_session.get_events("app-integrations-change")) == 0 @pytest.mark.parametrize("mock_status", [300, 400, 401, 403, 500]) @@ -839,8 +733,9 @@ def test_send_failing_request(mock_status, telemetry_writer): telemetry_writer.periodic(force_flush=True) # asserts unsuccessful status code was logged log.debug.assert_called_with( - "Failed to send Instrumentation Telemetry to %s. response: %s", + "Failed to send Instrumentation Telemetry to %s. Event(s): %s. 
Response: %s",
         telemetry_writer._client.url,
+        mock.ANY,
         mock_status,
     )
 
 
@@ -859,7 +754,7 @@ def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_se
     # Assert next flush contains app-heartbeat event
     for _ in range(telemetry_writer._periodic_threshold):
         telemetry_writer.periodic()
-        assert test_agent_session.get_events("app-heartbeat", filter_heartbeats=False) == []
+        assert test_agent_session.get_events(mock.ANY, filter_heartbeats=False) == []
 
     telemetry_writer.periodic()
     heartbeat_events = test_agent_session.get_events("app-heartbeat", filter_heartbeats=False)
@@ -871,7 +766,7 @@ def test_app_heartbeat_event(mock_time, telemetry_writer, test_agent_session):
     """asserts that we queue/send app-heartbeat event every 60 seconds when app_heartbeat_event() is called"""
     # Assert a maximum of one heartbeat is queued per flush
     telemetry_writer.periodic(force_flush=True)
-    events = test_agent_session.get_events("app-heartbeat", filter_heartbeats=False)
+    events = test_agent_session.get_events(mock.ANY, filter_heartbeats=False)
     assert len(events) > 0
 
 
@@ -888,7 +783,7 @@ def test_app_product_change_event(mock_time, telemetry_writer, test_agent_sessio
     telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.APPSEC, True)
     assert all(telemetry_writer._product_enablement.values())
 
-    telemetry_writer._app_started()
+    telemetry_writer.periodic(force_flush=True)
 
     # Assert that there's only an app_started event (since product activation happened before the app started)
     events = test_agent_session.get_events("app-product-change")
@@ -920,20 +815,21 @@
     }
 
 
-def _get_request_body(payload, payload_type, seq_id=1):
-    # type: (Dict, str, int) -> Dict
+def validate_request_body(received_body, payload, payload_type, seq_id=None):
+    # type: (Dict, Optional[Dict], str, Optional[int]) -> None
     """used to test the body of requests received by the testagent"""
-    return {
-        "tracer_time": time.time(),
"runtime_id": get_runtime_id(), - "api_version": "v2", - "debug": False, - "seq_id": seq_id, - "application": get_application(config.service, config.version, config.env), - "host": get_host_info(), - "payload": payload, - "request_type": payload_type, - } + assert len(received_body) == 9 + assert received_body["tracer_time"] == time.time() + assert received_body["runtime_id"] == get_runtime_id() + assert received_body["api_version"] == "v2" + assert received_body["debug"] is False + if seq_id is not None: + assert received_body["seq_id"] == seq_id + assert received_body["application"] == get_application(config.service, config.version, config.env) + assert received_body["host"] == get_host_info() + if payload is not None: + assert received_body["payload"] == payload + assert received_body["request_type"] == payload_type def test_telemetry_writer_agent_setup(): @@ -1130,7 +1026,7 @@ def test_add_integration_error_log_with_log_collection_disabled(mock_time, telem telemetry_writer.add_integration_error_log("Test error message", e) telemetry_writer.periodic(force_flush=True) - log_events = test_agent_session.get_events("logs", subprocess=True) + log_events = test_agent_session.get_events("logs") assert len(log_events) == 0 finally: telemetry_config.LOG_COLLECTION_ENABLED = original_value