Skip to content

Commit

Permalink
cleanup status management
Browse files (browse the repository at this point in the history)
  • Loading branch information
PietroPasotti committed Aug 9, 2024
1 parent 657ecd0 commit 41b3272
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 165 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number | Diff line number | Diff line change
Expand Up @@ -9,3 +9,8 @@ __pycache__/
tests/integration/*-tester/lib/
.env
cos-tool*

*.egg-info/
loki_alert_rules/
prometheus_alert_rules/
grafana_dashboards/
1 change: 0 additions & 1 deletion src/charm.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -131,7 +131,6 @@ def _on_agent_pebble_ready(self, _event) -> None:
logger.debug(
"Cannot set workload version at this time: could not get grafana-agent version."
)
self._update_status()

def metrics_rules(self) -> Dict[str, Any]:
"""Return a list of metrics rules."""
Expand Down
153 changes: 59 additions & 94 deletions src/grafana_agent.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -10,10 +10,10 @@
import socket
import subprocess
from collections import namedtuple
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Union, get_args

import ops
import yaml
from charms.certificate_transfer_interface.v0.certificate_transfer import (
CertificateAvailableEvent as CertificateTransferAvailableEvent,
Expand Down Expand Up @@ -72,15 +72,6 @@ def __init__(self, message="could not reload configuration"):
super().__init__(self.message)


@dataclass
class CompoundStatus:
"""'Dumb struct' for helping with centralized status setting."""

# None = good; do not use ActiveStatus here.
update_config: Optional[Union[BlockedStatus, WaitingStatus]] = None
validation_error: Optional[BlockedStatus] = None


class GrafanaAgentCharm(CharmBase):
"""Grafana Agent Charm."""

Expand Down Expand Up @@ -124,9 +115,6 @@ def __new__(cls, *args: Any, **kwargs: Dict[Any, Any]):
def __init__(self, *args):
super().__init__(*args)

# Property to facilitate centralized status update
self.status = CompoundStatus()

charm_root = self.charm_dir.absolute()
self.loki_rules_paths = RulesMapping(
# TODO how to inject topology only for this charm's own rules?
Expand Down Expand Up @@ -188,6 +176,11 @@ def __init__(self, *args):

self._cloud = GrafanaCloudConfigRequirer(self)

# allows any event handler to set a status without the collect-unit-status handler having
# to reach into nested components to find out what the status is.
# downside is, this status will effectively only last for a single event.
self.push_status = None

self.framework.observe(
self._tracing.on.endpoint_changed, # pyright: ignore
self._on_tracing_endpoint_changed,
Expand Down Expand Up @@ -245,15 +238,7 @@ def __init__(self, *args):
self.cert_transfer.on.certificate_removed, # pyright: ignore
self._on_cert_transfer_removed,
)

# Register status observers
for incoming, outgoings in self.mandatory_relation_pairs.items():
self.framework.observe(self.on[incoming].relation_joined, self._update_status)
self.framework.observe(self.on[incoming].relation_broken, self._update_status)
for outgoing_list in outgoings:
for outgoing in outgoing_list:
self.framework.observe(self.on[outgoing].relation_joined, self._update_status)
self.framework.observe(self.on[outgoing].relation_broken, self._update_status)
self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status)

def _get_tracing_receiver_url(self, protocol: ReceiverProtocol):
scheme = "http"
Expand Down Expand Up @@ -295,56 +280,46 @@ def _update_tracing_provider(self):
def _on_cert_changed(self, _event):
"""Event handler for cert change."""
self._update_config()
self._update_ca()
self._update_status()
self._update_tracing_provider()

def _on_tracing_endpoint_changed(self, _event) -> None:
"""Event handler for the tracing endpoint-changed event."""
self._update_config()
self._update_status()
self._update_tracing_provider()

def _on_tracing_endpoint_removed(self, _event) -> None:
"""Event handler for the tracing endpoint-removed event."""
self._update_config()
self._update_status()
self._update_tracing_provider()

def _on_tracing_provider_request(self, _event) -> None:
"""Event handler for the tracing-provider request event."""
self._update_config()
self._update_status()
self._update_tracing_provider()

def _on_tracing_provider_broken(self, _event) -> None:
"""Event handler for the tracing-provider broken event."""
self._update_config()
self._update_status()
self._update_tracing_provider()

def _on_upgrade_charm(self, _event=None):
"""Refresh alerts if the charm is updated."""
self._update_metrics_alerts()
self._update_loki_alerts()
self._update_config()
self._update_status()
self._update_tracing_provider()

def _on_loki_push_api_endpoint_joined(self, _event=None):
"""Rebuild the config with correct Loki sinks."""
self._update_config()
self._update_status()

def _on_loki_push_api_endpoint_departed(self, _event=None):
"""Rebuild the config with correct Loki sinks."""
self._update_config()
self._update_status()

def _on_config_changed(self, _event=None):
"""Rebuild the config."""
self._update_config()
self._update_status()
self._update_tracing_provider()

def _on_cloud_config_available(self, _) -> None:
Expand Down Expand Up @@ -531,55 +506,42 @@ def update_dashboards(
def on_scrape_targets_changed(self, _event) -> None:
"""Event handler for the scrape targets changed event."""
self._update_config()
self._update_status()
self._update_metrics_alerts()

def on_remote_write_changed(self, _event) -> None:
"""Event handler for the remote write changed event."""
self._update_config()
self._update_status()
self._update_metrics_alerts()

def _update_status(self, *_):
"""Determine the charm status based on relation health and grafana-agent service readiness.
This is a centralized status setter. Status should only be calculated here, or, if you need
to temporarily change the status (e.g. during install), always call this method after
so the status is re-calculated (exceptions: on_install, on_remove).
TODO: Rework this when "compound status" is implemented
https://github.com/canonical/operator/issues/665
"""
def _on_collect_unit_status(self, event: ops.CollectStatusEvent):
"""Determine the charm status based on relation health and grafana-agent service readiness."""
if not self.is_ready:
self.unit.status = WaitingStatus("waiting for agent to start")
return
event.add_status(WaitingStatus("waiting for agent to start"))

if self.status.update_config:
self.unit.status = self.status.update_config
return
if self.cert.enabled and not self.cert.available:
event.add_status(WaitingStatus("Waiting for TLS certificate."))

if self.status.validation_error:
self.unit.status = self.status.validation_error
return
if self.push_status:
event.add_status(self.push_status)

# Put charm in blocked status if all incoming relations are missing
active_relations = {k for k, v in self.model.relations.items() if v}
if not set(self.mandatory_relation_pairs.keys()).intersection(active_relations):
self.unit.status = BlockedStatus(
"Missing incoming ('requires') relation: {}".format(
"|".join(self.mandatory_relation_pairs.keys())
event.add_status(
BlockedStatus(
"Missing incoming ('requires') relation: {}".format(
"|".join(self.mandatory_relation_pairs.keys())
)
)
)
return

if missing := MandatoryRelationPairs(self.mandatory_relation_pairs).get_missing_as_str(
*active_relations
):
self.unit.status = BlockedStatus(f"Missing {missing}")
return
event.add_status(BlockedStatus(f"Missing {missing}"))

if not self.is_ready:
self.unit.status = WaitingStatus("waiting for the agent to start")
return
event.add_status(WaitingStatus("waiting for the agent to start"))

# If only _some_ of the COS relations are present, we do not want to block, but we do want
# to inform via the Active message that they are in fact missing ("soft" warning).
Expand All @@ -595,7 +557,7 @@ def _update_status(self, *_):
if cos_rels.intersection(active_relations)
else set()
)
self.unit.status = ActiveStatus(", ".join([f"{x}: off" for x in missing_rels]))
event.add_status(ActiveStatus(", ".join([f"{x}: off" for x in missing_rels])))

def _update_config(self) -> None:
if not self.is_ready:
Expand All @@ -604,8 +566,7 @@ def _update_config(self) -> None:

# Write TLS files
if self.cert.enabled:
if not (self.cert.server_cert and self.cert.private_key and self.cert.ca_cert):
self.status.update_config = WaitingStatus("Waiting for TLS certificate.")
if not self.cert.available:
self.stop()
return
self.write_file(self._cert_path, self.cert.server_cert)
Expand Down Expand Up @@ -636,7 +597,6 @@ def _update_config(self) -> None:

if config == old_config:
# Nothing changed, possibly new installation. Move on.
self.status.update_config = None
return

try:
Expand All @@ -646,12 +606,10 @@ def _update_config(self) -> None:
self.restart()
except GrafanaAgentReloadError as e:
logger.error(str(e))
self.status.update_config = BlockedStatus(str(e))
self.push_status = BlockedStatus(str(e))
except APIError as e:
logger.warning(str(e))
self.status.update_config = WaitingStatus(str(e))

self.status.update_config = None
self.push_status = WaitingStatus(str(e))

def _delete_file_if_exists(self, file_path):
try:
Expand All @@ -667,16 +625,14 @@ def _on_dashboard_status_changed(self, _event=None):
self._grafana_dashboards_provider._reinitialize_dashboard_data(
inject_dropdowns=False
) # noqa
self._update_status()

def _enhance_endpoints_with_tls(self, endpoints) -> List[Dict[str, Any]]:
for endpoint in endpoints:
endpoint["tls_config"] = {
"insecure_skip_verify": self.model.config.get("tls_insecure_skip_verify")
}
return endpoints
def _add_tls_config(self, endpoint):
"""Update an endpoint definition with insecure_skip_verify as per app config."""
endpoint["tls_config"] = {
"insecure_skip_verify": self.model.config.get("tls_insecure_skip_verify")
}

def _prometheus_endpoints_with_tls(self) -> List[Dict[str, Any]]:
def _prometheus_endpoints(self) -> List[Dict[str, Any]]:
"""Add TLS information to Prometheus endpoints.
Also, injects the grafana-cloud-integrator endpoints into those we get from juju relations.
Expand All @@ -693,9 +649,12 @@ def _prometheus_endpoints_with_tls(self) -> List[Dict[str, Any]]:
}
prometheus_endpoints.append(prometheus_endpoint)

return self._enhance_endpoints_with_tls(prometheus_endpoints)
for endpoint in prometheus_endpoints:
self._add_tls_config(endpoint)

def _loki_endpoints_with_tls(self) -> List[Dict[str, Any]]:
return prometheus_endpoints

def _loki_endpoints(self) -> List[Dict[str, Any]]:
"""Add TLS information to Loki endpoints.
Also, injects the grafana-cloud-integrator endpoints into those we get from juju relations.
Expand All @@ -717,25 +676,27 @@ def _loki_endpoints_with_tls(self) -> List[Dict[str, Any]]:
}
loki_endpoints.append(loki_endpoint)

return self._enhance_endpoints_with_tls(loki_endpoints)
for endpoint in loki_endpoints:
self._add_tls_config(endpoint)

return loki_endpoints

def _tempo_endpoints_with_tls(self) -> List[Dict[str, Any]]:
def _tempo_endpoints(self) -> List[Dict[str, Any]]:
"""Add TLS information to Tempo endpoints.
Also, injects the grafana-cloud-integrator endpoints into those we get from juju relations.
FIXME: these should be separate concerns.
"""
tempo_endpoints = []
if self._tracing.is_ready():
tempo_endpoints.append(
{
# outgoing traces are all otlp/grpc
# cit: While Tempo and the Agent both can ingest in multiple formats,
# the Agent only exports in OTLP gRPC and HTTP.
"endpoint": self._tracing.get_endpoint("otlp_grpc"),
"insecure": False if self.cert.enabled else True,
}
)
tempo_endpoint = {
# outgoing traces are all otlp/grpc
# cit: While Tempo and the Agent both can ingest in multiple formats,
# the Agent only exports in OTLP gRPC and HTTP.
"endpoint": self._tracing.get_endpoint("otlp_grpc"),
"insecure": False if self.cert.enabled else True,
}
tempo_endpoints.append(tempo_endpoint)

if self._cloud.tempo_ready:
tempo_endpoint: Dict[str, Any] = {
Expand All @@ -747,7 +708,11 @@ def _tempo_endpoints_with_tls(self) -> List[Dict[str, Any]]:
"password": self._cloud.credentials.password,
}
tempo_endpoints.append(tempo_endpoint)
return self._enhance_endpoints_with_tls(tempo_endpoints)

for endpoint in tempo_endpoints:
self._add_tls_config(endpoint)

return tempo_endpoints

def _cli_args(self) -> str:
"""Return the cli arguments to pass to agent.
Expand Down Expand Up @@ -776,7 +741,7 @@ def _generate_config(self) -> Dict[str, Any]:
{
"name": "agent_scraper",
"scrape_configs": self.metrics_jobs(),
"remote_write": self._prometheus_endpoints_with_tls(),
"remote_write": self._prometheus_endpoints(),
}
],
},
Expand Down Expand Up @@ -853,7 +818,7 @@ def _integrations_config(self) -> dict:
},
],
},
"prometheus_remote_write": self._prometheus_endpoints_with_tls(),
"prometheus_remote_write": self._prometheus_endpoints(),
**self._additional_integrations,
}
return conf
Expand Down Expand Up @@ -919,7 +884,7 @@ def _tempo_config(self) -> Dict[str, Union[Any, List[Any]]]:
Returns:
a dict with the tracing config.
"""
endpoints = self._tempo_endpoints_with_tls()
endpoints = self._tempo_endpoints()
receivers = self._tracing_receivers

if not receivers:
Expand Down Expand Up @@ -948,7 +913,7 @@ def _loki_config(self) -> Dict[str, Union[Any, List[Any]]]:
configs.append(
{
"name": "push_api_server",
"clients": self._loki_endpoints_with_tls(),
"clients": self._loki_endpoints(),
"scrape_configs": [
{
"job_name": "loki",
Expand Down Expand Up @@ -1045,7 +1010,7 @@ def _agent_version(self) -> Optional[str]:
return result.group(1)

def _update_ca(self) -> None:
"""Updates the CA cert on disk from cert_manager."""
"""Updates the CA cert on disk from CertHandler."""
if (not self.cert.enabled) or (not self.cert.ca_cert):
try:
self.read_file(self._ca_path)
Expand Down
Loading

0 comments on commit 41b3272

Please sign in to comment.