From 1602e0648c0cf13355f8511ee388e082083ed166 Mon Sep 17 00:00:00 2001
From: Pietro Pasotti
Date: Tue, 6 Aug 2024 11:11:14 +0200
Subject: [PATCH] some scenarios

---
 .../v0/cloud_config_requirer.py             |  16 +-
 src/charm.py                                |   3 -
 src/grafana_agent.py                        |  70 ++++----
 tests/scenario/test_tracing_integration.py  | 162 ++++++++++++++++++
 4 files changed, 207 insertions(+), 44 deletions(-)
 create mode 100644 tests/scenario/test_tracing_integration.py

diff --git a/lib/charms/grafana_cloud_integrator/v0/cloud_config_requirer.py b/lib/charms/grafana_cloud_integrator/v0/cloud_config_requirer.py
index 6e01c26..7228a40 100644
--- a/lib/charms/grafana_cloud_integrator/v0/cloud_config_requirer.py
+++ b/lib/charms/grafana_cloud_integrator/v0/cloud_config_requirer.py
@@ -6,7 +6,7 @@
 
 LIBID = "e6f580481c1b4388aa4d2cdf412a47fa"
 LIBAPI = 0
-LIBPATCH = 4
+LIBPATCH = 5
 
 DEFAULT_RELATION_NAME = "grafana-cloud-config"
 
@@ -121,12 +121,16 @@ def loki_endpoint(self) -> dict:
     def prometheus_ready(self):
         return self._is_not_empty(self.prometheus_url)
 
+    @property
+    def tempo_ready(self):
+        return self._is_not_empty(self.tempo_url)
+
     @property
     def prometheus_endpoint(self) -> dict:
         """Return the prometheus endpoint dict."""
         if not self.prometheus_ready:
             return {}
-        
+
         endpoint = {}
         endpoint["url"] = self.prometheus_url
         if self.credentials:
@@ -134,11 +138,15 @@ def prometheus_endpoint(self) -> dict:
         return endpoint
 
     @property
-    def loki_url(self):
+    def loki_url(self) -> str:
         return self._data.get("loki_url", "")
 
     @property
-    def prometheus_url(self):
+    def tempo_url(self) -> str:
+        return self._data.get("tempo_url", "")
+
+    @property
+    def prometheus_url(self) -> str:
         return self._data.get("prometheus_url", "")
 
     @property
diff --git a/src/charm.py b/src/charm.py
index be2a31f..8adc571 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -17,7 +17,6 @@
 from charms.loki_k8s.v1.loki_push_api import LokiPushApiProvider
 from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointConsumer
 from charms.tempo_k8s.v1.charm_tracing import trace_charm
-from charms.tempo_k8s.v2.tracing import TracingEndpointRequirer
 from grafana_agent import CONFIG_PATH, GrafanaAgentCharm
 
 logger = logging.getLogger(__name__)
@@ -63,8 +62,6 @@ def __init__(self, *args):
         self._loki_provider = LokiPushApiProvider(
             self, relation_name="logging-provider", port=self._http_listen_port
         )
-        self._tracing = TracingEndpointRequirer(self, protocols=["otlp_http"])
-
         self.framework.observe(
             self._loki_provider.on.loki_push_api_alert_rules_changed,  # pyright: ignore
             self._on_loki_push_api_alert_rules_changed,
diff --git a/src/grafana_agent.py b/src/grafana_agent.py
index 6c62f87..596389e 100644
--- a/src/grafana_agent.py
+++ b/src/grafana_agent.py
@@ -43,7 +43,7 @@
 from requests.packages.urllib3.util import Retry  # type: ignore
 from yaml.parser import ParserError
 
-from charms.tempo_k8s.v2.tracing import charm_tracing_config, TracingEndpointRequirer, TracingEndpointProvider, Receiver
+from charms.tempo_k8s.v2.tracing import charm_tracing_config, TracingEndpointRequirer, TracingEndpointProvider, ReceiverProtocol, receiver_protocol_to_transport_protocol, TransportProtocolType
 
 logger = logging.getLogger(__name__)
 
@@ -75,13 +75,6 @@ class CompoundStatus:
     update_config: Optional[Union[BlockedStatus, WaitingStatus]] = None
     validation_error: Optional[BlockedStatus] = None
 
-class TransportProtocolType(str, enum.Enum):
-    """Receiver Type."""
-
-    http = "http"
-    grpc = "grpc"
-
-ReceiverProtocol = Literal["otlp_grpc", "otlp_http", "zipkin", "jaeger_thrift_http", "jaeger_grpc"]
 
 class GrafanaAgentCharm(CharmBase):
     """Grafana Agent Charm."""
@@ -95,16 +88,6 @@ class GrafanaAgentCharm(CharmBase):
     _ca_path = "/usr/local/share/ca-certificates/grafana-agent-operator.crt"
     _ca_folder_path = "/usr/local/share/ca-certificates"
 
-    receiver_protocol_to_transport_protocol: Dict[ReceiverProtocol, TransportProtocolType] = {
-        "zipkin": TransportProtocolType.http,
-        "kafka": TransportProtocolType.http,
-        "tempo_http": TransportProtocolType.http,
-        "tempo_grpc": TransportProtocolType.grpc,
-        "otlp_grpc": TransportProtocolType.grpc,
-        "otlp_http": TransportProtocolType.http,
-        "jaeger_thrift_http": TransportProtocolType.http,
-    }
-
     # mapping from tempo-supported receivers to the receiver ports to be opened on the grafana-agent host
     # to ingest traces for them. Note that we 'support' more receivers here than tempo currently does, so
     # that if in the future we decide to enable the receivers in tempo, we don't need to make changes
@@ -211,6 +194,14 @@ def __init__(self, *args):
             self._tracing.on.endpoint_removed,  # pyright: ignore
             self._on_tracing_endpoint_removed,
         )
+        self.framework.observe(
+            self._tracing_provider.on.request,  # pyright: ignore
+            self._on_tracing_provider_request,
+        )
+        self.framework.observe(
+            self._tracing_provider.on.broken,  # pyright: ignore
+            self._on_tracing_provider_broken,
+        )
 
         self.framework.observe(self.cert.on.cert_changed, self._on_cert_changed)  # pyright: ignore
         self.framework.observe(
@@ -262,7 +253,7 @@ def __init__(self, *args):
             self.framework.observe(self.on[outgoing].relation_joined, self._update_status)
             self.framework.observe(self.on[outgoing].relation_broken, self._update_status)
 
-    def _get_tracing_receiver_url(self, protocol: str):
+    def _get_tracing_receiver_url(self, protocol: ReceiverProtocol):
         scheme = "http"
         try:
             if self._charm.cert.available:  # type: ignore
@@ -271,7 +262,7 @@ def __init__(self, *args):
             pass
 
         # assume we're doing this in-model, since this charm doesn't have ingress
-        if self.receiver_protocol_to_transport_protocol[protocol] == TransportProtocolType.grpc:
+        if receiver_protocol_to_transport_protocol[protocol] == TransportProtocolType.grpc:
             return f"{socket.getfqdn()}:{self._tracing_receivers_ports[protocol]}"
         return f"{scheme}://{socket.getfqdn()}:{self._tracing_receivers_ports[protocol]}"
 
@@ -305,11 +296,25 @@ def _on_tracing_endpoint_changed(self, _event) -> None:
         """Event handler for the tracing endpoint-changed event."""
         self._update_config()
         self._update_status()
+        self._update_tracing_provider()
 
     def _on_tracing_endpoint_removed(self, _event) -> None:
         """Event handler for the tracing endpoint-removed event."""
         self._update_config()
         self._update_status()
+        self._update_tracing_provider()
+
+    def _on_tracing_provider_request(self, _event) -> None:
+        """Event handler for the tracing-provider request event."""
+        self._update_config()
+        self._update_status()
+        self._update_tracing_provider()
+
+    def _on_tracing_provider_broken(self, _event) -> None:
+        """Event handler for the tracing-provider broken event."""
+        self._update_config()
+        self._update_status()
+        self._update_tracing_provider()
 
     def _on_upgrade_charm(self, _event=None):
         """Refresh alerts if the charm is updated."""
@@ -317,6 +322,7 @@ def _on_upgrade_charm(self, _event=None):
         self._update_loki_alerts()
         self._update_config()
         self._update_status()
+        self._update_tracing_provider()
 
     def _on_loki_push_api_endpoint_joined(self, _event=None):
         """Rebuild the config with correct Loki sinks."""
@@ -332,14 +338,17 @@ def _on_config_changed(self, _event=None):
         """Rebuild the config."""
         self._update_config()
         self._update_status()
+        self._update_tracing_provider()
 
     def _on_cloud_config_available(self, _) -> None:
         logger.info("cloud config available")
         self._update_config()
+        self._update_tracing_provider()
 
     def _on_cloud_config_revoked(self, _) -> None:
         logger.info("cloud config revoked")
         self._update_config()
+        self._update_tracing_provider()
 
     def _on_cert_transfer_available(self, event: CertificateTransferAvailableEvent):
         cert_filename = (
@@ -528,7 +537,7 @@ def _update_status(self, *_):
         """Determine the charm status based on relation health and grafana-agent service readiness.
 
         This is a centralized status setter. Status should only be calculated here, or, if you need
-        to temporarily change the status (e.g. during snap install), always call this method after
+        to temporarily change the status (e.g. during install), always call this method after
         so the status is re-calculated (exceptions: on_install, on_remove).
         TODO: Rework this when "compound status" is implemented
         https://github.com/canonical/operator/issues/665
@@ -595,13 +604,6 @@ def _update_config(self) -> None:
             self.write_file(self._key_path, self.cert.private_key)
             self.write_file(self._ca_path, self.cert.ca_cert)
 
-            if os.path.exists(self._snap_folder_path):
-                # copy cert files within the snap context as well until snap is able to connect to certificate files.
-                # ref: https://github.com/canonical/grafana-agent-snap/issues/71
-                self.write_file(self._snap_cert_path, self.cert.server_cert)
-                self.write_file(self._snap_key_path, self.cert.private_key)
-                self.write_file(self._snap_ca_path, self.cert.ca_cert)
-
             # push CA certificate to charm container
             ca_cert_path = Path(self._ca_path)
             ca_cert_path.parent.mkdir(exist_ok=True, parents=True)
@@ -612,9 +614,6 @@ def _update_config(self) -> None:
             self._delete_file_if_exists(self._cert_path)
             self._delete_file_if_exists(self._key_path)
             self._delete_file_if_exists(self._ca_path)
-            self._delete_file_if_exists(self._snap_cert_path)
-            self._delete_file_if_exists(self._snap_key_path)
-            self._delete_file_if_exists(self._snap_ca_path)
 
             # charm container CA cert
             Path(self._ca_path).unlink(missing_ok=True)
@@ -855,9 +854,6 @@ def _tracing_receivers(self) -> Dict[str, Union[Any, List[Any]]]:
 
         Returns: a dict with the receivers config.
""" - if not self._tracing.is_ready(): - return {} - receivers_set = self._requested_tracing_protocols if not receivers_set: @@ -867,9 +863,9 @@ def _tracing_receivers(self) -> Dict[str, Union[Any, List[Any]]]: if self.cert.enabled: base_receiver_config: Dict[str, Union[str, Dict]] = { "tls": { - "ca_file": str(self._snap_ca_path), - "cert_file": str(self._snap_cert_path), - "key_file": str(self._snap_key_path), + "ca_file": str(self._ca_path), + "cert_file": str(self._cert_path), + "key_file": str(self._key_path), "min_version": "", } } diff --git a/tests/scenario/test_tracing_integration.py b/tests/scenario/test_tracing_integration.py new file mode 100644 index 0000000..6c66de4 --- /dev/null +++ b/tests/scenario/test_tracing_integration.py @@ -0,0 +1,162 @@ +from unittest.mock import patch + +import pytest +import scenario +import yaml +from ops import pebble + +from charm import GrafanaAgentK8sCharm +from charms.tempo_k8s.v1.charm_tracing import charm_tracing_disabled +from charms.tempo_k8s.v2.tracing import TracingProviderAppData, Receiver, TracingRequirerAppData +from grafana_agent import CONFIG_PATH + + +@pytest.fixture +def ctx(vroot): + with charm_tracing_disabled(): + with patch("socket.getfqdn", new=lambda: "localhost"): + yield scenario.Context(GrafanaAgentK8sCharm, charm_root=vroot) + + +@pytest.fixture +def base_state(): + yield scenario.State( + leader=True, + containers=[ + scenario.Container( + "agent", can_connect=True, + # set it to inactive so we can detect when an event has caused it to start + service_status={"agent": pebble.ServiceStatus.INACTIVE} + ) + ] + ) + + +def test_tracing_relation(ctx, base_state): + # GIVEN a tracing relation over the tracing-provider endpoint + tracing = scenario.Relation("tracing-provider", + remote_app_data=TracingRequirerAppData(receivers=["otlp_http", "otlp_grpc"]).dump() + ) + + # TracingProviderAppData( + # receivers=[Receiver(protocol={"name": "otlp_grpc", "type": "grpc"}, + # url="http:foo.com:1111")]).dump()) + + state = base_state.replace( + relations=[ + tracing + ] + ) + # WHEN we process any setup event for the relation + state_out = ctx.run(tracing.changed_event, state) + + agent = state_out.get_container('agent') + + # THEN the agent has started + assert agent.services['agent'].is_running() + # AND the grafana agent config has a traces config section + fs = agent.get_filesystem(ctx) + gagent_config = fs.joinpath(*CONFIG_PATH.strip('/').split('/')) + assert gagent_config.exists() + yml = yaml.safe_load(gagent_config.read_text()) + assert yml["traces"] + + +def test_tracing_relations_in_and_out(ctx, base_state): + # GIVEN a tracing relation over the tracing-provider endpoint and one over tracing + tracing_provider = scenario.Relation("tracing-provider", + remote_app_data=TracingRequirerAppData( + receivers=["otlp_http", "otlp_grpc"]).dump() + ) + tracing = scenario.Relation("tracing", + remote_app_data=TracingProviderAppData( + receivers=[Receiver(protocol={"name": "otlp_grpc", "type": "grpc"}, + url="http:foo.com:1111")]).dump() + ) + + state = base_state.replace( + relations=[ + tracing, tracing_provider + ] + ) + # WHEN we process any setup event for the relation + state_out = ctx.run(tracing.changed_event, state) + + agent = state_out.get_container('agent') + + # THEN the agent has started + assert agent.services['agent'].is_running() + # AND the grafana agent config has a traces config section + fs = agent.get_filesystem(ctx) + gagent_config = fs.joinpath(*CONFIG_PATH.strip('/').split('/')) + assert gagent_config.exists() 
+    yml = yaml.safe_load(gagent_config.read_text())
+    assert yml["traces"]
+
+
+def test_tracing_relation_passthrough(ctx, base_state):
+    # GIVEN a tracing relation over the tracing-provider endpoint and one over tracing
+    tracing_provider = scenario.Relation("tracing-provider",
+                                         remote_app_data=TracingRequirerAppData(
+                                             receivers=["otlp_http", "otlp_grpc"]).dump()
+                                         )
+    tracing = scenario.Relation("tracing",
+                                remote_app_data=TracingProviderAppData(
+                                    receivers=[Receiver(protocol={"name": "otlp_grpc", "type": "grpc"},
+                                                        url="http:foo.com:1111")]).dump()
+                                )
+
+    state = base_state.replace(
+        relations=[
+            tracing, tracing_provider
+        ]
+    )
+    # WHEN we process any setup event for the relation
+    state_out = ctx.run(tracing.changed_event, state)
+
+    # THEN we act as a tracing provider for 'tracing-provider', and as requirer for 'tracing'
+    tracing_out = TracingRequirerAppData.load(state_out.get_relations("tracing")[0].local_app_data)
+    tracing_provider_out = TracingProviderAppData.load(state_out.get_relations("tracing-provider")[0].local_app_data)
+    assert set(tracing_out.receivers) == {'otlp_grpc', 'otlp_http'}
+    otlp_grpc_provider_def = [r for r in tracing_provider_out.receivers if r.protocol.name == 'otlp_grpc'][0]
+    otlp_http_provider_def = [r for r in tracing_provider_out.receivers if r.protocol.name == 'otlp_http'][0]
+    assert otlp_grpc_provider_def.url == "localhost:4317"
+    assert otlp_http_provider_def.url == "http://localhost:4318"
+
+
+@pytest.mark.parametrize("force_enable", (
+        ["zipkin", "jaeger_thrift_http", "jaeger_grpc"],
+        ["zipkin", "jaeger_thrift_http"],
+        ["jaeger_thrift_http"],
+))
+def test_tracing_relation_passthrough_with_force_enable(ctx, base_state, force_enable):
+    # GIVEN a tracing relation over the tracing-provider endpoint and one over tracing
+    tracing_provider = scenario.Relation("tracing-provider",
+                                         remote_app_data=TracingRequirerAppData(
+                                             receivers=["otlp_http", "otlp_grpc"]).dump()
+                                         )
+    tracing = scenario.Relation("tracing",
+                                remote_app_data=TracingProviderAppData(
+                                    receivers=[Receiver(protocol={"name": "otlp_grpc", "type": "grpc"},
+                                                        url="http:foo.com:1111")]).dump()
+                                )
+
+    # AND given we're configured to always enable some protocols
+    state = base_state.replace(
+        config={f'always_enable_{proto}': True for proto in force_enable},
+        relations=[
+            tracing, tracing_provider
+        ]
+    )
+    # WHEN we process any setup event for the relation
+    state_out = ctx.run(tracing.changed_event, state)
+
+    # THEN we act as a tracing provider for 'tracing-provider', and as requirer for 'tracing'
+    tracing_out = TracingRequirerAppData.load(state_out.get_relations("tracing")[0].local_app_data)
+    tracing_provider_out = TracingProviderAppData.load(state_out.get_relations("tracing-provider")[0].local_app_data)
+
+    # we still only request otlp grpc and http (for charm traces), because gagent funnels everything to grpc
+    assert set(tracing_out.receivers) == {'otlp_grpc', 'otlp_http'}
+    # but we provide all
+    providing_protocols = set(r.protocol.name for r in tracing_provider_out.receivers)
+    assert providing_protocols == {'otlp_grpc', 'otlp_http'}.union(force_enable)
\ No newline at end of file