diff --git a/lib/charms/tempo_k8s/v1/charm_tracing.py b/lib/charms/tempo_k8s/v1/charm_tracing.py new file mode 100644 index 0000000..c146e6d --- /dev/null +++ b/lib/charms/tempo_k8s/v1/charm_tracing.py @@ -0,0 +1,557 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +"""This charm library contains utilities to instrument your Charm with opentelemetry tracing data collection. + +(yes! charm code, not workload code!) + +This means that, if your charm is related to, for example, COS' Tempo charm, you will be able to inspect +in real time from the Grafana dashboard the execution flow of your charm. + +To start using this library, you need to do two things: +1) decorate your charm class with + +`@trace_charm(tracing_endpoint="my_tracing_endpoint")` + +2) add to your charm a "my_tracing_endpoint" (you can name this attribute whatever you like) **property** +that returns an otlp http/https endpoint url. If you are using the `TracingEndpointProvider` as +`self.tracing = TracingEndpointProvider(self)`, the implementation could be: + +``` + @property + def my_tracing_endpoint(self) -> Optional[str]: + '''Tempo endpoint for charm tracing''' + if self.tracing.is_ready(): + return self.tracing.otlp_http_endpoint() + else: + return None +``` + +At this point your charm will be automatically instrumented so that: +- charm execution starts a trace, containing + - every event as a span (including custom events) + - every charm method call (except dunders) as a span + +if you wish to add more fine-grained information to the trace, you can do so by getting a hold of the tracer like so: +``` +import opentelemetry +... + @property + def tracer(self) -> opentelemetry.trace.Tracer: + return opentelemetry.trace.get_tracer(type(self).__name__) +``` + +By default, the tracer is named after the charm type. If you wish to override that, you can pass +a different `service_name` argument to `trace_charm`. + +*Upgrading from `v0`:* + +If you are upgrading from `charm_tracing` v0, you need to take the following steps (assuming you already +have the newest version of the library in your charm): +1) If you need the dependency for your tests, add the following dependency to your charm project +(or, if your project had a dependency on `opentelemetry-exporter-otlp-proto-grpc` only because +of `charm_tracing` v0, you can replace it with): + +`opentelemetry-exporter-otlp-proto-http>=1.21.0`. + +2) Update the charm method referenced to from `@trace` and `@trace_charm`, +to return from `TracingEndpointRequirer.otlp_http_endpoint()` instead of `grpc_http`. For example: + +``` + from charms.tempo_k8s.v0.charm_tracing import trace_charm + + @trace_charm( + tracing_endpoint="my_tracing_endpoint", + ) + class MyCharm(CharmBase): + + ... + + @property + def my_tracing_endpoint(self) -> Optional[str]: + '''Tempo endpoint for charm tracing''' + if self.tracing.is_ready(): + return self.tracing.otlp_grpc_endpoint() + else: + return None +``` + +needs to be replaced with: + +``` + from charms.tempo_k8s.v1.charm_tracing import trace_charm + + @trace_charm( + tracing_endpoint="my_tracing_endpoint", + ) + class MyCharm(CharmBase): + + ... + + @property + def my_tracing_endpoint(self) -> Optional[str]: + '''Tempo endpoint for charm tracing''' + if self.tracing.is_ready(): + return self.tracing.otlp_http_endpoint() + else: + return None +``` + +3) If you were passing a certificate using `server_cert`, you need to change it to provide an *absolute* path to +the certificate file. +""" + +import functools +import inspect +import logging +import os +from contextlib import contextmanager +from contextvars import Context, ContextVar, copy_context +from pathlib import Path +from typing import ( + Any, + Callable, + Generator, + Optional, + Sequence, + Type, + TypeVar, + Union, + cast, +) + +import opentelemetry +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import Span, TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import INVALID_SPAN, Tracer +from opentelemetry.trace import get_current_span as otlp_get_current_span +from opentelemetry.trace import ( + get_tracer, + get_tracer_provider, + set_span_in_context, + set_tracer_provider, +) +from ops.charm import CharmBase +from ops.framework import Framework + +# The unique Charmhub library identifier, never change it +LIBID = "cb1705dcd1a14ca09b2e60187d1215c7" + +# Increment this major API version when introducing breaking changes +LIBAPI = 1 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version + +LIBPATCH = 2 + +PYDEPS = ["opentelemetry-exporter-otlp-proto-http>=1.21.0"] + +logger = logging.getLogger("tracing") + +tracer: ContextVar[Tracer] = ContextVar("tracer") +_GetterType = Union[Callable[[CharmBase], Optional[str]], property] + +CHARM_TRACING_ENABLED = "CHARM_TRACING_ENABLED" + + +def is_enabled() -> bool: + """Whether charm tracing is enabled.""" + return os.getenv(CHARM_TRACING_ENABLED, "1") == "1" + + +@contextmanager +def charm_tracing_disabled(): + """Contextmanager to temporarily disable charm tracing. + + For usage in tests. + """ + previous = os.getenv(CHARM_TRACING_ENABLED, "1") + os.environ[CHARM_TRACING_ENABLED] = "0" + yield + os.environ[CHARM_TRACING_ENABLED] = previous + + +def get_current_span() -> Union[Span, None]: + """Return the currently active Span, if there is one, else None. + + If you'd rather keep your logic unconditional, you can use opentelemetry.trace.get_current_span, + which will return an object that behaves like a span but records no data. + """ + span = otlp_get_current_span() + if span is INVALID_SPAN: + return None + return cast(Span, span) + + +def _get_tracer_from_context(ctx: Context) -> Optional[ContextVar]: + tracers = [v for v in ctx if v is not None and v.name == "tracer"] + if tracers: + return tracers[0] + return None + + +def _get_tracer() -> Optional[Tracer]: + """Find tracer in context variable and as a fallback locate it in the full context.""" + try: + return tracer.get() + except LookupError: + try: + ctx: Context = copy_context() + if context_tracer := _get_tracer_from_context(ctx): + return context_tracer.get() + else: + return None + except LookupError as err: + return None + + +@contextmanager +def _span(name: str) -> Generator[Optional[Span], Any, Any]: + """Context to create a span if there is a tracer, otherwise do nothing.""" + if tracer := _get_tracer(): + with tracer.start_as_current_span(name) as span: + yield cast(Span, span) + else: + yield None + + +_C = TypeVar("_C", bound=Type[CharmBase]) +_T = TypeVar("_T", bound=type) +_F = TypeVar("_F", bound=Type[Callable]) + + +class TracingError(RuntimeError): + """Base class for errors raised by this module.""" + + +class UntraceableObjectError(TracingError): + """Raised when an object you're attempting to instrument cannot be autoinstrumented.""" + + +def _get_tracing_endpoint(tracing_endpoint_getter, self, charm): + if isinstance(tracing_endpoint_getter, property): + tracing_endpoint = tracing_endpoint_getter.__get__(self) + else: # method or callable + tracing_endpoint = tracing_endpoint_getter(self) + + if tracing_endpoint is None: + logger.debug( + "Charm tracing is disabled. Tracing endpoint is not defined - " + "tracing is not available or relation is not set." + ) + return + elif not isinstance(tracing_endpoint, str): + raise TypeError( + f"{charm}.{tracing_endpoint_getter} should return a tempo endpoint (string); " + f"got {tracing_endpoint} instead." + ) + else: + logger.debug(f"Setting up span exporter to endpoint: {tracing_endpoint}/v1/traces") + return f"{tracing_endpoint}/v1/traces" + + +def _get_server_cert(server_cert_getter, self, charm): + if isinstance(server_cert_getter, property): + server_cert = server_cert_getter.__get__(self) + else: # method or callable + server_cert = server_cert_getter(self) + + if server_cert is None: + logger.warning( + f"{charm}.{server_cert_getter} returned None; sending traces over INSECURE connection." + ) + return + elif not Path(server_cert).is_absolute(): + raise ValueError( + f"{charm}.{server_cert_getter} should return a valid tls cert absolute path (string | Path)); " + f"got {server_cert} instead." + ) + return server_cert + + +def _setup_root_span_initializer( + charm: Type[CharmBase], + tracing_endpoint_getter: _GetterType, + server_cert_getter: Optional[_GetterType], + service_name: Optional[str] = None, +): + """Patch the charm's initializer.""" + original_init = charm.__init__ + + @functools.wraps(original_init) + def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): + original_init(self, framework, *args, **kwargs) + if not is_enabled(): + logger.info("Tracing DISABLED: skipping root span initialization") + return + + # already init some attrs that will be reinited later by calling original_init: + # self.framework = framework + # self.handle = Handle(None, self.handle_kind, None) + + original_event_context = framework._event_context + + _service_name = service_name or self.app.name + + resource = Resource.create( + attributes={ + "service.name": _service_name, + "compose_service": _service_name, + "charm_type": type(self).__name__, + # juju topology + "juju_unit": self.unit.name, + "juju_application": self.app.name, + "juju_model": self.model.name, + "juju_model_uuid": self.model.uuid, + } + ) + provider = TracerProvider(resource=resource) + tracing_endpoint = _get_tracing_endpoint(tracing_endpoint_getter, self, charm) + if not tracing_endpoint: + return + + server_cert: Optional[Union[str, Path]] = ( + _get_server_cert(server_cert_getter, self, charm) if server_cert_getter else None + ) + + exporter = OTLPSpanExporter( + endpoint=tracing_endpoint, + certificate_file=str(Path(server_cert).absolute()) if server_cert else None, + timeout=2, + ) + + processor = BatchSpanProcessor(exporter) + provider.add_span_processor(processor) + set_tracer_provider(provider) + _tracer = get_tracer(_service_name) # type: ignore + _tracer_token = tracer.set(_tracer) + + dispatch_path = os.getenv("JUJU_DISPATCH_PATH", "") + + # all these shenanigans are to work around the fact that the opentelemetry tracing API is built + # on the assumption that spans will be used as contextmanagers. + # Since we don't (as we need to close the span on framework.commit), + # we need to manually set the root span as current. + span = _tracer.start_span("charm exec", attributes={"juju.dispatch_path": dispatch_path}) + ctx = set_span_in_context(span) + + # log a trace id so we can look it up in tempo. + root_trace_id = hex(span.get_span_context().trace_id)[2:] # strip 0x prefix + logger.debug(f"Starting root trace with id={root_trace_id!r}.") + + span_token = opentelemetry.context.attach(ctx) # type: ignore + + @contextmanager + def wrap_event_context(event_name: str): + # when the framework enters an event context, we create a span. + with _span("event: " + event_name) as event_context_span: + if event_context_span: + # todo: figure out how to inject event attrs in here + event_context_span.add_event(event_name) + yield original_event_context(event_name) + + framework._event_context = wrap_event_context # type: ignore + + original_close = framework.close + + @functools.wraps(original_close) + def wrap_close(): + span.end() + opentelemetry.context.detach(span_token) # type: ignore + tracer.reset(_tracer_token) + tp = cast(TracerProvider, get_tracer_provider()) + tp.force_flush(timeout_millis=1000) # don't block for too long + tp.shutdown() + original_close() + + framework.close = wrap_close + return + + charm.__init__ = wrap_init + + +def trace_charm( + tracing_endpoint: str, + server_cert: Optional[str] = None, + service_name: Optional[str] = None, + extra_types: Sequence[type] = (), +): + """Autoinstrument the decorated charm with tracing telemetry. + + Use this function to get out-of-the-box traces for all events emitted on this charm and all + method calls on instances of this class. + + Usage: + >>> from charms.tempo_k8s.v1.charm_tracing import trace_charm + >>> from charms.tempo_k8s.v1.tracing import TracingEndpointProvider + >>> from ops import CharmBase + >>> + >>> @trace_charm( + >>> tracing_endpoint="tempo_otlp_http_endpoint", + >>> ) + >>> class MyCharm(CharmBase): + >>> + >>> def __init__(self, framework: Framework): + >>> ... + >>> self.tracing = TracingEndpointProvider(self) + >>> + >>> @property + >>> def tempo_otlp_http_endpoint(self) -> Optional[str]: + >>> if self.tracing.is_ready(): + >>> return self.tracing.otlp_http_endpoint() + >>> else: + >>> return None + >>> + :param server_cert: method or property on the charm type that returns an + optional absolute path to a tls certificate to be used when sending traces to a remote server. + If it returns None, an _insecure_ connection will be used. + :param tracing_endpoint: name of a property on the charm type that returns an + optional (fully resolvable) tempo url. If None, tracing will be effectively disabled. Else, traces will be + pushed to that endpoint. + :param service_name: service name tag to attach to all traces generated by this charm. + Defaults to the juju application name this charm is deployed under. + :param extra_types: pass any number of types that you also wish to autoinstrument. + For example, charm libs, relation endpoint wrappers, workload abstractions, ... + """ + + def _decorator(charm_type: Type[CharmBase]): + """Autoinstrument the wrapped charmbase type.""" + _autoinstrument( + charm_type, + tracing_endpoint_getter=getattr(charm_type, tracing_endpoint), + server_cert_getter=getattr(charm_type, server_cert) if server_cert else None, + service_name=service_name, + extra_types=extra_types, + ) + return charm_type + + return _decorator + + +def _autoinstrument( + charm_type: Type[CharmBase], + tracing_endpoint_getter: _GetterType, + server_cert_getter: Optional[_GetterType] = None, + service_name: Optional[str] = None, + extra_types: Sequence[type] = (), +) -> Type[CharmBase]: + """Set up tracing on this charm class. + + Use this function to get out-of-the-box traces for all events emitted on this charm and all + method calls on instances of this class. + + Usage: + + >>> from charms.tempo_k8s.v1.charm_tracing import _autoinstrument + >>> from ops.main import main + >>> _autoinstrument( + >>> MyCharm, + >>> tracing_endpoint_getter=MyCharm.tempo_otlp_http_endpoint, + >>> service_name="MyCharm", + >>> extra_types=(Foo, Bar) + >>> ) + >>> main(MyCharm) + + :param charm_type: the CharmBase subclass to autoinstrument. + :param server_cert_getter: method or property on the charm type that returns an + optional absolute path to a tls certificate to be used when sending traces to a remote server. + This needs to be a valid path to a certificate. + :param tracing_endpoint_getter: method or property on the charm type that returns an + optional tempo url. If None, tracing will be effectively disabled. Else, traces will be + pushed to that endpoint. + :param service_name: service name tag to attach to all traces generated by this charm. + Defaults to the juju application name this charm is deployed under. + :param extra_types: pass any number of types that you also wish to autoinstrument. + For example, charm libs, relation endpoint wrappers, workload abstractions, ... + """ + logger.info(f"instrumenting {charm_type}") + _setup_root_span_initializer( + charm_type, + tracing_endpoint_getter, + server_cert_getter=server_cert_getter, + service_name=service_name, + ) + trace_type(charm_type) + for type_ in extra_types: + trace_type(type_) + + return charm_type + + +def trace_type(cls: _T) -> _T: + """Set up tracing on this class. + + Use this decorator to get out-of-the-box traces for all method calls on instances of this class. + It assumes that this class is only instantiated after a charm type decorated with `@trace_charm` + has been instantiated. + """ + logger.info(f"instrumenting {cls}") + for name, method in inspect.getmembers(cls, predicate=inspect.isfunction): + logger.info(f"discovered {method}") + + if method.__name__.startswith("__"): + logger.info(f"skipping {method} (dunder)") + continue + + isstatic = isinstance(inspect.getattr_static(cls, method.__name__), staticmethod) + setattr(cls, name, trace_method(method, static=isstatic)) + + return cls + + +def trace_method(method: _F, static: bool = False) -> _F: + """Trace this method. + + A span will be opened when this method is called and closed when it returns. + """ + return _trace_callable(method, "method", static=static) + + +def trace_function(function: _F) -> _F: + """Trace this function. + + A span will be opened when this function is called and closed when it returns. + """ + return _trace_callable(function, "function") + + +def _trace_callable(callable: _F, qualifier: str, static: bool = False) -> _F: + logger.info(f"instrumenting {callable}") + + # sig = inspect.signature(callable) + @functools.wraps(callable) + def wrapped_function(*args, **kwargs): # type: ignore + name = getattr(callable, "__qualname__", getattr(callable, "__name__", str(callable))) + with _span(f"{'(static) ' if static else ''}{qualifier} call: {name}"): # type: ignore + if static: + return callable(*args[1:], **kwargs) # type: ignore + return callable(*args, **kwargs) # type: ignore + + # wrapped_function.__signature__ = sig + return wrapped_function # type: ignore + + +def trace(obj: Union[Type, Callable]): + """Trace this object and send the resulting spans to Tempo. + + It will dispatch to ``trace_type`` if the decorated object is a class, otherwise + ``trace_function``. + """ + if isinstance(obj, type): + if issubclass(obj, CharmBase): + raise ValueError( + "cannot use @trace on CharmBase subclasses: use @trace_charm instead " + "(we need some arguments!)" + ) + return trace_type(obj) + else: + try: + return trace_function(obj) + except Exception: + raise UntraceableObjectError( + f"cannot create span from {type(obj)}; instrument {obj} manually." + ) diff --git a/lib/charms/tempo_k8s/v2/tracing.py b/lib/charms/tempo_k8s/v2/tracing.py new file mode 100644 index 0000000..1660c97 --- /dev/null +++ b/lib/charms/tempo_k8s/v2/tracing.py @@ -0,0 +1,846 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +"""## Overview. + +This document explains how to integrate with the Tempo charm for the purpose of pushing traces to a +tracing endpoint provided by Tempo. It also explains how alternative implementations of the Tempo charm +may maintain the same interface and be backward compatible with all currently integrated charms. + +## Requirer Library Usage + +Charms seeking to push traces to Tempo, must do so using the `TracingEndpointRequirer` +object from this charm library. For the simplest use cases, using the `TracingEndpointRequirer` +object only requires instantiating it, typically in the constructor of your charm. The +`TracingEndpointRequirer` constructor requires the name of the relation over which a tracing endpoint + is exposed by the Tempo charm, and a list of protocols it intends to send traces with. + This relation must use the `tracing` interface. + The `TracingEndpointRequirer` object may be instantiated as follows + + from charms.tempo_k8s.v2.tracing import TracingEndpointRequirer + + def __init__(self, *args): + super().__init__(*args) + # ... + self.tracing = TracingEndpointRequirer(self, + protocols=['otlp_grpc', 'otlp_http', 'jaeger_http_thrift'] + ) + # ... + +Note that the first argument (`self`) to `TracingEndpointRequirer` is always a reference to the +parent charm. + +Alternatively to providing the list of requested protocols at init time, the charm can do it at +any point in time by calling the +`TracingEndpointRequirer.request_protocols(*protocol:str, relation:Optional[Relation])` method. +Using this method also allows you to use per-relation protocols. + +Units of provider charms obtain the tempo endpoint to which they will push their traces by calling +`TracingEndpointRequirer.get_endpoint(protocol: str)`, where `protocol` is, for example: +- `otlp_grpc` +- `otlp_http` +- `zipkin` +- `tempo` + +If the `protocol` is not in the list of protocols that the charm requested at endpoint set-up time, +the library will raise an error. + +## Requirer Library Usage + +The `TracingEndpointProvider` object may be used by charms to manage relations with their +trace sources. For this purposes a Tempo-like charm needs to do two things + +1. Instantiate the `TracingEndpointProvider` object by providing it a +reference to the parent (Tempo) charm and optionally the name of the relation that the Tempo charm +uses to interact with its trace sources. This relation must conform to the `tracing` interface +and it is strongly recommended that this relation be named `tracing` which is its +default value. + +For example a Tempo charm may instantiate the `TracingEndpointProvider` in its constructor as +follows + + from charms.tempo_k8s.v2.tracing import TracingEndpointProvider + + def __init__(self, *args): + super().__init__(*args) + # ... + self.tracing = TracingEndpointProvider(self) + # ... + + + +""" # noqa: W505 +import json +import logging +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + List, + Literal, + MutableMapping, + Optional, + Sequence, + Tuple, + cast, +) + +import pydantic +from ops.charm import ( + CharmBase, + CharmEvents, + RelationBrokenEvent, + RelationEvent, + RelationRole, +) +from ops.framework import EventSource, Object +from ops.model import ModelError, Relation +from pydantic import BaseModel + +# The unique Charmhub library identifier, never change it +LIBID = "12977e9aa0b34367903d8afeb8c3d85d" + +# Increment this major API version when introducing breaking changes +LIBAPI = 2 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 1 + +PYDEPS = ["pydantic"] + +logger = logging.getLogger(__name__) + +DEFAULT_RELATION_NAME = "tracing" +RELATION_INTERFACE_NAME = "tracing" + +ReceiverProtocol = Literal[ + "zipkin", + "kafka", + "opencensus", + "tempo", # legacy, renamed to tempo_http + "tempo_http", + "tempo_grpc", + "otlp_grpc", + "otlp_http", + "jaeger_grpc", + "jaeger_thrift_compact", + "jaeger_thrift_http", + "jaeger_http_thrift", # legacy, renamed to jaeger_thrift_http + "jaeger_thrift_binary", +] + +RawReceiver = Tuple[ReceiverProtocol, int] +BUILTIN_JUJU_KEYS = {"ingress-address", "private-address", "egress-subnets"} + + +class TracingError(Exception): + """Base class for custom errors raised by this library.""" + + +class NotReadyError(TracingError): + """Raised by the provider wrapper if a requirer hasn't published the required data (yet).""" + + +class ProtocolNotRequestedError(TracingError): + """Raised if the user attempts to obtain an endpoint for a protocol it did not request.""" + + +class DataValidationError(TracingError): + """Raised when data validation fails on IPU relation data.""" + + +class AmbiguousRelationUsageError(TracingError): + """Raised when one wrongly assumes that there can only be one relation on an endpoint.""" + + +if int(pydantic.version.VERSION.split(".")[0]) < 2: + + class DatabagModel(BaseModel): # type: ignore + """Base databag model.""" + + class Config: + """Pydantic config.""" + + # ignore any extra fields in the databag + extra = "ignore" + """Ignore any extra fields in the databag.""" + allow_population_by_field_name = True + """Allow instantiating this class by field name (instead of forcing alias).""" + + _NEST_UNDER = None + + @classmethod + def load(cls, databag: MutableMapping): + """Load this model from a Juju databag.""" + if cls._NEST_UNDER: + return cls.parse_obj(json.loads(databag[cls._NEST_UNDER])) + + try: + data = { + k: json.loads(v) + for k, v in databag.items() + # Don't attempt to parse model-external values + if k in {f.alias for f in cls.__fields__.values()} + } + except json.JSONDecodeError as e: + msg = f"invalid databag contents: expecting json. {databag}" + logger.error(msg) + raise DataValidationError(msg) from e + + try: + return cls.parse_raw(json.dumps(data)) # type: ignore + except pydantic.ValidationError as e: + msg = f"failed to validate databag: {databag}" + logger.debug(msg, exc_info=True) + raise DataValidationError(msg) from e + + def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True): + """Write the contents of this model to Juju databag. + + :param databag: the databag to write the data to. + :param clear: ensure the databag is cleared before writing it. + """ + if clear and databag: + databag.clear() + + if databag is None: + databag = {} + + if self._NEST_UNDER: + databag[self._NEST_UNDER] = self.json(by_alias=True) + return databag + + dct = self.dict() + for key, field in self.__fields__.items(): # type: ignore + value = dct[key] + databag[field.alias or key] = json.dumps(value) + + return databag + +else: + from pydantic import ConfigDict + + class DatabagModel(BaseModel): + """Base databag model.""" + + model_config = ConfigDict( + # ignore any extra fields in the databag + extra="ignore", + # Allow instantiating this class by field name (instead of forcing alias). + populate_by_name=True, + # Custom config key: whether to nest the whole datastructure (as json) + # under a field or spread it out at the toplevel. + _NEST_UNDER=None, # type: ignore + ) + """Pydantic config.""" + + @classmethod + def load(cls, databag: MutableMapping): + """Load this model from a Juju databag.""" + nest_under = cls.model_config.get("_NEST_UNDER") # type: ignore + if nest_under: + return cls.model_validate(json.loads(databag[nest_under])) # type: ignore + + try: + data = { + k: json.loads(v) + for k, v in databag.items() + # Don't attempt to parse model-external values + if k in {(f.alias or n) for n, f in cls.__fields__.items()} + } + except json.JSONDecodeError as e: + msg = f"invalid databag contents: expecting json. {databag}" + logger.error(msg) + raise DataValidationError(msg) from e + + try: + return cls.model_validate_json(json.dumps(data)) # type: ignore + except pydantic.ValidationError as e: + msg = f"failed to validate databag: {databag}" + logger.debug(msg, exc_info=True) + raise DataValidationError(msg) from e + + def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True): + """Write the contents of this model to Juju databag. + + :param databag: the databag to write the data to. + :param clear: ensure the databag is cleared before writing it. + """ + if clear and databag: + databag.clear() + + if databag is None: + databag = {} + nest_under = self.model_config.get("_NEST_UNDER") + if nest_under: + databag[nest_under] = self.model_dump_json( # type: ignore + by_alias=True, + # skip keys whose values are default + exclude_defaults=True, + ) + return databag + + dct = self.model_dump() # type: ignore + for key, field in self.model_fields.items(): # type: ignore + value = dct[key] + if value == field.default: + continue + databag[field.alias or key] = json.dumps(value) + + return databag + + +# todo use models from charm-relation-interfaces +class Receiver(BaseModel): # noqa: D101 + """Receiver data structure.""" + + protocol: ReceiverProtocol + port: int + + +class TracingProviderAppData(DatabagModel): # noqa: D101 + """Application databag model for the tracing provider.""" + + host: str + """Server hostname.""" + + receivers: List[Receiver] + """Enabled receivers and ports at which they are listening.""" + + +class TracingRequirerAppData(DatabagModel): # noqa: D101 + """Application databag model for the tracing requirer.""" + + receivers: List[ReceiverProtocol] + """Requested receivers.""" + + +class _AutoSnapshotEvent(RelationEvent): + __args__: Tuple[str, ...] = () + __optional_kwargs__: Dict[str, Any] = {} + + @classmethod + def __attrs__(cls): + return cls.__args__ + tuple(cls.__optional_kwargs__.keys()) + + def __init__(self, handle, relation, *args, **kwargs): + super().__init__(handle, relation) + + if not len(self.__args__) == len(args): + raise TypeError("expected {} args, got {}".format(len(self.__args__), len(args))) + + for attr, obj in zip(self.__args__, args): + setattr(self, attr, obj) + for attr, default in self.__optional_kwargs__.items(): + obj = kwargs.get(attr, default) + setattr(self, attr, obj) + + def snapshot(self) -> dict: + dct = super().snapshot() + for attr in self.__attrs__(): + obj = getattr(self, attr) + try: + dct[attr] = obj + except ValueError as e: + raise ValueError( + "cannot automagically serialize {}: " + "override this method and do it " + "manually.".format(obj) + ) from e + + return dct + + def restore(self, snapshot: dict) -> None: + super().restore(snapshot) + for attr, obj in snapshot.items(): + setattr(self, attr, obj) + + +class RelationNotFoundError(Exception): + """Raised if no relation with the given name is found.""" + + def __init__(self, relation_name: str): + self.relation_name = relation_name + self.message = "No relation named '{}' found".format(relation_name) + super().__init__(self.message) + + +class RelationInterfaceMismatchError(Exception): + """Raised if the relation with the given name has an unexpected interface.""" + + def __init__( + self, + relation_name: str, + expected_relation_interface: str, + actual_relation_interface: str, + ): + self.relation_name = relation_name + self.expected_relation_interface = expected_relation_interface + self.actual_relation_interface = actual_relation_interface + self.message = ( + "The '{}' relation has '{}' as interface rather than the expected '{}'".format( + relation_name, actual_relation_interface, expected_relation_interface + ) + ) + + super().__init__(self.message) + + +class RelationRoleMismatchError(Exception): + """Raised if the relation with the given name has a different role than expected.""" + + def __init__( + self, + relation_name: str, + expected_relation_role: RelationRole, + actual_relation_role: RelationRole, + ): + self.relation_name = relation_name + self.expected_relation_interface = expected_relation_role + self.actual_relation_role = actual_relation_role + self.message = "The '{}' relation has role '{}' rather than the expected '{}'".format( + relation_name, repr(actual_relation_role), repr(expected_relation_role) + ) + + super().__init__(self.message) + + +def _validate_relation_by_interface_and_direction( + charm: CharmBase, + relation_name: str, + expected_relation_interface: str, + expected_relation_role: RelationRole, +): + """Validate a relation. + + Verifies that the `relation_name` provided: (1) exists in metadata.yaml, + (2) declares as interface the interface name passed as `relation_interface` + and (3) has the right "direction", i.e., it is a relation that `charm` + provides or requires. + + Args: + charm: a `CharmBase` object to scan for the matching relation. + relation_name: the name of the relation to be verified. + expected_relation_interface: the interface name to be matched by the + relation named `relation_name`. + expected_relation_role: whether the `relation_name` must be either + provided or required by `charm`. + + Raises: + RelationNotFoundError: If there is no relation in the charm's metadata.yaml + with the same name as provided via `relation_name` argument. + RelationInterfaceMismatchError: The relation with the same name as provided + via `relation_name` argument does not have the same relation interface + as specified via the `expected_relation_interface` argument. + RelationRoleMismatchError: If the relation with the same name as provided + via `relation_name` argument does not have the same role as specified + via the `expected_relation_role` argument. + """ + if relation_name not in charm.meta.relations: + raise RelationNotFoundError(relation_name) + + relation = charm.meta.relations[relation_name] + + # fixme: why do we need to cast here? + actual_relation_interface = cast(str, relation.interface_name) + + if actual_relation_interface != expected_relation_interface: + raise RelationInterfaceMismatchError( + relation_name, expected_relation_interface, actual_relation_interface + ) + + if expected_relation_role is RelationRole.provides: + if relation_name not in charm.meta.provides: + raise RelationRoleMismatchError( + relation_name, RelationRole.provides, RelationRole.requires + ) + elif expected_relation_role is RelationRole.requires: + if relation_name not in charm.meta.requires: + raise RelationRoleMismatchError( + relation_name, RelationRole.requires, RelationRole.provides + ) + else: + raise TypeError("Unexpected RelationDirection: {}".format(expected_relation_role)) + + +class RequestEvent(RelationEvent): + """Event emitted when a remote requests a tracing endpoint.""" + + @property + def requested_receivers(self) -> List[ReceiverProtocol]: + """List of receiver protocols that have been requested.""" + relation = self.relation + app = relation.app + if not app: + raise NotReadyError("relation.app is None") + + return TracingRequirerAppData.load(relation.data[app]).receivers + + +class TracingEndpointProviderEvents(CharmEvents): + """TracingEndpointProvider events.""" + + request = EventSource(RequestEvent) + + +class TracingEndpointProvider(Object): + """Class representing a trace receiver service.""" + + on = TracingEndpointProviderEvents() # type: ignore + + def __init__( + self, + charm: CharmBase, + host: str, + relation_name: str = DEFAULT_RELATION_NAME, + ): + """Initialize. + + Args: + charm: a `CharmBase` instance that manages this instance of the Tempo service. + host: address of the node hosting the tempo server. + relation_name: an optional string name of the relation between `charm` + and the Tempo charmed service. The default is "tracing". + + Raises: + RelationNotFoundError: If there is no relation in the charm's metadata.yaml + with the same name as provided via `relation_name` argument. + RelationInterfaceMismatchError: The relation with the same name as provided + via `relation_name` argument does not have the `tracing` relation + interface. + RelationRoleMismatchError: If the relation with the same name as provided + via `relation_name` argument does not have the `RelationRole.requires` + role. + """ + _validate_relation_by_interface_and_direction( + charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.provides + ) + + super().__init__(charm, relation_name + "tracing-provider-v2") + self._charm = charm + self._host = host + self._relation_name = relation_name + self.framework.observe( + self._charm.on[relation_name].relation_joined, self._on_relation_event + ) + self.framework.observe( + self._charm.on[relation_name].relation_created, self._on_relation_event + ) + self.framework.observe( + self._charm.on[relation_name].relation_changed, self._on_relation_event + ) + + def _on_relation_event(self, e: RelationEvent): + """Handle relation created/joined/changed events.""" + if self.is_v2(e.relation): + self.on.request.emit(e.relation) + + def is_v2(self, relation: Relation): + """Attempt to determine if this relation is a tracing v2 relation. + + Assumes that the V2 requirer will, as soon as possible (relation-created), + publish the list of requested ingestion receivers (can be empty too). + """ + try: + self._get_requested_protocols(relation) + except NotReadyError: + return False + return True + + @staticmethod + def _get_requested_protocols(relation: Relation): + app = relation.app + if not app: + raise NotReadyError("relation.app is None") + + try: + databag = TracingRequirerAppData.load(relation.data[app]) + except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError): + logger.info(f"relation {relation} is not ready to talk tracing v2") + raise NotReadyError() + return databag.receivers + + def requested_protocols(self): + """All receiver protocols that have been requested by our related apps.""" + requested_protocols = set() + for relation in self.relations: + try: + protocols = self._get_requested_protocols(relation) + except NotReadyError: + continue + requested_protocols.update(protocols) + return requested_protocols + + @property + def relations(self) -> List[Relation]: + """All v2 relations active on this endpoint.""" + return [r for r in self._charm.model.relations[self._relation_name] if self.is_v2(r)] + + def publish_receivers(self, receivers: Sequence[RawReceiver]): + """Let all requirers know that these receivers are active and listening.""" + if not self._charm.unit.is_leader(): + raise RuntimeError("only leader can do this") + + for relation in self.relations: + try: + TracingProviderAppData( + host=self._host, + receivers=[ + Receiver(port=port, protocol=protocol) for protocol, port in receivers + ], + ).dump(relation.data[self._charm.app]) + + except ModelError as e: + # args are bytes + msg = e.args[0] + if isinstance(msg, bytes): + if msg.startswith( + b"ERROR cannot read relation application settings: permission denied" + ): + logger.error( + f"encountered error {e} while attempting to update_relation_data." + f"The relation must be gone." + ) + continue + raise + + +class EndpointRemovedEvent(RelationBrokenEvent): + """Event representing a change in one of the receiver endpoints.""" + + +class EndpointChangedEvent(_AutoSnapshotEvent): + """Event representing a change in one of the receiver endpoints.""" + + __args__ = ("host", "_ingesters") + + if TYPE_CHECKING: + host = "" # type: str + _ingesters = [] # type: List[dict] + + @property + def receivers(self) -> List[Receiver]: + """Cast receivers back from dict.""" + return [Receiver(**i) for i in self._ingesters] + + +class TracingEndpointRequirerEvents(CharmEvents): + """TracingEndpointRequirer events.""" + + endpoint_changed = EventSource(EndpointChangedEvent) + endpoint_removed = EventSource(EndpointRemovedEvent) + + +class TracingEndpointRequirer(Object): + """A tracing endpoint for Tempo.""" + + on = TracingEndpointRequirerEvents() # type: ignore + + def __init__( + self, + charm: CharmBase, + relation_name: str = DEFAULT_RELATION_NAME, + protocols: Optional[List[ReceiverProtocol]] = None, + ): + """Construct a tracing requirer for a Tempo charm. + + If your application supports pushing traces to a distributed tracing backend, the + `TracingEndpointRequirer` object enables your charm to easily access endpoint information + exchanged over a `tracing` relation interface. + + Args: + charm: a `CharmBase` object that manages this + `TracingEndpointRequirer` object. Typically, this is `self` in the instantiating + class. + relation_name: an optional string name of the relation between `charm` + and the Tempo charmed service. The default is "tracing". It is strongly + advised not to change the default, so that people deploying your charm will have a + consistent experience with all other charms that provide tracing endpoints. + protocols: optional list of protocols that the charm intends to send traces with. + The provider will enable receivers for these and only these protocols, + so be sure to enable all protocols the charm or its workload are going to need. + + Raises: + RelationNotFoundError: If there is no relation in the charm's metadata.yaml + with the same name as provided via `relation_name` argument. + RelationInterfaceMismatchError: The relation with the same name as provided + via `relation_name` argument does not have the `tracing` relation + interface. + RelationRoleMismatchError: If the relation with the same name as provided + via `relation_name` argument does not have the `RelationRole.provides` + role. + """ + _validate_relation_by_interface_and_direction( + charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.requires + ) + + super().__init__(charm, relation_name) + + self._is_single_endpoint = charm.meta.relations[relation_name].limit == 1 + + self._charm = charm + self._relation_name = relation_name + + events = self._charm.on[self._relation_name] + self.framework.observe(events.relation_changed, self._on_tracing_relation_changed) + self.framework.observe(events.relation_broken, self._on_tracing_relation_broken) + + if protocols: + self.request_protocols(protocols) + + def request_protocols( + self, protocols: Sequence[ReceiverProtocol], relation: Optional[Relation] = None + ): + """Publish the list of protocols which the provider should activate.""" + # todo: should we check if _is_single_endpoint and len(self.relations) > 1 and raise, here? + relations = [relation] if relation else self.relations + + if not protocols: + # empty sequence + raise ValueError( + "You need to pass a nonempty sequence of protocols to `request_protocols`." + ) + + try: + if self._charm.unit.is_leader(): + for relation in relations: + TracingRequirerAppData( + receivers=list(protocols), + ).dump(relation.data[self._charm.app]) + + except ModelError as e: + # args are bytes + msg = e.args[0] + if isinstance(msg, bytes): + if msg.startswith( + b"ERROR cannot read relation application settings: permission denied" + ): + logger.error( + f"encountered error {e} while attempting to request_protocols." + f"The relation must be gone." + ) + return + raise + + @property + def relations(self) -> List[Relation]: + """The tracing relations associated with this endpoint.""" + return self._charm.model.relations[self._relation_name] + + @property + def _relation(self) -> Optional[Relation]: + """If this wraps a single endpoint, the relation bound to it, if any.""" + if not self._is_single_endpoint: + objname = type(self).__name__ + raise AmbiguousRelationUsageError( + f"This {objname} wraps a {self._relation_name} endpoint that has " + "limit != 1. We can't determine what relation, of the possibly many, you are " + f"talking about. Please pass a relation instance while calling {objname}, " + "or set limit=1 in the charm metadata." + ) + relations = self.relations + return relations[0] if relations else None + + def is_ready(self, relation: Optional[Relation] = None): + """Is this endpoint ready?""" + relation = relation or self._relation + if not relation: + logger.debug(f"no relation on {self._relation_name !r}: tracing not ready") + return False + if relation.data is None: + logger.error(f"relation data is None for {relation}") + return False + if not relation.app: + logger.error(f"{relation} event received but there is no relation.app") + return False + try: + databag = dict(relation.data[relation.app]) + # "ingesters" Might be populated if the provider sees a v1 relation before a v2 requirer has had time to + # publish the 'receivers' list. This will make Tempo incorrectly assume that this is a v1 + # relation, and act accordingly. Later, when the requirer publishes the requested receivers, + # tempo will be able to course-correct. + if "ingesters" in databag: + del databag["ingesters"] + TracingProviderAppData.load(databag) + + except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError): + logger.info(f"failed validating relation data for {relation}") + return False + return True + + def _on_tracing_relation_changed(self, event): + """Notify the providers that there is new endpoint information available.""" + relation = event.relation + if not self.is_ready(relation): + self.on.endpoint_removed.emit(relation) # type: ignore + return + + data = TracingProviderAppData.load(relation.data[relation.app]) + self.on.endpoint_changed.emit(relation, data.host, [i.dict() for i in data.receivers]) # type: ignore + + def _on_tracing_relation_broken(self, event: RelationBrokenEvent): + """Notify the providers that the endpoint is broken.""" + relation = event.relation + self.on.endpoint_removed.emit(relation) # type: ignore + + def get_all_endpoints( + self, relation: Optional[Relation] = None + ) -> Optional[TracingProviderAppData]: + """Unmarshalled relation data.""" + if not self.is_ready(relation or self._relation): + return + return TracingProviderAppData.load(relation.data[relation.app]) # type: ignore + + def _get_endpoint( + self, relation: Optional[Relation], protocol: ReceiverProtocol, ssl: bool = False + ): + ep = self.get_all_endpoints(relation) + if not ep: + return None + try: + receiver: Receiver = next(filter(lambda i: i.protocol == protocol, ep.receivers)) + if receiver.protocol in ["otlp_grpc", "jaeger_grpc"]: + if ssl: + logger.warning("unused ssl argument - was the right protocol called?") + return f"{ep.host}:{receiver.port}" + if ssl: + return f"https://{ep.host}:{receiver.port}" + return f"http://{ep.host}:{receiver.port}" + except StopIteration: + logger.error(f"no receiver found with protocol={protocol!r}") + return None + + def get_endpoint( + self, protocol: ReceiverProtocol, relation: Optional[Relation] = None + ) -> Optional[str]: + """Receiver endpoint for the given protocol.""" + endpoint = self._get_endpoint(relation or self._relation, protocol=protocol) + if not endpoint: + requested_protocols = set() + for relation in self.relations: + databag = TracingRequirerAppData.load(relation.data[self._charm.app]) + requested_protocols.update(databag.receivers) + + if protocol not in requested_protocols: + raise ProtocolNotRequestedError(protocol, relation) + + return None + return endpoint + + # for backwards compatibility with earlier revisions: + def otlp_grpc_endpoint(self): + """Use TracingEndpointRequirer.get_endpoint('otlp_grpc') instead.""" + logger.warning( + "`TracingEndpointRequirer.otlp_grpc_endpoint` is deprecated. " + "Use `TracingEndpointRequirer.get_endpoint('otlp_grpc') instead.`" + ) + return self.get_endpoint("otlp_grpc") + + def otlp_http_endpoint(self): + """Use TracingEndpointRequirer.get_endpoint('otlp_http') instead.""" + logger.warning( + "`TracingEndpointRequirer.otlp_http_endpoint` is deprecated. " + "Use `TracingEndpointRequirer.get_endpoint('otlp_http') instead.`" + ) + return self.get_endpoint("otlp_http") diff --git a/metadata.yaml b/metadata.yaml index 2e5bda5..d12e0fe 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -59,6 +59,9 @@ requires: interface: certificate_transfer description: | Obtain TLS information (certificate, ca, chain) from another charm. + tracing: + interface: tracing + limit: 1 provides: logging-provider: diff --git a/requirements.txt b/requirements.txt index 9b2304d..2352ec4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,3 +13,5 @@ lightkube lightkube-models cryptography jsonschema < 4 # Pin prevents the machine charm error "ModuleNotFoundError: No module named 'rpds.rpds'" +# Deps: tracing +opentelemetry-exporter-otlp-proto-http>=1.21.0 \ No newline at end of file diff --git a/src/charm.py b/src/charm.py index 9f14dfb..b7aa854 100755 --- a/src/charm.py +++ b/src/charm.py @@ -7,7 +7,7 @@ import json import logging import pathlib -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Union import yaml from charms.loki_k8s.v1.loki_push_api import LokiPushApiProvider @@ -16,6 +16,8 @@ ServicePort, ) from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointConsumer +from charms.tempo_k8s.v1.charm_tracing import trace_charm +from charms.tempo_k8s.v2.tracing import TracingEndpointRequirer from cosl import GrafanaDashboard from grafana_agent import CONFIG_PATH, GrafanaAgentCharm from ops.main import main @@ -26,6 +28,17 @@ SCRAPE_RELATION_NAME = "metrics-endpoint" +@trace_charm( + tracing_endpoint="tracing_endpoint", + server_cert="server_cert_path", + extra_types=( + GrafanaAgentCharm, + LokiPushApiProvider, + KubernetesServicePatch, + MetricsEndpointConsumer, + GrafanaDashboard, + ), +) class GrafanaAgentK8sCharm(GrafanaAgentCharm): """K8s version of the Grafana Agent charm.""" @@ -70,6 +83,7 @@ def __init__(self, *args): self._on_loki_push_api_alert_rules_changed, ) + self._tracing = TracingEndpointRequirer(self, protocols=["otlp_http"]) self.framework.observe( self.on["grafana-dashboards-consumer"].relation_changed, self._on_dashboards_changed, @@ -237,6 +251,18 @@ def run(self, cmd: List[str]): """ self._container.exec(cmd) + @property + def tracing_endpoint(self) -> Optional[str]: + """Otlp http endpoint for charm instrumentation.""" + if self._tracing.is_ready(): + return self._tracing.get_endpoint("otlp_http") + return None + + @property + def server_cert_path(self) -> Optional[str]: + """Server certificate path for tls tracing.""" + return self._cert_path + if __name__ == "__main__": main(GrafanaAgentK8sCharm)