From 36cc6c293d007416f824f2a6744f43527cf5646c Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Tue, 7 Jan 2025 14:18:47 +0100 Subject: [PATCH 1/2] chore: dropped `span.add_event()` usage - we realised that this causes a lot of bloat in the console output and isn't actually used yet --- src/schema_registry/reader.py | 5 +---- src/schema_registry/routers/health.py | 7 ------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/schema_registry/reader.py b/src/schema_registry/reader.py index b2c5d6f67..67c9e4dc8 100644 --- a/src/schema_registry/reader.py +++ b/src/schema_registry/reader.py @@ -380,10 +380,7 @@ def highest_offset(self) -> int: return max(self._highest_offset, self._offset_watcher.greatest_offset()) def ready(self) -> bool: - with self._tracer.get_tracer().start_as_current_span( - self._tracer.get_name_from_caller_with_class(self, self.ready) - ) as span: - span.add_event("Acquiring ready lock") + with self._tracer.get_tracer().start_as_current_span(self._tracer.get_name_from_caller_with_class(self, self.ready)): with self._ready_lock: return self._ready diff --git a/src/schema_registry/routers/health.py b/src/schema_registry/routers/health.py index f41fa13cb..cad041a7a 100644 --- a/src/schema_registry/routers/health.py +++ b/src/schema_registry/routers/health.py @@ -38,7 +38,6 @@ class HealthCheck(BaseModel): def set_health_status_tracing_attributes(health_check_span: Span, health_status: HealthStatus) -> None: - health_check_span.add_event("Setting health status tracing attributes") health_check_span.set_attribute("schema_registry_ready", health_status.schema_registry_ready) health_check_span.set_attribute("schema_registry_startup_time_sec", health_status.schema_registry_startup_time_sec) health_check_span.set_attribute( @@ -59,15 +58,11 @@ async def health( with tracer.get_tracer().start_as_current_span("APIRouter: health_check") as health_check_span: starttime = 0.0 - health_check_span.add_event("Checking schema-reader is ready") schema_reader_is_ready = schema_registry.schema_reader.ready() if schema_reader_is_ready: starttime = schema_registry.schema_reader.last_check - schema_registry.schema_reader.start_time - health_check_span.add_event("Getting schema-registry master-coordinator status") cs = schema_registry.mc.get_coordinator_status() - - health_check_span.add_event("Building health status response model") health_status = HealthStatus( schema_registry_ready=schema_reader_is_ready, schema_registry_startup_time_sec=starttime, @@ -85,9 +80,7 @@ async def health( # resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified if not await schema_registry.schema_reader.is_healthy(): - health_check_span.add_event("Erroring because schema-reader is not healthy") health_check_span.set_status(status=StatusCode.ERROR, description="Schema reader is not healthy") raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) - health_check_span.add_event("Returning health check response") return HealthCheck(status=health_status, healthy=True) From 38db86c06be62f26863b34ca8e76dc9c90ed5711 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Tue, 7 Jan 2025 16:02:25 +0100 Subject: [PATCH 2/2] tracer: added noop exporter support --- .github/workflows/lint.yml | 1 - .github/workflows/tests.yml | 1 - container/compose.yml | 1 + src/karapace/config.py | 8 +++ src/schema_registry/telemetry/tracer.py | 44 ++++++++++++--- .../schema_registry/telemetry/test_tracer.py | 56 +++++++++++++++++-- 6 files changed, 97 insertions(+), 14 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index e830d6f2c..dacbbbe13 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -43,7 +43,6 @@ jobs: - uses: actions/setup-python@v5 with: - cache: pip python-version: '3.12' - name: Resolve Karapace version diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 016dff288..844de9861 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -35,7 +35,6 @@ jobs: - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: - cache: pip python-version: ${{ matrix.python-version }} - name: Set up Go diff --git a/container/compose.yml b/container/compose.yml index c42d4b428..263685e2f 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -86,6 +86,7 @@ services: KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true KARAPACE_TAGS__APP: karapace-schema-registry KARAPACE_TELEMETRY__OTEL_ENDPOINT_URL: http://opentelemetry-collector:4317 + KARAPACE_TELEMETRY__OTEL_EXPORTER: OTLP KARAPACE_TELEMETRY__RESOURCE_SERVICE_NAME: karapace-schema-registry KARAPACE_TELEMETRY__RESOURCE_SERVICE_INSTANCE_ID: sr1 KARAPACE_TELEMETRY__RESOURCE_TELEMETRY_SDK_NAME: opentelemetry diff --git a/src/karapace/config.py b/src/karapace/config.py index 0dd811d92..be9d60454 100644 --- a/src/karapace/config.py +++ b/src/karapace/config.py @@ -15,6 +15,7 @@ from pydantic import BaseModel, ImportString from pydantic_settings import BaseSettings, SettingsConfigDict +import enum import logging import os import socket @@ -27,8 +28,15 @@ class KarapaceTags(BaseModel): app: str = "Karapace" +class KarapaceTelemetryOTelExporter(str, enum.Enum): + OTLP = "OTLP" + CONSOLE = "CONSOLE" + NOOP = "NOOP" + + class KarapaceTelemetry(BaseModel): otel_endpoint_url: str | None = None + otel_exporter: KarapaceTelemetryOTelExporter = KarapaceTelemetryOTelExporter.NOOP resource_service_name: str = "karapace" resource_service_instance_id: str = "karapace" resource_telemetry_sdk_name: str = "opentelemetry" diff --git a/src/schema_registry/telemetry/tracer.py b/src/schema_registry/telemetry/tracer.py index e905e0edc..b2c19e583 100644 --- a/src/schema_registry/telemetry/tracer.py +++ b/src/schema_registry/telemetry/tracer.py @@ -3,14 +3,22 @@ See LICENSE for details """ -from collections.abc import Callable +from collections.abc import Callable, Sequence from dependency_injector.wiring import inject, Provide from fastapi import Request, Response -from karapace.config import Config +from karapace.config import Config, KarapaceTelemetryOTelExporter from karapace.container import KarapaceContainer from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor, SpanProcessor +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, + ConsoleSpanExporter, + SimpleSpanProcessor, + SpanExporter, + SpanExportResult, + SpanProcessor, +) from opentelemetry.semconv.attributes import ( client_attributes as C, http_attributes as H, @@ -22,19 +30,41 @@ import inspect +class NOOPSpanExporter(SpanExporter): + """Implementation of :class:`SpanExporter` that does nothing. + + This class is intended to be used when tracing exporting to an OTel backend is disabled + and the ConsoleExporter is too verbose to be used. + """ + + def export(self, _: Sequence[ReadableSpan]) -> SpanExportResult: + return SpanExportResult.SUCCESS + + def force_flush(self, _: int = 0) -> bool: + return False + + class Tracer: @staticmethod @inject def get_tracer(config: Config = Provide[KarapaceContainer.config]) -> trace.Tracer: return trace.get_tracer(f"{config.tags.app}.tracer") + @staticmethod + def get_span_exporter(config: Config) -> SpanExporter: + match config.telemetry.otel_exporter: + case KarapaceTelemetryOTelExporter.NOOP: + return NOOPSpanExporter() + case KarapaceTelemetryOTelExporter.CONSOLE: + return ConsoleSpanExporter() + case KarapaceTelemetryOTelExporter.OTLP: + return OTLPSpanExporter(endpoint=config.telemetry.otel_endpoint_url) + @staticmethod @inject def get_span_processor(config: Config = Provide[KarapaceContainer.config]) -> SpanProcessor: - if config.telemetry.otel_endpoint_url: - otlp_span_exporter = OTLPSpanExporter(endpoint=config.telemetry.otel_endpoint_url) - return BatchSpanProcessor(otlp_span_exporter) - return SimpleSpanProcessor(ConsoleSpanExporter()) + processor = BatchSpanProcessor if config.telemetry.otel_endpoint_url else SimpleSpanProcessor + return processor(Tracer.get_span_exporter(config=config)) @staticmethod def get_name_from_caller() -> str: diff --git a/tests/unit/schema_registry/telemetry/test_tracer.py b/tests/unit/schema_registry/telemetry/test_tracer.py index f1edabde2..6e307f8d2 100644 --- a/tests/unit/schema_registry/telemetry/test_tracer.py +++ b/tests/unit/schema_registry/telemetry/test_tracer.py @@ -8,9 +8,9 @@ from fastapi import Request, Response from karapace.config import KarapaceTelemetry from karapace.container import KarapaceContainer -from opentelemetry.sdk.trace.export import SpanProcessor +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SpanExporter, SpanProcessor from opentelemetry.trace.span import Span -from schema_registry.telemetry.tracer import Tracer +from schema_registry.telemetry.tracer import NOOPSpanExporter, Tracer from unittest.mock import call, MagicMock, patch @@ -35,9 +35,55 @@ def test_function(self): assert Test().test_function() == "Test.test_function()" +def test_get_span_exporter_noop(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( + new_config={ + "telemetry": KarapaceTelemetry( + otel_endpoint_url="http://otel:4317", + otel_exporter="NOOP", + ) + } + ) + exporter: SpanExporter = Tracer.get_span_exporter(config=config) + assert isinstance(exporter, NOOPSpanExporter) + + +def test_get_span_exporter_console(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( + new_config={ + "telemetry": KarapaceTelemetry( + otel_endpoint_url="http://otel:4317", + otel_exporter="CONSOLE", + ) + } + ) + exporter: SpanExporter = Tracer.get_span_exporter(config=config) + assert isinstance(exporter, ConsoleSpanExporter) + + +def test_get_span_exporter_otlp(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( + new_config={ + "telemetry": KarapaceTelemetry( + otel_endpoint_url="http://otel:4317", + otel_exporter="OTLP", + ) + } + ) + with patch("schema_registry.telemetry.tracer.OTLPSpanExporter") as mock_otlp_exporter: + exporter: SpanExporter = Tracer.get_span_exporter(config=config) + mock_otlp_exporter.assert_called_once_with(endpoint="http://otel:4317") + assert exporter is mock_otlp_exporter.return_value + + def test_get_span_processor_with_otel_endpoint(karapace_container: KarapaceContainer) -> None: config = karapace_container.config().set_config_defaults( - new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url="http://otel:4317")} + new_config={ + "telemetry": KarapaceTelemetry( + otel_endpoint_url="http://otel:4317", + otel_exporter="OTLP", + ) + } ) with ( patch("schema_registry.telemetry.tracer.OTLPSpanExporter") as mock_otlp_exporter, @@ -54,11 +100,11 @@ def test_get_span_processor_without_otel_endpoint(karapace_container: KarapaceCo new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url=None)} ) with ( - patch("schema_registry.telemetry.tracer.ConsoleSpanExporter") as mock_console_exporter, patch("schema_registry.telemetry.tracer.SimpleSpanProcessor") as mock_simple_span_processor, + patch("schema_registry.telemetry.tracer.NOOPSpanExporter") as mock_noop_exporter, ): processor: SpanProcessor = Tracer.get_span_processor(config=config) - mock_simple_span_processor.assert_called_once_with(mock_console_exporter.return_value) + mock_simple_span_processor.assert_called_once_with(mock_noop_exporter.return_value) assert processor is mock_simple_span_processor.return_value