From 690580632263ca732a79e8ca92fce16fc7e28a83 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Fri, 13 Dec 2024 15:00:42 -0800 Subject: [PATCH 1/5] Integration Exception Tracking Collect, dedupe, ddtrace.contrib logs, and send to the telemetry. --- ddtrace/internal/logger.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/ddtrace/internal/logger.py b/ddtrace/internal/logger.py index 0592afa0a74..bc7a3dd3da4 100644 --- a/ddtrace/internal/logger.py +++ b/ddtrace/internal/logger.py @@ -120,6 +120,12 @@ def __init__(self, *args, **kwargs): else: self.rate_limit = 60 + self.telemetry_log_buckets = None + if _TelemetryConfig.LOG_COLLECTION_ENABLED and self.name.startswith("ddtrace.contrib."): + self.telemetry_log_buckets = collections.defaultdict( + lambda: DDLogger.LoggingBucket(0, 0) + ) # type: DefaultDict[Tuple[str, int, str, int], DDLogger.LoggingBucket] + def handle(self, record): # type: (logging.LogRecord) -> None """ @@ -142,6 +148,10 @@ def handle(self, record): full_file_name = os.path.join(record.pathname, record.filename) telemetry.telemetry_writer.add_error(1, record.msg % record.args, full_file_name, record.lineno) + if self.telemetry_log_buckets is not None: + # TODO exclude debug logs:: and record.levelno > logging.DEBUG: + self._report_telemetry_log(record) + # If rate limiting has been disabled (`DD_TRACE_LOGGING_RATE=0`) then apply no rate limit # If the logging is in debug, then do not apply any limits to any log if not self.rate_limit or self.getEffectiveLevel() == logging.DEBUG: @@ -178,3 +188,29 @@ def handle(self, record): # Increment the count of records we have skipped # DEV: `self.buckets[key]` is a tuple which is immutable so recreate instead self.buckets[key] = DDLogger.LoggingBucket(logging_bucket.bucket, logging_bucket.skipped + 1) + + def _report_telemetry_log(self, record): + from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL + key = (record.name, record.levelno, record.pathname, record.lineno) + current_bucket = int(record.created / _TelemetryConfig.TELEMETRY_HEARTBEAT_INTERVAL) + key_bucket = self.telemetry_log_buckets[key] + if key_bucket.bucket == current_bucket: + self.telemetry_log_buckets[key] = DDLogger.LoggingBucket(key_bucket.bucket, key_bucket.skipped + 1) + else: + self.telemetry_log_buckets[key] = DDLogger.LoggingBucket(current_bucket, 0) + level = ( + TELEMETRY_LOG_LEVEL.ERROR if record.levelno >= logging.ERROR + else TELEMETRY_LOG_LEVEL.WARNING if record.levelno == logging.WARNING + else TELEMETRY_LOG_LEVEL.DEBUG + ) + from ddtrace.internal import telemetry + # TODO stack_trace = record.exc_info if "".join(format_exception(*record.exc_info)) else None + # TODO tags + telemetry.telemetry_writer.add_log(level, record.msg) # TODO stacktrace and tags + +class _TelemetryConfig: + TELEMETRY_ENABLED = os.getenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED", "true").lower() in ("true", "1") + LOG_COLLECTION_ENABLED = TELEMETRY_ENABLED and os.getenv("DD_TELEMETRY_LOG_COLLECTION_ENABLED", "true").lower() in ("true", "1") + TELEMETRY_HEARTBEAT_INTERVAL = float(os.getenv("DD_TELEMETRY_HEARTBEAT_INTERVAL", "60")) + + From c4091439989ec63d95504bec65f199ecd9a398b7 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Fri, 13 Dec 2024 17:41:00 -0800 Subject: [PATCH 2/5] Integration Exception Tracking Report only an error or an exception with a stack trace. Added tags and stack trace (without redaction) --- ddtrace/internal/logger.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/ddtrace/internal/logger.py b/ddtrace/internal/logger.py index bc7a3dd3da4..157ee8f50ab 100644 --- a/ddtrace/internal/logger.py +++ b/ddtrace/internal/logger.py @@ -1,6 +1,7 @@ import collections import logging import os +import traceback import typing from typing import Optional # noqa:F401 from typing import cast # noqa:F401 @@ -121,10 +122,12 @@ def __init__(self, *args, **kwargs): self.rate_limit = 60 self.telemetry_log_buckets = None - if _TelemetryConfig.LOG_COLLECTION_ENABLED and self.name.startswith("ddtrace.contrib."): - self.telemetry_log_buckets = collections.defaultdict( - lambda: DDLogger.LoggingBucket(0, 0) - ) # type: DefaultDict[Tuple[str, int, str, int], DDLogger.LoggingBucket] + if _TelemetryConfig.LOG_COLLECTION_ENABLED: + if self.name.startswith("ddtrace.contrib."): + # Collect only errors logged within the integration package + self.telemetry_log_buckets = collections.defaultdict( + lambda: DDLogger.LoggingBucket(0, 0) + ) # type: DefaultDict[Tuple[str, int, str, int], DDLogger.LoggingBucket] def handle(self, record): # type: (logging.LogRecord) -> None @@ -149,7 +152,6 @@ def handle(self, record): telemetry.telemetry_writer.add_error(1, record.msg % record.args, full_file_name, record.lineno) if self.telemetry_log_buckets is not None: - # TODO exclude debug logs:: and record.levelno > logging.DEBUG: self._report_telemetry_log(record) # If rate limiting has been disabled (`DD_TRACE_LOGGING_RATE=0`) then apply no rate limit @@ -190,6 +192,7 @@ def handle(self, record): self.buckets[key] = DDLogger.LoggingBucket(logging_bucket.bucket, logging_bucket.skipped + 1) def _report_telemetry_log(self, record): + # type: (logging.LogRecord) -> None from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL key = (record.name, record.levelno, record.pathname, record.lineno) current_bucket = int(record.created / _TelemetryConfig.TELEMETRY_HEARTBEAT_INTERVAL) @@ -204,9 +207,19 @@ def _report_telemetry_log(self, record): else TELEMETRY_LOG_LEVEL.DEBUG ) from ddtrace.internal import telemetry - # TODO stack_trace = record.exc_info if "".join(format_exception(*record.exc_info)) else None - # TODO tags - telemetry.telemetry_writer.add_log(level, record.msg) # TODO stacktrace and tags + tags = { + "lib_language": "python", + } + stack_trace = None + if record.exc_info: + _, _, traceback_object = record.exc_info + if traceback_object: + stack_trace = ''.join(traceback.format_tb(traceback_object)) + # TODO redact absolute file paths and unknown packages + if record.levelno >= logging.ERROR or stack_trace is not None: + # Report only an error or an exception with a stack trace + telemetry.telemetry_writer.add_log(level, record.msg, tags=tags, stack_trace=stack_trace) + class _TelemetryConfig: TELEMETRY_ENABLED = os.getenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED", "true").lower() in ("true", "1") From 1a2ac4fc02962f4c83d1ca02152d145a45a478da Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Fri, 13 Dec 2024 17:52:39 -0800 Subject: [PATCH 3/5] Integration Exception Tracking Add count --- ddtrace/internal/logger.py | 2 +- ddtrace/internal/telemetry/writer.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/ddtrace/internal/logger.py b/ddtrace/internal/logger.py index 157ee8f50ab..81b88060738 100644 --- a/ddtrace/internal/logger.py +++ b/ddtrace/internal/logger.py @@ -218,7 +218,7 @@ def _report_telemetry_log(self, record): # TODO redact absolute file paths and unknown packages if record.levelno >= logging.ERROR or stack_trace is not None: # Report only an error or an exception with a stack trace - telemetry.telemetry_writer.add_log(level, record.msg, tags=tags, stack_trace=stack_trace) + telemetry.telemetry_writer.add_log(level, record.msg, tags=tags, stack_trace=stack_trace, count=key_bucket.skipped + 1) class _TelemetryConfig: diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index 71de6b03907..b0ee860f5af 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -478,8 +478,8 @@ def add_configurations(self, configuration_list): "value": value, } - def add_log(self, level, message, stack_trace="", tags=None): - # type: (TELEMETRY_LOG_LEVEL, str, str, Optional[Dict]) -> None + def add_log(self, level, message, stack_trace="", tags=None, count=1): + # type: (TELEMETRY_LOG_LEVEL, str, str, Optional[Dict], int) -> None """ Queues log. This event is meant to send library logs to Datadog’s backend through the Telemetry intake. This will make support cycles easier and ensure we know about potentially silent issues in libraries. @@ -499,6 +499,8 @@ def add_log(self, level, message, stack_trace="", tags=None): data["tags"] = ",".join(["%s:%s" % (k, str(v).lower()) for k, v in tags.items()]) if stack_trace: data["stack_trace"] = stack_trace + if count > 1: + data["count"] = count self._logs.add(data) def add_gauge_metric(self, namespace, name, value, tags=None): From ec8f7cad460b7bc617927928d105282abcfb1136 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Mon, 6 Jan 2025 15:56:12 -0800 Subject: [PATCH 4/5] Fix format --- ddtrace/internal/logger.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/ddtrace/internal/logger.py b/ddtrace/internal/logger.py index 81b88060738..3d08bd8f28a 100644 --- a/ddtrace/internal/logger.py +++ b/ddtrace/internal/logger.py @@ -194,6 +194,7 @@ def handle(self, record): def _report_telemetry_log(self, record): # type: (logging.LogRecord) -> None from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL + key = (record.name, record.levelno, record.pathname, record.lineno) current_bucket = int(record.created / _TelemetryConfig.TELEMETRY_HEARTBEAT_INTERVAL) key_bucket = self.telemetry_log_buckets[key] @@ -202,11 +203,14 @@ def _report_telemetry_log(self, record): else: self.telemetry_log_buckets[key] = DDLogger.LoggingBucket(current_bucket, 0) level = ( - TELEMETRY_LOG_LEVEL.ERROR if record.levelno >= logging.ERROR - else TELEMETRY_LOG_LEVEL.WARNING if record.levelno == logging.WARNING + TELEMETRY_LOG_LEVEL.ERROR + if record.levelno >= logging.ERROR + else TELEMETRY_LOG_LEVEL.WARNING + if record.levelno == logging.WARNING else TELEMETRY_LOG_LEVEL.DEBUG ) from ddtrace.internal import telemetry + tags = { "lib_language": "python", } @@ -214,16 +218,19 @@ def _report_telemetry_log(self, record): if record.exc_info: _, _, traceback_object = record.exc_info if traceback_object: - stack_trace = ''.join(traceback.format_tb(traceback_object)) + stack_trace = "".join(traceback.format_tb(traceback_object)) # TODO redact absolute file paths and unknown packages if record.levelno >= logging.ERROR or stack_trace is not None: # Report only an error or an exception with a stack trace - telemetry.telemetry_writer.add_log(level, record.msg, tags=tags, stack_trace=stack_trace, count=key_bucket.skipped + 1) + telemetry.telemetry_writer.add_log( + level, record.msg, tags=tags, stack_trace=stack_trace, count=key_bucket.skipped + 1 + ) class _TelemetryConfig: TELEMETRY_ENABLED = os.getenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED", "true").lower() in ("true", "1") - LOG_COLLECTION_ENABLED = TELEMETRY_ENABLED and os.getenv("DD_TELEMETRY_LOG_COLLECTION_ENABLED", "true").lower() in ("true", "1") + LOG_COLLECTION_ENABLED = TELEMETRY_ENABLED and os.getenv("DD_TELEMETRY_LOG_COLLECTION_ENABLED", "true").lower() in ( + "true", + "1", + ) TELEMETRY_HEARTBEAT_INTERVAL = float(os.getenv("DD_TELEMETRY_HEARTBEAT_INTERVAL", "60")) - - From b132c4e81f2da11f247ffa7df3fcbd1d28483331 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Wed, 8 Jan 2025 22:13:40 -0800 Subject: [PATCH 5/5] Extract DDTelemetryLogger --- ddtrace/internal/logger.py | 43 +++++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/ddtrace/internal/logger.py b/ddtrace/internal/logger.py index 3d08bd8f28a..ed190314503 100644 --- a/ddtrace/internal/logger.py +++ b/ddtrace/internal/logger.py @@ -47,7 +47,7 @@ def get_logger(name): logger = manager.loggerDict[name] if isinstance(manager.loggerDict[name], logging.PlaceHolder): placeholder = logger - logger = DDLogger(name=name) + logger = _new_logger(name=name) manager.loggerDict[name] = logger # DEV: `_fixupChildren` and `_fixupParents` have been around for awhile, # DEV: but add the `hasattr` guard... just in case. @@ -56,7 +56,7 @@ def get_logger(name): if hasattr(manager, "_fixupParents"): manager._fixupParents(logger) else: - logger = DDLogger(name=name) + logger = _new_logger(name=name) manager.loggerDict[name] = logger if hasattr(manager, "_fixupParents"): manager._fixupParents(logger) @@ -65,6 +65,13 @@ def get_logger(name): return cast(DDLogger, logger) +def _new_logger(name): + if _TelemetryConfig.LOG_COLLECTION_ENABLED: + if name.startswith("ddtrace.contrib."): + return DDTelemetryLogger(name=name) + return DDLogger(name=name) + + def hasHandlers(self): # type: (DDLogger) -> bool """ @@ -121,14 +128,6 @@ def __init__(self, *args, **kwargs): else: self.rate_limit = 60 - self.telemetry_log_buckets = None - if _TelemetryConfig.LOG_COLLECTION_ENABLED: - if self.name.startswith("ddtrace.contrib."): - # Collect only errors logged within the integration package - self.telemetry_log_buckets = collections.defaultdict( - lambda: DDLogger.LoggingBucket(0, 0) - ) # type: DefaultDict[Tuple[str, int, str, int], DDLogger.LoggingBucket] - def handle(self, record): # type: (logging.LogRecord) -> None """ @@ -151,9 +150,6 @@ def handle(self, record): full_file_name = os.path.join(record.pathname, record.filename) telemetry.telemetry_writer.add_error(1, record.msg % record.args, full_file_name, record.lineno) - if self.telemetry_log_buckets is not None: - self._report_telemetry_log(record) - # If rate limiting has been disabled (`DD_TRACE_LOGGING_RATE=0`) then apply no rate limit # If the logging is in debug, then do not apply any limits to any log if not self.rate_limit or self.getEffectiveLevel() == logging.DEBUG: @@ -191,8 +187,25 @@ def handle(self, record): # DEV: `self.buckets[key]` is a tuple which is immutable so recreate instead self.buckets[key] = DDLogger.LoggingBucket(logging_bucket.bucket, logging_bucket.skipped + 1) - def _report_telemetry_log(self, record): + +class DDTelemetryLogger(DDLogger): + """ + Logger that intercepts and reports exceptions to the telemetry. + """ + + def __init__(self, *args, **kwargs): + # type: (*Any, **Any) -> None + """Constructor for ``DDTelemetryLogger``""" + super(DDTelemetryLogger, self).__init__(*args, **kwargs) + + self.telemetry_log_buckets = collections.defaultdict( + lambda: DDLogger.LoggingBucket(0, 0) + ) # type: DefaultDict[Tuple[str, int, str, int], DDLogger.LoggingBucket] + + + def handle(self, record): # type: (logging.LogRecord) -> None + from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL key = (record.name, record.levelno, record.pathname, record.lineno) @@ -226,6 +239,8 @@ def _report_telemetry_log(self, record): level, record.msg, tags=tags, stack_trace=stack_trace, count=key_bucket.skipped + 1 ) + super().handle(record) + class _TelemetryConfig: TELEMETRY_ENABLED = os.getenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED", "true").lower() in ("true", "1")