From 690580632263ca732a79e8ca92fce16fc7e28a83 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Fri, 13 Dec 2024 15:00:42 -0800 Subject: [PATCH] Integration Exception Tracking Collect, dedupe, ddtrace.contrib logs, and send to the telemetry. --- ddtrace/internal/logger.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/ddtrace/internal/logger.py b/ddtrace/internal/logger.py index 0592afa0a74..bc7a3dd3da4 100644 --- a/ddtrace/internal/logger.py +++ b/ddtrace/internal/logger.py @@ -120,6 +120,12 @@ def __init__(self, *args, **kwargs): else: self.rate_limit = 60 + self.telemetry_log_buckets = None + if _TelemetryConfig.LOG_COLLECTION_ENABLED and self.name.startswith("ddtrace.contrib."): + self.telemetry_log_buckets = collections.defaultdict( + lambda: DDLogger.LoggingBucket(0, 0) + ) # type: DefaultDict[Tuple[str, int, str, int], DDLogger.LoggingBucket] + def handle(self, record): # type: (logging.LogRecord) -> None """ @@ -142,6 +148,10 @@ def handle(self, record): full_file_name = os.path.join(record.pathname, record.filename) telemetry.telemetry_writer.add_error(1, record.msg % record.args, full_file_name, record.lineno) + if self.telemetry_log_buckets is not None: + # TODO exclude debug logs:: and record.levelno > logging.DEBUG: + self._report_telemetry_log(record) + # If rate limiting has been disabled (`DD_TRACE_LOGGING_RATE=0`) then apply no rate limit # If the logging is in debug, then do not apply any limits to any log if not self.rate_limit or self.getEffectiveLevel() == logging.DEBUG: @@ -178,3 +188,29 @@ def handle(self, record): # Increment the count of records we have skipped # DEV: `self.buckets[key]` is a tuple which is immutable so recreate instead self.buckets[key] = DDLogger.LoggingBucket(logging_bucket.bucket, logging_bucket.skipped + 1) + + def _report_telemetry_log(self, record): + from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL + key = (record.name, record.levelno, record.pathname, record.lineno) + current_bucket = int(record.created / _TelemetryConfig.TELEMETRY_HEARTBEAT_INTERVAL) + key_bucket = self.telemetry_log_buckets[key] + if key_bucket.bucket == current_bucket: + self.telemetry_log_buckets[key] = DDLogger.LoggingBucket(key_bucket.bucket, key_bucket.skipped + 1) + else: + self.telemetry_log_buckets[key] = DDLogger.LoggingBucket(current_bucket, 0) + level = ( + TELEMETRY_LOG_LEVEL.ERROR if record.levelno >= logging.ERROR + else TELEMETRY_LOG_LEVEL.WARNING if record.levelno == logging.WARNING + else TELEMETRY_LOG_LEVEL.DEBUG + ) + from ddtrace.internal import telemetry + # TODO stack_trace = record.exc_info if "".join(format_exception(*record.exc_info)) else None + # TODO tags + telemetry.telemetry_writer.add_log(level, record.msg) # TODO stacktrace and tags + +class _TelemetryConfig: + TELEMETRY_ENABLED = os.getenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED", "true").lower() in ("true", "1") + LOG_COLLECTION_ENABLED = TELEMETRY_ENABLED and os.getenv("DD_TELEMETRY_LOG_COLLECTION_ENABLED", "true").lower() in ("true", "1") + TELEMETRY_HEARTBEAT_INTERVAL = float(os.getenv("DD_TELEMETRY_HEARTBEAT_INTERVAL", "60")) + +