diff --git a/.github/workflows/release-edge.yaml b/.github/workflows/release-edge.yaml index 99ca21fa..abdf3bbf 100644 --- a/.github/workflows/release-edge.yaml +++ b/.github/workflows/release-edge.yaml @@ -1,36 +1,11 @@ name: Release to Edge on: - workflow_dispatch: {} push: branches: - main jobs: - render: - runs-on: ubuntu-20.04 - steps: - - name: Checkout Code - uses: actions/checkout@v3 - - name: Setup Python - uses: actions/setup-python@v4 - with: - python-version: 3.8 - - name: Install dependencies - run: python3 -m pip install tox - - name: Render k8s Charm - run: tox -e render-k8s - - name: Pack Artifact - run: sudo apt-get update && sudo apt-get install tar && tar czvf artifact.tar.gz metadata.yaml src/charm.py - - name: Upload Artifact - uses: actions/upload-artifact@v3 - with: - name: k8s_artifact - path: artifact.tar.gz - release: uses: canonical/observability/.github/workflows/release-charm.yaml@main - needs: render secrets: inherit - with: - artifact: k8s_artifact diff --git a/.gitignore b/.gitignore index 4f6a8f84..8c579e88 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,4 @@ __pycache__/ .tox .idea/ tests/integration/*-tester/lib/ -metadata.yaml -src/charm.py .env diff --git a/charmcraft.yaml b/charmcraft.yaml index c7a23388..e5e07201 100644 --- a/charmcraft.yaml +++ b/charmcraft.yaml @@ -11,9 +11,6 @@ bases: - name: "ubuntu" channel: "22.04" architectures: ["amd64"] - - name: "ubuntu" - channel: "20.04" - architectures: ["amd64"] parts: charm: build-packages: diff --git a/lib/charms/grafana_agent/v0/cos_agent.py b/lib/charms/grafana_agent/v0/cos_agent.py deleted file mode 100644 index 259a9017..00000000 --- a/lib/charms/grafana_agent/v0/cos_agent.py +++ /dev/null @@ -1,819 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -r"""## Overview. - -This library can be used to manage the cos_agent relation interface: - -- `COSAgentProvider`: Use in machine charms that need to have a workload's metrics - or logs scraped, or forward rule files or dashboards to Prometheus, Loki or Grafana through - the Grafana Agent machine charm. - -- `COSAgentConsumer`: Used in the Grafana Agent machine charm to manage the requirer side of - the `cos_agent` interface. - - -## COSAgentProvider Library Usage - -Grafana Agent machine Charmed Operator interacts with its clients using the cos_agent library. -Charms seeking to send telemetry, must do so using the `COSAgentProvider` object from -this charm library. - -Using the `COSAgentProvider` object only requires instantiating it, -typically in the `__init__` method of your charm (the one which sends telemetry). - -The constructor of `COSAgentProvider` has only one required and nine optional parameters: - -```python - def __init__( - self, - charm: CharmType, - relation_name: str = DEFAULT_RELATION_NAME, - metrics_endpoints: Optional[List[_MetricsEndpointDict]] = None, - metrics_rules_dir: str = "./src/prometheus_alert_rules", - logs_rules_dir: str = "./src/loki_alert_rules", - recurse_rules_dirs: bool = False, - log_slots: Optional[List[str]] = None, - dashboard_dirs: Optional[List[str]] = None, - refresh_events: Optional[List] = None, - scrape_configs: Optional[Union[List[Dict], Callable]] = None, - ): -``` - -### Parameters - -- `charm`: The instance of the charm that instantiates `COSAgentProvider`, typically `self`. - -- `relation_name`: If your charmed operator uses a relation name other than `cos-agent` to use - the `cos_agent` interface, this is where you have to specify that. - -- `metrics_endpoints`: In this parameter you can specify the metrics endpoints that Grafana Agent - machine Charmed Operator will scrape. The configs of this list will be merged with the configs - from `scrape_configs`. - -- `metrics_rules_dir`: The directory in which the Charmed Operator stores its metrics alert rules - files. - -- `logs_rules_dir`: The directory in which the Charmed Operator stores its logs alert rules files. - -- `recurse_rules_dirs`: This parameters set whether Grafana Agent machine Charmed Operator has to - search alert rules files recursively in the previous two directories or not. - -- `log_slots`: Snap slots to connect to for scraping logs in the form ["snap-name:slot", ...]. - -- `dashboard_dirs`: List of directories where the dashboards are stored in the Charmed Operator. - -- `refresh_events`: List of events on which to refresh relation data. - -- `scrape_configs`: List of standard scrape_configs dicts or a callable that returns the list in - case the configs need to be generated dynamically. The contents of this list will be merged - with the configs from `metrics_endpoints`. - - -### Example 1 - Minimal instrumentation: - -In order to use this object the following should be in the `charm.py` file. - -```python -from charms.grafana_agent.v0.cos_agent import COSAgentProvider -... -class TelemetryProviderCharm(CharmBase): - def __init__(self, *args): - ... - self._grafana_agent = COSAgentProvider(self) -``` - -### Example 2 - Full instrumentation: - -In order to use this object the following should be in the `charm.py` file. - -```python -from charms.grafana_agent.v0.cos_agent import COSAgentProvider -... -class TelemetryProviderCharm(CharmBase): - def __init__(self, *args): - ... - self._grafana_agent = COSAgentProvider( - self, - relation_name="custom-cos-agent", - metrics_endpoints=[ - # specify "path" and "port" to scrape from localhost - {"path": "/metrics", "port": 9000}, - {"path": "/metrics", "port": 9001}, - {"path": "/metrics", "port": 9002}, - ], - metrics_rules_dir="./src/alert_rules/prometheus", - logs_rules_dir="./src/alert_rules/loki", - recursive_rules_dir=True, - log_slots=["my-app:slot"], - dashboard_dirs=["./src/dashboards_1", "./src/dashboards_2"], - refresh_events=["update-status", "upgrade-charm"], - scrape_configs=[ - { - "job_name": "custom_job", - "metrics_path": "/metrics", - "authorization": {"credentials": "bearer-token"}, - "static_configs": [ - { - "targets": ["localhost:9003"]}, - "labels": {"key": "value"}, - }, - ], - }, - ] - ) -``` - -### Example 3 - Dynamic scrape configs generation: - -Pass a function to the `scrape_configs` to decouple the generation of the configs -from the instantiation of the COSAgentProvider object. - -```python -from charms.grafana_agent.v0.cos_agent import COSAgentProvider -... - -class TelemetryProviderCharm(CharmBase): - def generate_scrape_configs(self): - return [ - { - "job_name": "custom", - "metrics_path": "/metrics", - "static_configs": [{"targets": ["localhost:9000"]}], - }, - ] - - def __init__(self, *args): - ... - self._grafana_agent = COSAgentProvider( - self, - scrape_configs=self.generate_scrape_configs, - ) -``` - -## COSAgentConsumer Library Usage - -This object may be used by any Charmed Operator which gathers telemetry data by -implementing the consumer side of the `cos_agent` interface. -For instance Grafana Agent machine Charmed Operator. - -For this purpose the charm needs to instantiate the `COSAgentConsumer` object with one mandatory -and two optional arguments. - -### Parameters - -- `charm`: A reference to the parent (Grafana Agent machine) charm. - -- `relation_name`: The name of the relation that the charm uses to interact - with its clients that provides telemetry data using the `COSAgentProvider` object. - - If provided, this relation name must match a provided relation in metadata.yaml with the - `cos_agent` interface. - The default value of this argument is "cos-agent". - -- `refresh_events`: List of events on which to refresh relation data. - - -### Example 1 - Minimal instrumentation: - -In order to use this object the following should be in the `charm.py` file. - -```python -from charms.grafana_agent.v0.cos_agent import COSAgentConsumer -... -class GrafanaAgentMachineCharm(GrafanaAgentCharm) - def __init__(self, *args): - ... - self._cos = COSAgentRequirer(self) -``` - - -### Example 2 - Full instrumentation: - -In order to use this object the following should be in the `charm.py` file. - -```python -from charms.grafana_agent.v0.cos_agent import COSAgentConsumer -... -class GrafanaAgentMachineCharm(GrafanaAgentCharm) - def __init__(self, *args): - ... - self._cos = COSAgentRequirer( - self, - relation_name="cos-agent-consumer", - refresh_events=["update-status", "upgrade-charm"], - ) -``` -""" - -import json -import logging -from collections import namedtuple -from itertools import chain -from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, List, Optional, Set, Union - -import pydantic -from cosl import GrafanaDashboard, JujuTopology -from cosl.rules import AlertRules -from ops.charm import RelationChangedEvent -from ops.framework import EventBase, EventSource, Object, ObjectEvents -from ops.model import Relation, Unit -from ops.testing import CharmType - -if TYPE_CHECKING: - try: - from typing import TypedDict - - class _MetricsEndpointDict(TypedDict): - path: str - port: int - - except ModuleNotFoundError: - _MetricsEndpointDict = Dict # pyright: ignore - -LIBID = "dc15fa84cef84ce58155fb84f6c6213a" -LIBAPI = 0 -LIBPATCH = 7 - -PYDEPS = ["cosl", "pydantic < 2"] - -DEFAULT_RELATION_NAME = "cos-agent" -DEFAULT_PEER_RELATION_NAME = "peers" -DEFAULT_SCRAPE_CONFIG = { - "static_configs": [{"targets": ["localhost:80"]}], - "metrics_path": "/metrics", -} - -logger = logging.getLogger(__name__) -SnapEndpoint = namedtuple("SnapEndpoint", "owner, name") - - -class CosAgentProviderUnitData(pydantic.BaseModel): - """Unit databag model for `cos-agent` relation.""" - - # The following entries are the same for all units of the same principal. - # Note that the same grafana agent subordinate may be related to several apps. - # this needs to make its way to the gagent leader - metrics_alert_rules: dict - log_alert_rules: dict - dashboards: List[GrafanaDashboard] - subordinate: Optional[bool] - - # The following entries may vary across units of the same principal app. - # this data does not need to be forwarded to the gagent leader - metrics_scrape_jobs: List[Dict] - log_slots: List[str] - - # when this whole datastructure is dumped into a databag, it will be nested under this key. - # while not strictly necessary (we could have it 'flattened out' into the databag), - # this simplifies working with the model. - KEY: ClassVar[str] = "config" - - -class CosAgentPeersUnitData(pydantic.BaseModel): - """Unit databag model for `peers` cos-agent machine charm peer relation.""" - - # We need the principal unit name and relation metadata to be able to render identifiers - # (e.g. topology) on the leader side, after all the data moves into peer data (the grafana - # agent leader can only see its own principal, because it is a subordinate charm). - principal_unit_name: str - principal_relation_id: str - principal_relation_name: str - - # The only data that is forwarded to the leader is data that needs to go into the app databags - # of the outgoing o11y relations. - metrics_alert_rules: Optional[dict] - log_alert_rules: Optional[dict] - dashboards: Optional[List[GrafanaDashboard]] - - # when this whole datastructure is dumped into a databag, it will be nested under this key. - # while not strictly necessary (we could have it 'flattened out' into the databag), - # this simplifies working with the model. - KEY: ClassVar[str] = "config" - - @property - def app_name(self) -> str: - """Parse out the app name from the unit name. - - TODO: Switch to using `model_post_init` when pydantic v2 is released? - https://github.com/pydantic/pydantic/issues/1729#issuecomment-1300576214 - """ - return self.principal_unit_name.split("/")[0] - - -class COSAgentProvider(Object): - """Integration endpoint wrapper for the provider side of the cos_agent interface.""" - - def __init__( - self, - charm: CharmType, - relation_name: str = DEFAULT_RELATION_NAME, - metrics_endpoints: Optional[List["_MetricsEndpointDict"]] = None, - metrics_rules_dir: str = "./src/prometheus_alert_rules", - logs_rules_dir: str = "./src/loki_alert_rules", - recurse_rules_dirs: bool = False, - log_slots: Optional[List[str]] = None, - dashboard_dirs: Optional[List[str]] = None, - refresh_events: Optional[List] = None, - *, - scrape_configs: Optional[Union[List[dict], Callable]] = None, - ): - """Create a COSAgentProvider instance. - - Args: - charm: The `CharmBase` instance that is instantiating this object. - relation_name: The name of the relation to communicate over. - metrics_endpoints: List of endpoints in the form [{"path": path, "port": port}, ...]. - This argument is a simplified form of the `scrape_configs`. - The contents of this list will be merged with the contents of `scrape_configs`. - metrics_rules_dir: Directory where the metrics rules are stored. - logs_rules_dir: Directory where the logs rules are stored. - recurse_rules_dirs: Whether to recurse into rule paths. - log_slots: Snap slots to connect to for scraping logs - in the form ["snap-name:slot", ...]. - dashboard_dirs: Directory where the dashboards are stored. - refresh_events: List of events on which to refresh relation data. - scrape_configs: List of standard scrape_configs dicts or a callable - that returns the list in case the configs need to be generated dynamically. - The contents of this list will be merged with the contents of `metrics_endpoints`. - """ - super().__init__(charm, relation_name) - dashboard_dirs = dashboard_dirs or ["./src/grafana_dashboards"] - - self._charm = charm - self._relation_name = relation_name - self._metrics_endpoints = metrics_endpoints or [] - self._scrape_configs = scrape_configs or [] - self._metrics_rules = metrics_rules_dir - self._logs_rules = logs_rules_dir - self._recursive = recurse_rules_dirs - self._log_slots = log_slots or [] - self._dashboard_dirs = dashboard_dirs - self._refresh_events = refresh_events or [self._charm.on.config_changed] - - events = self._charm.on[relation_name] - self.framework.observe(events.relation_joined, self._on_refresh) - self.framework.observe(events.relation_changed, self._on_refresh) - for event in self._refresh_events: - self.framework.observe(event, self._on_refresh) - - def _on_refresh(self, event): - """Trigger the class to update relation data.""" - relations = self._charm.model.relations[self._relation_name] - - for relation in relations: - # Before a principal is related to the grafana-agent subordinate, we'd get - # ModelError: ERROR cannot read relation settings: unit "zk/2": settings not found - # Add a guard to make sure it doesn't happen. - if relation.data and self._charm.unit in relation.data: - # Subordinate relations can communicate only over unit data. - try: - data = CosAgentProviderUnitData( - metrics_alert_rules=self._metrics_alert_rules, - log_alert_rules=self._log_alert_rules, - dashboards=self._dashboards, - metrics_scrape_jobs=self._scrape_jobs, - log_slots=self._log_slots, - subordinate=self._charm.meta.subordinate, - ) - relation.data[self._charm.unit][data.KEY] = data.json() - except ( - pydantic.ValidationError, - json.decoder.JSONDecodeError, - ) as e: - logger.error("Invalid relation data provided: %s", e) - - @property - def _scrape_jobs(self) -> List[Dict]: - """Return a prometheus_scrape-like data structure for jobs. - - https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config - """ - if callable(self._scrape_configs): - scrape_configs = self._scrape_configs() - else: - # Create a copy of the user scrape_configs, since we will mutate this object - scrape_configs = self._scrape_configs.copy() - - # Convert "metrics_endpoints" to standard scrape_configs, and add them in - for endpoint in self._metrics_endpoints: - scrape_configs.append( - { - "metrics_path": endpoint["path"], - "static_configs": [{"targets": [f"localhost:{endpoint['port']}"]}], - } - ) - - scrape_configs = scrape_configs or [DEFAULT_SCRAPE_CONFIG] - - # Augment job name to include the app name and a unique id (index) - for idx, scrape_config in enumerate(scrape_configs): - scrape_config["job_name"] = "_".join( - [self._charm.app.name, str(idx), scrape_config.get("job_name", "default")] - ) - - return scrape_configs - - @property - def _metrics_alert_rules(self) -> Dict: - """Use (for now) the prometheus_scrape AlertRules to initialize this.""" - alert_rules = AlertRules( - query_type="promql", topology=JujuTopology.from_charm(self._charm) - ) - alert_rules.add_path(self._metrics_rules, recursive=self._recursive) - return alert_rules.as_dict() - - @property - def _log_alert_rules(self) -> Dict: - """Use (for now) the loki_push_api AlertRules to initialize this.""" - alert_rules = AlertRules(query_type="logql", topology=JujuTopology.from_charm(self._charm)) - alert_rules.add_path(self._logs_rules, recursive=self._recursive) - return alert_rules.as_dict() - - @property - def _dashboards(self) -> List[GrafanaDashboard]: - dashboards: List[GrafanaDashboard] = [] - for d in self._dashboard_dirs: - for path in Path(d).glob("*"): - dashboard = GrafanaDashboard._serialize(path.read_bytes()) - dashboards.append(dashboard) - return dashboards - - -class COSAgentDataChanged(EventBase): - """Event emitted by `COSAgentRequirer` when relation data changes.""" - - -class COSAgentValidationError(EventBase): - """Event emitted by `COSAgentRequirer` when there is an error in the relation data.""" - - def __init__(self, handle, message: str = ""): - super().__init__(handle) - self.message = message - - def snapshot(self) -> Dict: - """Save COSAgentValidationError source information.""" - return {"message": self.message} - - def restore(self, snapshot): - """Restore COSAgentValidationError source information.""" - self.message = snapshot["message"] - - -class COSAgentRequirerEvents(ObjectEvents): - """`COSAgentRequirer` events.""" - - data_changed = EventSource(COSAgentDataChanged) - validation_error = EventSource(COSAgentValidationError) - - -class MultiplePrincipalsError(Exception): - """Custom exception for when there are multiple principal applications.""" - - pass - - -class COSAgentRequirer(Object): - """Integration endpoint wrapper for the Requirer side of the cos_agent interface.""" - - on = COSAgentRequirerEvents() # pyright: ignore - - def __init__( - self, - charm: CharmType, - *, - relation_name: str = DEFAULT_RELATION_NAME, - peer_relation_name: str = DEFAULT_PEER_RELATION_NAME, - refresh_events: Optional[List[str]] = None, - ): - """Create a COSAgentRequirer instance. - - Args: - charm: The `CharmBase` instance that is instantiating this object. - relation_name: The name of the relation to communicate over. - peer_relation_name: The name of the peer relation to communicate over. - refresh_events: List of events on which to refresh relation data. - """ - super().__init__(charm, relation_name) - self._charm = charm - self._relation_name = relation_name - self._peer_relation_name = peer_relation_name - self._refresh_events = refresh_events or [self._charm.on.config_changed] - - events = self._charm.on[relation_name] - self.framework.observe( - events.relation_joined, self._on_relation_data_changed - ) # TODO: do we need this? - self.framework.observe(events.relation_changed, self._on_relation_data_changed) - for event in self._refresh_events: - self.framework.observe(event, self.trigger_refresh) # pyright: ignore - - # Peer relation events - # A peer relation is needed as it is the only mechanism for exchanging data across - # subordinate units. - # self.framework.observe( - # self.on[self._peer_relation_name].relation_joined, self._on_peer_relation_joined - # ) - peer_events = self._charm.on[peer_relation_name] - self.framework.observe(peer_events.relation_changed, self._on_peer_relation_changed) - - @property - def peer_relation(self) -> Optional["Relation"]: - """Helper function for obtaining the peer relation object. - - Returns: peer relation object - (NOTE: would return None if called too early, e.g. during install). - """ - return self.model.get_relation(self._peer_relation_name) - - def _on_peer_relation_changed(self, _): - # Peer data is used for forwarding data from principal units to the grafana agent - # subordinate leader, for updating the app data of the outgoing o11y relations. - if self._charm.unit.is_leader(): - self.on.data_changed.emit() # pyright: ignore - - def _on_relation_data_changed(self, event: RelationChangedEvent): - # Peer data is the only means of communication between subordinate units. - if not self.peer_relation: - event.defer() - return - - cos_agent_relation = event.relation - if not event.unit or not cos_agent_relation.data.get(event.unit): - return - principal_unit = event.unit - - # Coherence check - units = cos_agent_relation.units - if len(units) > 1: - # should never happen - raise ValueError( - f"unexpected error: subordinate relation {cos_agent_relation} " - f"should have exactly one unit" - ) - - if not (raw := cos_agent_relation.data[principal_unit].get(CosAgentProviderUnitData.KEY)): - return - - if not (provider_data := self._validated_provider_data(raw)): - return - - # Copy data from the principal relation to the peer relation, so the leader could - # follow up. - # Save the originating unit name, so it could be used for topology later on by the leader. - data = CosAgentPeersUnitData( # peer relation databag model - principal_unit_name=event.unit.name, - principal_relation_id=str(event.relation.id), - principal_relation_name=event.relation.name, - metrics_alert_rules=provider_data.metrics_alert_rules, - log_alert_rules=provider_data.log_alert_rules, - dashboards=provider_data.dashboards, - ) - self.peer_relation.data[self._charm.unit][ - f"{CosAgentPeersUnitData.KEY}-{event.unit.name}" - ] = data.json() - - # We can't easily tell if the data that was changed is limited to only the data - # that goes into peer relation (in which case, if this is not a leader unit, we wouldn't - # need to emit `on.data_changed`), so we're emitting `on.data_changed` either way. - self.on.data_changed.emit() # pyright: ignore - - def _validated_provider_data(self, raw) -> Optional[CosAgentProviderUnitData]: - try: - return CosAgentProviderUnitData(**json.loads(raw)) - except (pydantic.ValidationError, json.decoder.JSONDecodeError) as e: - self.on.validation_error.emit(message=str(e)) # pyright: ignore - return None - - def trigger_refresh(self, _): - """Trigger a refresh of relation data.""" - # FIXME: Figure out what we should do here - self.on.data_changed.emit() # pyright: ignore - - @property - def _principal_unit(self) -> Optional[Unit]: - """Return the principal unit for a relation. - - Assumes that the relation is of type subordinate. - Relies on the fact that, for subordinate relations, the only remote unit visible to - *this unit* is the principal unit that this unit is attached to. - """ - if relations := self._principal_relations: - # Technically it's a list, but for subordinates there can only be one relation - principal_relation = next(iter(relations)) - if units := principal_relation.units: - # Technically it's a list, but for subordinates there can only be one - return next(iter(units)) - - return None - - @property - def _principal_relations(self): - relations = [] - for relation in self._charm.model.relations[self._relation_name]: - if not json.loads(relation.data[next(iter(relation.units))]["config"]).get( - ["subordinate"], False - ): - relations.append(relation) - if len(relations) > 1: - logger.error( - "Multiple applications claiming to be principal. Update the cos-agent library in the client application charms." - ) - raise MultiplePrincipalsError("Multiple principal applications.") - return relations - - @property - def _remote_data(self) -> List[CosAgentProviderUnitData]: - """Return a list of remote data from each of the related units. - - Assumes that the relation is of type subordinate. - Relies on the fact that, for subordinate relations, the only remote unit visible to - *this unit* is the principal unit that this unit is attached to. - """ - all_data = [] - - for relation in self._charm.model.relations[self._relation_name]: - if not relation.units: - continue - unit = next(iter(relation.units)) - if not (raw := relation.data[unit].get(CosAgentProviderUnitData.KEY)): - continue - if not (provider_data := self._validated_provider_data(raw)): - continue - all_data.append(provider_data) - - return all_data - - def _gather_peer_data(self) -> List[CosAgentPeersUnitData]: - """Collect data from the peers. - - Returns a trimmed-down list of CosAgentPeersUnitData. - """ - relation = self.peer_relation - - # Ensure that whatever context we're running this in, we take the necessary precautions: - if not relation or not relation.data or not relation.app: - return [] - - # Iterate over all peer unit data and only collect every principal once. - peer_data: List[CosAgentPeersUnitData] = [] - app_names: Set[str] = set() - - for unit in chain((self._charm.unit,), relation.units): - if not relation.data.get(unit): - continue - - for unit_name in relation.data.get(unit): # pyright: ignore - if not unit_name.startswith(CosAgentPeersUnitData.KEY): - continue - raw = relation.data[unit].get(unit_name) - if raw is None: - continue - data = CosAgentPeersUnitData(**json.loads(raw)) - # Have we already seen this principal app? - if (app_name := data.app_name) in app_names: - continue - peer_data.append(data) - app_names.add(app_name) - - return peer_data - - @property - def metrics_alerts(self) -> Dict[str, Any]: - """Fetch metrics alerts.""" - alert_rules = {} - - seen_apps: List[str] = [] - for data in self._gather_peer_data(): - if rules := data.metrics_alert_rules: - app_name = data.app_name - if app_name in seen_apps: - continue # dedup! - seen_apps.append(app_name) - # This is only used for naming the file, so be as specific as we can be - identifier = JujuTopology( - model=self._charm.model.name, - model_uuid=self._charm.model.uuid, - application=app_name, - # For the topology unit, we could use `data.principal_unit_name`, but that unit - # name may not be very stable: `_gather_peer_data` de-duplicates by app name so - # the exact unit name that turns up first in the iterator may vary from time to - # time. So using the grafana-agent unit name instead. - unit=self._charm.unit.name, - ).identifier - - alert_rules[identifier] = rules - - return alert_rules - - @property - def metrics_jobs(self) -> List[Dict]: - """Parse the relation data contents and extract the metrics jobs.""" - scrape_jobs = [] - for data in self._remote_data: - for job in data.metrics_scrape_jobs: - # In #220, relation schema changed from a simplified dict to the standard - # `scrape_configs`. - # This is to ensure backwards compatibility with Providers older than v0.5. - if "path" in job and "port" in job and "job_name" in job: - job = { - "job_name": job["job_name"], - "metrics_path": job["path"], - "static_configs": [{"targets": [f"localhost:{job['port']}"]}], - # We include insecure_skip_verify because we are always scraping localhost. - # Even if we have the certs for the scrape targets, we'd rather specify the scrape - # jobs with localhost rather than the SAN DNS the cert was issued for. - "tls_config": {"insecure_skip_verify": True}, - } - - scrape_jobs.append(job) - - return scrape_jobs - - @property - def snap_log_endpoints(self) -> List[SnapEndpoint]: - """Fetch logging endpoints exposed by related snaps.""" - plugs = [] - for data in self._remote_data: - targets = data.log_slots - if targets: - for target in targets: - if target in plugs: - logger.warning( - f"plug {target} already listed. " - "The same snap is being passed from multiple " - "endpoints; this should not happen." - ) - else: - plugs.append(target) - - endpoints = [] - for plug in plugs: - if ":" not in plug: - logger.error(f"invalid plug definition received: {plug}. Ignoring...") - else: - endpoint = SnapEndpoint(*plug.split(":")) - endpoints.append(endpoint) - return endpoints - - @property - def logs_alerts(self) -> Dict[str, Any]: - """Fetch log alerts.""" - alert_rules = {} - seen_apps: List[str] = [] - - for data in self._gather_peer_data(): - if rules := data.log_alert_rules: - # This is only used for naming the file, so be as specific as we can be - app_name = data.app_name - if app_name in seen_apps: - continue # dedup! - seen_apps.append(app_name) - - identifier = JujuTopology( - model=self._charm.model.name, - model_uuid=self._charm.model.uuid, - application=app_name, - # For the topology unit, we could use `data.principal_unit_name`, but that unit - # name may not be very stable: `_gather_peer_data` de-duplicates by app name so - # the exact unit name that turns up first in the iterator may vary from time to - # time. So using the grafana-agent unit name instead. - unit=self._charm.unit.name, - ).identifier - - alert_rules[identifier] = rules - - return alert_rules - - @property - def dashboards(self) -> List[Dict[str, str]]: - """Fetch dashboards as encoded content. - - Dashboards are assumed not to vary across units of the same primary. - """ - dashboards: List[Dict[str, Any]] = [] - - seen_apps: List[str] = [] - for data in self._gather_peer_data(): - app_name = data.app_name - if app_name in seen_apps: - continue # dedup! - seen_apps.append(app_name) - - for encoded_dashboard in data.dashboards or (): - content = GrafanaDashboard(encoded_dashboard)._deserialize() - - title = content.get("title", "no_title") - - dashboards.append( - { - "relation_id": data.principal_relation_id, - # We have the remote charm name - use it for the identifier - "charm": f"{data.principal_relation_name}-{app_name}", - "content": content, - "title": title, - } - ) - - return dashboards diff --git a/lib/charms/operator_libs_linux/v2/snap.py b/lib/charms/operator_libs_linux/v2/snap.py deleted file mode 100644 index 38c88cf0..00000000 --- a/lib/charms/operator_libs_linux/v2/snap.py +++ /dev/null @@ -1,1099 +0,0 @@ -# Copyright 2021 Canonical Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Representations of the system's Snaps, and abstractions around managing them. - -The `snap` module provides convenience methods for listing, installing, refreshing, and removing -Snap packages, in addition to setting and getting configuration options for them. - -In the `snap` module, `SnapCache` creates a dict-like mapping of `Snap` objects at when -instantiated. Installed snaps are fully populated, and available snaps are lazily-loaded upon -request. This module relies on an installed and running `snapd` daemon to perform operations over -the `snapd` HTTP API. - -`SnapCache` objects can be used to install or modify Snap packages by name in a manner similar to -using the `snap` command from the commandline. - -An example of adding Juju to the system with `SnapCache` and setting a config value: - -```python -try: - cache = snap.SnapCache() - juju = cache["juju"] - - if not juju.present: - juju.ensure(snap.SnapState.Latest, channel="beta") - juju.set({"some.key": "value", "some.key2": "value2"}) -except snap.SnapError as e: - logger.error("An exception occurred when installing charmcraft. Reason: %s", e.message) -``` - -In addition, the `snap` module provides "bare" methods which can act on Snap packages as -simple function calls. :meth:`add`, :meth:`remove`, and :meth:`ensure` are provided, as -well as :meth:`add_local` for installing directly from a local `.snap` file. These return -`Snap` objects. - -As an example of installing several Snaps and checking details: - -```python -try: - nextcloud, charmcraft = snap.add(["nextcloud", "charmcraft"]) - if nextcloud.get("mode") != "production": - nextcloud.set({"mode": "production"}) -except snap.SnapError as e: - logger.error("An exception occurred when installing snaps. Reason: %s" % e.message) -``` -""" - -import http.client -import json -import logging -import os -import re -import socket -import subprocess -import sys -import urllib.error -import urllib.parse -import urllib.request -from collections.abc import Mapping -from datetime import datetime, timedelta, timezone -from enum import Enum -from subprocess import CalledProcessError, CompletedProcess -from typing import Any, Dict, Iterable, List, Optional, Union - -logger = logging.getLogger(__name__) - -# The unique Charmhub library identifier, never change it -LIBID = "05394e5893f94f2d90feb7cbe6b633cd" - -# Increment this major API version when introducing breaking changes -LIBAPI = 2 - -# Increment this PATCH version before using `charmcraft publish-lib` or reset -# to 0 if you are raising the major API version -LIBPATCH = 3 - - -# Regex to locate 7-bit C1 ANSI sequences -ansi_filter = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") - - -def _cache_init(func): - def inner(*args, **kwargs): - if _Cache.cache is None: - _Cache.cache = SnapCache() - return func(*args, **kwargs) - - return inner - - -# recursive hints seems to error out pytest -JSONType = Union[Dict[str, Any], List[Any], str, int, float] - - -class SnapService: - """Data wrapper for snap services.""" - - def __init__( - self, - daemon: Optional[str] = None, - daemon_scope: Optional[str] = None, - enabled: bool = False, - active: bool = False, - activators: List[str] = [], - **kwargs, - ): - self.daemon = daemon - self.daemon_scope = kwargs.get("daemon-scope", None) or daemon_scope - self.enabled = enabled - self.active = active - self.activators = activators - - def as_dict(self) -> Dict: - """Return instance representation as dict.""" - return { - "daemon": self.daemon, - "daemon_scope": self.daemon_scope, - "enabled": self.enabled, - "active": self.active, - "activators": self.activators, - } - - -class MetaCache(type): - """MetaCache class used for initialising the snap cache.""" - - @property - def cache(cls) -> "SnapCache": - """Property for returning the snap cache.""" - return cls._cache - - @cache.setter - def cache(cls, cache: "SnapCache") -> None: - """Setter for the snap cache.""" - cls._cache = cache - - def __getitem__(cls, name) -> "Snap": - """Snap cache getter.""" - return cls._cache[name] - - -class _Cache(object, metaclass=MetaCache): - _cache = None - - -class Error(Exception): - """Base class of most errors raised by this library.""" - - def __repr__(self): - """Represent the Error class.""" - return "<{}.{} {}>".format(type(self).__module__, type(self).__name__, self.args) - - @property - def name(self): - """Return a string representation of the model plus class.""" - return "<{}.{}>".format(type(self).__module__, type(self).__name__) - - @property - def message(self): - """Return the message passed as an argument.""" - return self.args[0] - - -class SnapAPIError(Error): - """Raised when an HTTP API error occurs talking to the Snapd server.""" - - def __init__(self, body: Dict, code: int, status: str, message: str): - super().__init__(message) # Makes str(e) return message - self.body = body - self.code = code - self.status = status - self._message = message - - def __repr__(self): - """Represent the SnapAPIError class.""" - return "APIError({!r}, {!r}, {!r}, {!r})".format( - self.body, self.code, self.status, self._message - ) - - -class SnapState(Enum): - """The state of a snap on the system or in the cache.""" - - Present = "present" - Absent = "absent" - Latest = "latest" - Available = "available" - - -class SnapError(Error): - """Raised when there's an error running snap control commands.""" - - -class SnapNotFoundError(Error): - """Raised when a requested snap is not known to the system.""" - - -class Snap(object): - """Represents a snap package and its properties. - - `Snap` exposes the following properties about a snap: - - name: the name of the snap - - state: a `SnapState` representation of its install status - - channel: "stable", "candidate", "beta", and "edge" are common - - revision: a string representing the snap's revision - - confinement: "classic" or "strict" - """ - - def __init__( - self, - name, - state: SnapState, - channel: str, - revision: str, - confinement: str, - apps: Optional[List[Dict[str, str]]] = None, - cohort: Optional[str] = "", - ) -> None: - self._name = name - self._state = state - self._channel = channel - self._revision = revision - self._confinement = confinement - self._cohort = cohort - self._apps = apps or [] - self._snap_client = SnapClient() - - def __eq__(self, other) -> bool: - """Equality for comparison.""" - return isinstance(other, self.__class__) and ( - self._name, - self._revision, - ) == (other._name, other._revision) - - def __hash__(self): - """Calculate a hash for this snap.""" - return hash((self._name, self._revision)) - - def __repr__(self): - """Represent the object such that it can be reconstructed.""" - return "<{}.{}: {}>".format(self.__module__, self.__class__.__name__, self.__dict__) - - def __str__(self): - """Represent the snap object as a string.""" - return "<{}: {}-{}.{} -- {}>".format( - self.__class__.__name__, - self._name, - self._revision, - self._channel, - str(self._state), - ) - - def _snap(self, command: str, optargs: Optional[Iterable[str]] = None) -> str: - """Perform a snap operation. - - Args: - command: the snap command to execute - optargs: an (optional) list of additional arguments to pass, - commonly confinement or channel - - Raises: - SnapError if there is a problem encountered - """ - optargs = optargs or [] - args = ["snap", command, self._name, *optargs] - try: - return subprocess.check_output(args, universal_newlines=True) - except CalledProcessError as e: - raise SnapError( - "Snap: {!r}; command {!r} failed with output = {!r}".format( - self._name, args, e.output - ) - ) - - def _snap_daemons( - self, - command: List[str], - services: Optional[List[str]] = None, - ) -> CompletedProcess: - """Perform snap app commands. - - Args: - command: the snap command to execute - services: the snap service to execute command on - - Raises: - SnapError if there is a problem encountered - """ - if services: - # an attempt to keep the command constrained to the snap instance's services - services = ["{}.{}".format(self._name, service) for service in services] - else: - services = [self._name] - - args = ["snap", *command, *services] - - try: - return subprocess.run(args, universal_newlines=True, check=True, capture_output=True) - except CalledProcessError as e: - raise SnapError("Could not {} for snap [{}]: {}".format(args, self._name, e.stderr)) - - def get(self, key: Optional[str], *, typed: bool = False) -> Any: - """Fetch snap configuration values. - - Args: - key: the key to retrieve. Default to retrieve all values for typed=True. - typed: set to True to retrieve typed values (set with typed=True). - Default is to return a string. - """ - if typed: - config = json.loads(self._snap("get", ["-d", key])) - if key: - return config.get(key) - return config - - if not key: - raise TypeError("Key must be provided when typed=False") - - return self._snap("get", [key]).strip() - - def set(self, config: Dict[str, Any], *, typed: bool = False) -> str: - """Set a snap configuration value. - - Args: - config: a dictionary containing keys and values specifying the config to set. - typed: set to True to convert all values in the config into typed values while - configuring the snap (set with typed=True). Default is not to convert. - """ - if typed: - kv = [f"{key}={json.dumps(val)}" for key, val in config.items()] - return self._snap("set", ["-t"] + kv) - - return self._snap("set", [f"{key}={val}" for key, val in config.items()]) - - def unset(self, key) -> str: - """Unset a snap configuration value. - - Args: - key: the key to unset - """ - return self._snap("unset", [key]) - - def start(self, services: Optional[List[str]] = None, enable: Optional[bool] = False) -> None: - """Start a snap's services. - - Args: - services (list): (optional) list of individual snap services to start (otherwise all) - enable (bool): (optional) flag to enable snap services on start. Default `false` - """ - args = ["start", "--enable"] if enable else ["start"] - self._snap_daemons(args, services) - - def stop(self, services: Optional[List[str]] = None, disable: Optional[bool] = False) -> None: - """Stop a snap's services. - - Args: - services (list): (optional) list of individual snap services to stop (otherwise all) - disable (bool): (optional) flag to disable snap services on stop. Default `False` - """ - args = ["stop", "--disable"] if disable else ["stop"] - self._snap_daemons(args, services) - - def logs(self, services: Optional[List[str]] = None, num_lines: Optional[int] = 10) -> str: - """Fetch a snap services' logs. - - Args: - services (list): (optional) list of individual snap services to show logs from - (otherwise all) - num_lines (int): (optional) integer number of log lines to return. Default `10` - """ - args = ["logs", "-n={}".format(num_lines)] if num_lines else ["logs"] - return self._snap_daemons(args, services).stdout - - def connect( - self, plug: str, service: Optional[str] = None, slot: Optional[str] = None - ) -> None: - """Connect a plug to a slot. - - Args: - plug (str): the plug to connect - service (str): (optional) the snap service name to plug into - slot (str): (optional) the snap service slot to plug in to - - Raises: - SnapError if there is a problem encountered - """ - command = ["connect", "{}:{}".format(self._name, plug)] - - if service and slot: - command = command + ["{}:{}".format(service, slot)] - elif slot: - command = command + [slot] - - args = ["snap", *command] - try: - subprocess.run(args, universal_newlines=True, check=True, capture_output=True) - except CalledProcessError as e: - raise SnapError("Could not {} for snap [{}]: {}".format(args, self._name, e.stderr)) - - def hold(self, duration: Optional[timedelta] = None) -> None: - """Add a refresh hold to a snap. - - Args: - duration: duration for the hold, or None (the default) to hold this snap indefinitely. - """ - hold_str = "forever" - if duration is not None: - seconds = round(duration.total_seconds()) - hold_str = f"{seconds}s" - self._snap("refresh", [f"--hold={hold_str}"]) - - def unhold(self) -> None: - """Remove the refresh hold of a snap.""" - self._snap("refresh", ["--unhold"]) - - def alias(self, application: str, alias: Optional[str] = None) -> None: - """Create an alias for a given application. - - Args: - application: application to get an alias. - alias: (optional) name of the alias; if not provided, the application name is used. - """ - if alias is None: - alias = application - args = ["snap", "alias", f"{self.name}.{application}", alias] - try: - subprocess.check_output(args, universal_newlines=True) - except CalledProcessError as e: - raise SnapError( - "Snap: {!r}; command {!r} failed with output = {!r}".format( - self._name, args, e.output - ) - ) - - def restart( - self, services: Optional[List[str]] = None, reload: Optional[bool] = False - ) -> None: - """Restarts a snap's services. - - Args: - services (list): (optional) list of individual snap services to restart. - (otherwise all) - reload (bool): (optional) flag to use the service reload command, if available. - Default `False` - """ - args = ["restart", "--reload"] if reload else ["restart"] - self._snap_daemons(args, services) - - def _install( - self, - channel: Optional[str] = "", - cohort: Optional[str] = "", - revision: Optional[str] = None, - ) -> None: - """Add a snap to the system. - - Args: - channel: the channel to install from - cohort: optional, the key of a cohort that this snap belongs to - revision: optional, the revision of the snap to install - """ - cohort = cohort or self._cohort - - args = [] - if self.confinement == "classic": - args.append("--classic") - if channel: - args.append('--channel="{}"'.format(channel)) - if revision: - args.append('--revision="{}"'.format(revision)) - if cohort: - args.append('--cohort="{}"'.format(cohort)) - - self._snap("install", args) - - def _refresh( - self, - channel: Optional[str] = "", - cohort: Optional[str] = "", - revision: Optional[str] = None, - leave_cohort: Optional[bool] = False, - ) -> None: - """Refresh a snap. - - Args: - channel: the channel to install from - cohort: optionally, specify a cohort. - revision: optionally, specify the revision of the snap to refresh - leave_cohort: leave the current cohort. - """ - args = [] - if channel: - args.append('--channel="{}"'.format(channel)) - - if revision: - args.append('--revision="{}"'.format(revision)) - - if not cohort: - cohort = self._cohort - - if leave_cohort: - self._cohort = "" - args.append("--leave-cohort") - elif cohort: - args.append('--cohort="{}"'.format(cohort)) - - self._snap("refresh", args) - - def _remove(self) -> str: - """Remove a snap from the system.""" - return self._snap("remove") - - @property - def name(self) -> str: - """Returns the name of the snap.""" - return self._name - - def ensure( - self, - state: SnapState, - classic: Optional[bool] = False, - channel: Optional[str] = "", - cohort: Optional[str] = "", - revision: Optional[str] = None, - ): - """Ensure that a snap is in a given state. - - Args: - state: a `SnapState` to reconcile to. - classic: an (Optional) boolean indicating whether classic confinement should be used - channel: the channel to install from - cohort: optional. Specify the key of a snap cohort. - revision: optional. the revision of the snap to install/refresh - - While both channel and revision could be specified, the underlying snap install/refresh - command will determine which one takes precedence (revision at this time) - - Raises: - SnapError if an error is encountered - """ - self._confinement = "classic" if classic or self._confinement == "classic" else "" - - if state not in (SnapState.Present, SnapState.Latest): - # We are attempting to remove this snap. - if self._state in (SnapState.Present, SnapState.Latest): - # The snap is installed, so we run _remove. - self._remove() - else: - # The snap is not installed -- no need to do anything. - pass - else: - # We are installing or refreshing a snap. - if self._state not in (SnapState.Present, SnapState.Latest): - # The snap is not installed, so we install it. - self._install(channel, cohort, revision) - else: - # The snap is installed, but we are changing it (e.g., switching channels). - self._refresh(channel, cohort, revision) - - self._update_snap_apps() - self._state = state - - def _update_snap_apps(self) -> None: - """Update a snap's apps after snap changes state.""" - try: - self._apps = self._snap_client.get_installed_snap_apps(self._name) - except SnapAPIError: - logger.debug("Unable to retrieve snap apps for {}".format(self._name)) - self._apps = [] - - @property - def present(self) -> bool: - """Report whether or not a snap is present.""" - return self._state in (SnapState.Present, SnapState.Latest) - - @property - def latest(self) -> bool: - """Report whether the snap is the most recent version.""" - return self._state is SnapState.Latest - - @property - def state(self) -> SnapState: - """Report the current snap state.""" - return self._state - - @state.setter - def state(self, state: SnapState) -> None: - """Set the snap state to a given value. - - Args: - state: a `SnapState` to reconcile the snap to. - - Raises: - SnapError if an error is encountered - """ - if self._state is not state: - self.ensure(state) - self._state = state - - @property - def revision(self) -> str: - """Returns the revision for a snap.""" - return self._revision - - @property - def channel(self) -> str: - """Returns the channel for a snap.""" - return self._channel - - @property - def confinement(self) -> str: - """Returns the confinement for a snap.""" - return self._confinement - - @property - def apps(self) -> List: - """Returns (if any) the installed apps of the snap.""" - self._update_snap_apps() - return self._apps - - @property - def services(self) -> Dict: - """Returns (if any) the installed services of the snap.""" - self._update_snap_apps() - services = {} - for app in self._apps: - if "daemon" in app: - services[app["name"]] = SnapService(**app).as_dict() - - return services - - @property - def held(self) -> bool: - """Report whether the snap has a hold.""" - info = self._snap("info") - return "hold:" in info - - -class _UnixSocketConnection(http.client.HTTPConnection): - """Implementation of HTTPConnection that connects to a named Unix socket.""" - - def __init__(self, host, timeout=None, socket_path=None): - if timeout is None: - super().__init__(host) - else: - super().__init__(host, timeout=timeout) - self.socket_path = socket_path - - def connect(self): - """Override connect to use Unix socket (instead of TCP socket).""" - if not hasattr(socket, "AF_UNIX"): - raise NotImplementedError("Unix sockets not supported on {}".format(sys.platform)) - self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.sock.connect(self.socket_path) - if self.timeout is not None: - self.sock.settimeout(self.timeout) - - -class _UnixSocketHandler(urllib.request.AbstractHTTPHandler): - """Implementation of HTTPHandler that uses a named Unix socket.""" - - def __init__(self, socket_path: str): - super().__init__() - self.socket_path = socket_path - - def http_open(self, req) -> http.client.HTTPResponse: - """Override http_open to use a Unix socket connection (instead of TCP).""" - return self.do_open(_UnixSocketConnection, req, socket_path=self.socket_path) - - -class SnapClient: - """Snapd API client to talk to HTTP over UNIX sockets. - - In order to avoid shelling out and/or involving sudo in calling the snapd API, - use a wrapper based on the Pebble Client, trimmed down to only the utility methods - needed for talking to snapd. - """ - - def __init__( - self, - socket_path: str = "/run/snapd.socket", - opener: Optional[urllib.request.OpenerDirector] = None, - base_url: str = "http://localhost/v2/", - timeout: float = 30.0, - ): - """Initialize a client instance. - - Args: - socket_path: a path to the socket on the filesystem. Defaults to /run/snap/snapd.socket - opener: specifies an opener for unix socket, if unspecified a default is used - base_url: base url for making requests to the snap client. Defaults to - http://localhost/v2/ - timeout: timeout in seconds to use when making requests to the API. Default is 30.0s. - """ - if opener is None: - opener = self._get_default_opener(socket_path) - self.opener = opener - self.base_url = base_url - self.timeout = timeout - - @classmethod - def _get_default_opener(cls, socket_path): - """Build the default opener to use for requests (HTTP over Unix socket).""" - opener = urllib.request.OpenerDirector() - opener.add_handler(_UnixSocketHandler(socket_path)) - opener.add_handler(urllib.request.HTTPDefaultErrorHandler()) - opener.add_handler(urllib.request.HTTPRedirectHandler()) - opener.add_handler(urllib.request.HTTPErrorProcessor()) - return opener - - def _request( - self, - method: str, - path: str, - query: Dict = None, - body: Dict = None, - ) -> JSONType: - """Make a JSON request to the Snapd server with the given HTTP method and path. - - If query dict is provided, it is encoded and appended as a query string - to the URL. If body dict is provided, it is serialied as JSON and used - as the HTTP body (with Content-Type: "application/json"). The resulting - body is decoded from JSON. - """ - headers = {"Accept": "application/json"} - data = None - if body is not None: - data = json.dumps(body).encode("utf-8") - headers["Content-Type"] = "application/json" - - response = self._request_raw(method, path, query, headers, data) - return json.loads(response.read().decode())["result"] - - def _request_raw( - self, - method: str, - path: str, - query: Dict = None, - headers: Dict = None, - data: bytes = None, - ) -> http.client.HTTPResponse: - """Make a request to the Snapd server; return the raw HTTPResponse object.""" - url = self.base_url + path - if query: - url = url + "?" + urllib.parse.urlencode(query) - - if headers is None: - headers = {} - request = urllib.request.Request(url, method=method, data=data, headers=headers) - - try: - response = self.opener.open(request, timeout=self.timeout) - except urllib.error.HTTPError as e: - code = e.code - status = e.reason - message = "" - try: - body = json.loads(e.read().decode())["result"] - except (IOError, ValueError, KeyError) as e2: - # Will only happen on read error or if Pebble sends invalid JSON. - body = {} - message = "{} - {}".format(type(e2).__name__, e2) - raise SnapAPIError(body, code, status, message) - except urllib.error.URLError as e: - raise SnapAPIError({}, 500, "Not found", e.reason) - return response - - def get_installed_snaps(self) -> Dict: - """Get information about currently installed snaps.""" - return self._request("GET", "snaps") - - def get_snap_information(self, name: str) -> Dict: - """Query the snap server for information about single snap.""" - return self._request("GET", "find", {"name": name})[0] - - def get_installed_snap_apps(self, name: str) -> List: - """Query the snap server for apps belonging to a named, currently installed snap.""" - return self._request("GET", "apps", {"names": name, "select": "service"}) - - -class SnapCache(Mapping): - """An abstraction to represent installed/available packages. - - When instantiated, `SnapCache` iterates through the list of installed - snaps using the `snapd` HTTP API, and a list of available snaps by reading - the filesystem to populate the cache. Information about available snaps is lazily-loaded - from the `snapd` API when requested. - """ - - def __init__(self): - if not self.snapd_installed: - raise SnapError("snapd is not installed or not in /usr/bin") from None - self._snap_client = SnapClient() - self._snap_map = {} - if self.snapd_installed: - self._load_available_snaps() - self._load_installed_snaps() - - def __contains__(self, key: str) -> bool: - """Check if a given snap is in the cache.""" - return key in self._snap_map - - def __len__(self) -> int: - """Report number of items in the snap cache.""" - return len(self._snap_map) - - def __iter__(self) -> Iterable["Snap"]: - """Provide iterator for the snap cache.""" - return iter(self._snap_map.values()) - - def __getitem__(self, snap_name: str) -> Snap: - """Return either the installed version or latest version for a given snap.""" - snap = self._snap_map.get(snap_name, None) - if snap is None: - # The snapd cache file may not have existed when _snap_map was - # populated. This is normal. - try: - self._snap_map[snap_name] = self._load_info(snap_name) - except SnapAPIError: - raise SnapNotFoundError("Snap '{}' not found!".format(snap_name)) - - return self._snap_map[snap_name] - - @property - def snapd_installed(self) -> bool: - """Check whether snapd has been installled on the system.""" - return os.path.isfile("/usr/bin/snap") - - def _load_available_snaps(self) -> None: - """Load the list of available snaps from disk. - - Leave them empty and lazily load later if asked for. - """ - if not os.path.isfile("/var/cache/snapd/names"): - # The snap catalog may not be populated yet; this is normal. - # snapd updates the cache infrequently and the cache file may not - # currently exist. - return - - with open("/var/cache/snapd/names", "r") as f: - for line in f: - if line.strip(): - self._snap_map[line.strip()] = None - - def _load_installed_snaps(self) -> None: - """Load the installed snaps into the dict.""" - installed = self._snap_client.get_installed_snaps() - - for i in installed: - snap = Snap( - name=i["name"], - state=SnapState.Latest, - channel=i["channel"], - revision=i["revision"], - confinement=i["confinement"], - apps=i.get("apps", None), - ) - self._snap_map[snap.name] = snap - - def _load_info(self, name) -> Snap: - """Load info for snaps which are not installed if requested. - - Args: - name: a string representing the name of the snap - """ - info = self._snap_client.get_snap_information(name) - - return Snap( - name=info["name"], - state=SnapState.Available, - channel=info["channel"], - revision=info["revision"], - confinement=info["confinement"], - apps=None, - ) - - -@_cache_init -def add( - snap_names: Union[str, List[str]], - state: Union[str, SnapState] = SnapState.Latest, - channel: Optional[str] = "", - classic: Optional[bool] = False, - cohort: Optional[str] = "", - revision: Optional[str] = None, -) -> Union[Snap, List[Snap]]: - """Add a snap to the system. - - Args: - snap_names: the name or names of the snaps to install - state: a string or `SnapState` representation of the desired state, one of - [`Present` or `Latest`] - channel: an (Optional) channel as a string. Defaults to 'latest' - classic: an (Optional) boolean specifying whether it should be added with classic - confinement. Default `False` - cohort: an (Optional) string specifying the snap cohort to use - revision: an (Optional) string specifying the snap revision to use - - Raises: - SnapError if some snaps failed to install or were not found. - """ - if not channel and not revision: - channel = "latest" - - snap_names = [snap_names] if isinstance(snap_names, str) else snap_names - if not snap_names: - raise TypeError("Expected at least one snap to add, received zero!") - - if isinstance(state, str): - state = SnapState(state) - - return _wrap_snap_operations(snap_names, state, channel, classic, cohort, revision) - - -@_cache_init -def remove(snap_names: Union[str, List[str]]) -> Union[Snap, List[Snap]]: - """Remove specified snap(s) from the system. - - Args: - snap_names: the name or names of the snaps to install - - Raises: - SnapError if some snaps failed to install. - """ - snap_names = [snap_names] if isinstance(snap_names, str) else snap_names - if not snap_names: - raise TypeError("Expected at least one snap to add, received zero!") - - return _wrap_snap_operations(snap_names, SnapState.Absent, "", False) - - -@_cache_init -def ensure( - snap_names: Union[str, List[str]], - state: str, - channel: Optional[str] = "", - classic: Optional[bool] = False, - cohort: Optional[str] = "", - revision: Optional[int] = None, -) -> Union[Snap, List[Snap]]: - """Ensure specified snaps are in a given state on the system. - - Args: - snap_names: the name(s) of the snaps to operate on - state: a string representation of the desired state, from `SnapState` - channel: an (Optional) channel as a string. Defaults to 'latest' - classic: an (Optional) boolean specifying whether it should be added with classic - confinement. Default `False` - cohort: an (Optional) string specifying the snap cohort to use - revision: an (Optional) integer specifying the snap revision to use - - When both channel and revision are specified, the underlying snap install/refresh - command will determine the precedence (revision at the time of adding this) - - Raises: - SnapError if the snap is not in the cache. - """ - if not revision and not channel: - channel = "latest" - - if state in ("present", "latest") or revision: - return add(snap_names, SnapState(state), channel, classic, cohort, revision) - else: - return remove(snap_names) - - -def _wrap_snap_operations( - snap_names: List[str], - state: SnapState, - channel: str, - classic: bool, - cohort: Optional[str] = "", - revision: Optional[str] = None, -) -> Union[Snap, List[Snap]]: - """Wrap common operations for bare commands.""" - snaps = {"success": [], "failed": []} - - op = "remove" if state is SnapState.Absent else "install or refresh" - - for s in snap_names: - try: - snap = _Cache[s] - if state is SnapState.Absent: - snap.ensure(state=SnapState.Absent) - else: - snap.ensure( - state=state, classic=classic, channel=channel, cohort=cohort, revision=revision - ) - snaps["success"].append(snap) - except SnapError as e: - logger.warning("Failed to {} snap {}: {}!".format(op, s, e.message)) - snaps["failed"].append(s) - except SnapNotFoundError: - logger.warning("Snap '{}' not found in cache!".format(s)) - snaps["failed"].append(s) - - if len(snaps["failed"]): - raise SnapError( - "Failed to install or refresh snap(s): {}".format(", ".join(list(snaps["failed"]))) - ) - - return snaps["success"] if len(snaps["success"]) > 1 else snaps["success"][0] - - -def install_local( - filename: str, classic: Optional[bool] = False, dangerous: Optional[bool] = False -) -> Snap: - """Perform a snap operation. - - Args: - filename: the path to a local .snap file to install - classic: whether to use classic confinement - dangerous: whether --dangerous should be passed to install snaps without a signature - - Raises: - SnapError if there is a problem encountered - """ - args = [ - "snap", - "install", - filename, - ] - if classic: - args.append("--classic") - if dangerous: - args.append("--dangerous") - try: - result = subprocess.check_output(args, universal_newlines=True).splitlines()[-1] - snap_name, _ = result.split(" ", 1) - snap_name = ansi_filter.sub("", snap_name) - - c = SnapCache() - - try: - return c[snap_name] - except SnapAPIError as e: - logger.error( - "Could not find snap {} when querying Snapd socket: {}".format(snap_name, e.body) - ) - raise SnapError("Failed to find snap {} in Snap cache".format(snap_name)) - except CalledProcessError as e: - raise SnapError("Could not install snap {}: {}".format(filename, e.output)) - - -def _system_set(config_item: str, value: str) -> None: - """Set system snapd config values. - - Args: - config_item: name of snap system setting. E.g. 'refresh.hold' - value: value to assign - """ - args = ["snap", "set", "system", "{}={}".format(config_item, value)] - try: - subprocess.check_call(args, universal_newlines=True) - except CalledProcessError: - raise SnapError("Failed setting system config '{}' to '{}'".format(config_item, value)) - - -def hold_refresh(days: int = 90, forever: bool = False) -> bool: - """Set the system-wide snap refresh hold. - - Args: - days: number of days to hold system refreshes for. Maximum 90. Set to zero to remove hold. - forever: if True, will set a hold forever. - """ - if not isinstance(forever, bool): - raise TypeError("forever must be a bool") - if not isinstance(days, int): - raise TypeError("days must be an int") - if forever: - _system_set("refresh.hold", "forever") - logger.info("Set system-wide snap refresh hold to: forever") - elif days == 0: - _system_set("refresh.hold", "") - logger.info("Removed system-wide snap refresh hold") - else: - # Currently the snap daemon can only hold for a maximum of 90 days - if not 1 <= days <= 90: - raise ValueError("days must be between 1 and 90") - # Add the number of days to current time - target_date = datetime.now(timezone.utc).astimezone() + timedelta(days=days) - # Format for the correct datetime format - hold_date = target_date.strftime("%Y-%m-%dT%H:%M:%S%z") - # Python dumps the offset in format '+0100', we need '+01:00' - hold_date = "{0}:{1}".format(hold_date[:-2], hold_date[-2:]) - # Actually set the hold date - _system_set("refresh.hold", hold_date) - logger.info("Set system-wide snap refresh hold to: %s", hold_date) diff --git a/machine_metadata.yaml b/machine_metadata.yaml deleted file mode 100644 index fb2baa16..00000000 --- a/machine_metadata.yaml +++ /dev/null @@ -1,71 +0,0 @@ -# Copyright 2021 Canonical Ltd. -# See LICENSE file for licensing details. - -name: grafana-agent - -description: | - Grafana Agent Subordinate Charm -summary: | - Grafana Agent is a telemetry collector for sending metrics, logs, - and trace data to the opinionated Grafana observability stack. -maintainers: - - Jose Massón - - Ryan Barry - - Leon Mintz - - Pietro Pasotti - - Dylan Stephano-Shachter - - Luca Bello - - Simon Aronsson - -#docs: https://discourse.charmhub.io/t/grafana-agent-k8s-docs-index/5605 -website: https://charmhub.io/grafana-agent -#source: https://github.com/canonical/grafana-agent-k8s-operator -#issues: https://github.com/canonical/grafana-agent-k8s-operator/issues - -subordinate: true -series: - - jammy - - focal - -requires: - certificates: - interface: tls-certificates - limit: 1 - description: | - Certificate for the grafana agent server (API endpoint is served on :12345 by default) - to use to authenticate to clients, and the CA certificate of the signing CA. - We currently assume that the same CA signs all scrape targets (e.g. principal units). - juju-info: - description: | - `juju-info` provides basic compatibility with all charms. - If all you want is /var/log logs and node_exporter metrics, - this relation will be enough. - interface: juju-info - scope: container - cos-agent: - description: | - `cos-agent` is a dedicated relation for the grafana agent machine - charm. It will allow you to set up custom scrape jobs, fetch files - from arbitrary locations, send alert rules, dashboards, etc. - interface: cos_agent - scope: container - - send-remote-write: - interface: prometheus_remote_write - logging-consumer: - interface: loki_push_api - grafana-cloud-config: - interface: grafana_cloud_config - limit: 1 - receive-ca-cert: - interface: certificate_transfer - description: | - Obtain TLS information (certificate, ca, chain) from another charm. - -provides: - grafana-dashboards-provider: - interface: grafana_dashboard - -peers: - peers: - interface: grafana_agent_replica diff --git a/k8s_metadata.yaml b/metadata.yaml similarity index 100% rename from k8s_metadata.yaml rename to metadata.yaml diff --git a/src/k8s_charm.py b/src/charm.py similarity index 100% rename from src/k8s_charm.py rename to src/charm.py diff --git a/src/grafana_agent.py b/src/grafana_agent.py index 764e095d..0294b3b9 100644 --- a/src/grafana_agent.py +++ b/src/grafana_agent.py @@ -22,7 +22,6 @@ from charms.certificate_transfer_interface.v0.certificate_transfer import ( CertificateTransferRequires, ) -from charms.grafana_agent.v0.cos_agent import MultiplePrincipalsError from charms.grafana_cloud_integrator.v0.cloud_config_requirer import ( GrafanaCloudConfigRequirer, ) @@ -549,11 +548,7 @@ def _update_config(self) -> None: else: self.delete_file(self._ca_path) - try: - config = self._generate_config() - except MultiplePrincipalsError as e: - self.status.update_config = BlockedStatus(str(e)) - return + config = self._generate_config() try: old_config = yaml.safe_load(self.read_file(CONFIG_PATH)) diff --git a/src/machine_charm.py b/src/machine_charm.py deleted file mode 100755 index f3948c8f..00000000 --- a/src/machine_charm.py +++ /dev/null @@ -1,626 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright 2022 Canonical Ltd. -# See LICENSE file for licensing details. - -"""A juju charm for Grafana Agent on Kubernetes.""" -import json -import logging -import os -import re -import subprocess -from dataclasses import dataclass, field -from pathlib import Path -from typing import Any, Dict, List, Optional, Union - -from charms.grafana_agent.v0.cos_agent import COSAgentRequirer, MultiplePrincipalsError -from charms.operator_libs_linux.v2 import snap # type: ignore -from cosl import JujuTopology -from cosl.rules import AlertRules -from grafana_agent import METRICS_RULES_SRC_PATH, GrafanaAgentCharm -from ops.main import main -from ops.model import BlockedStatus, MaintenanceStatus, Relation, Unit - -logger = logging.getLogger(__name__) - -_FsType = str -_MountOption = str -_MountOptions = List[_MountOption] - - -@dataclass -class _SnapFstabEntry: - """Representation of an individual fstab entry for snap plugs.""" - - source: str - target: str - fstype: Union[_FsType, None] - options: _MountOptions - dump: int - fsck: int - - owner: str = field(init=False) - endpoint_source: str = field(init=False) - relative_target: str = field(init=False) - - def __post_init__(self): - """Populate with calculated values at runtime.""" - self.owner = re.sub( - r"^(.*?)?/snap/(?P([A-Za-z0-9_-])+)/.*$", r"\g", self.source - ) - self.endpoint_source = re.sub( - r"^(.*?)?/snap/([A-Za-z0-9_-])+/(?P.*$)", r"\g", self.source - ) - self.relative_target = re.sub( - r"^(.*?)?/snap/grafana-agent/\d+/shared-logs+(?P/.*$)", r"\g", self.target - ) - - -@dataclass -class SnapFstab: - """Build a small representation/wrapper for snap fstab files.""" - - fstab_file: Union[Path, str] - entries: List[_SnapFstabEntry] = field(init=False) - - def __post_init__(self): - """Populate with calculated values at runtime.""" - self.fstab_file = ( - self.fstab_file if isinstance(self.fstab_file, Path) else Path(self.fstab_file) - ) - if not self.fstab_file.exists(): - self.entries = [] - return - - entries = [] - for line in self.fstab_file.read_text().split("\n"): - if not line.strip(): - # skip whitespace-only lines - continue - raw_entry = line.split() - fields = { - "source": raw_entry[0], - "target": raw_entry[1], - "fstype": None if raw_entry[2] == "none" else raw_entry[2], - "options": raw_entry[3].split(","), - "dump": int(raw_entry[4]), - "fsck": int(raw_entry[5]), - } - entry = _SnapFstabEntry(**fields) - entries.append(entry) - - self.entries = entries - - def entry(self, owner: str, endpoint_name: Optional[str]) -> Optional[_SnapFstabEntry]: - """Find and return a specific entry if it exists.""" - entries = [e for e in self.entries if e.owner == owner] - - if len(entries) > 1 and endpoint_name: - # If there's more than one entry, the endpoint name may not directly map to - # the source *or* path. charmed-kafka uses 'logs' as the plug name, and maps - # .../common/logs to .../log inside Grafana Agent - # - # The only meaningful scenario in which this could happen (multiple fstab - # entries with the same snap "owning" the originating path) is if a snap provides - # multiple paths as part of the same plug. - # - # In this case, for a cheap comparison (rather than implementing some recursive - # LCS just for this), convert all possible endpoint sources into a list of unique - # characters, as well as the endpoint name, and build a sequence of entries with - # a value that's the length of the intersection, the pick the first one i.e. the one - # with the largest intersection. - ordered_entries = sorted( - entries, - # descending order - reverse=True, - # size of the character-level similarity of the two strings - key=lambda e: len(set(endpoint_name) & set(e.endpoint_source)), - ) - return ordered_entries[0] - - if len(entries) > 1 or not entries: - logger.debug( - "Ambiguous or unknown mountpoint for snap %s at slot %s, not relabeling.", - owner, - endpoint_name, - ) - return None - - return entries[0] - - -class GrafanaAgentError(Exception): - """Custom exception type for Grafana Agent.""" - - pass - - -class GrafanaAgentInstallError(GrafanaAgentError): - """Custom exception type for install related errors.""" - - pass - - -class GrafanaAgentServiceError(GrafanaAgentError): - """Custom exception type for service related errors.""" - - pass - - -class GrafanaAgentMachineCharm(GrafanaAgentCharm): - """Machine version of the Grafana Agent charm.""" - - service_name = "grafana-agent.grafana-agent" - - mandatory_relation_pairs = { - "cos-agent": [ # must be paired with: - {"grafana-cloud-config"}, # or - {"send-remote-write", "logging-consumer", "grafana-dashboards-provider"}, - ], - "juju-info": [ # must be paired with: - {"grafana-cloud-config"}, # or - {"send-remote-write", "logging-consumer"}, - ], - } - - def __init__(self, *args): - super().__init__(*args) - # technically, only one of 'cos-agent' and 'juju-info' are likely to ever be active at - # any given time. however, for the sake of understandability, we always set _cos, and - # we always listen to juju-info-joined events even though one of the two paths will be - # at all effects unused. - self._cos = COSAgentRequirer(self) - self.snap = snap.SnapCache()["grafana-agent"] - self.framework.observe( - self._cos.on.data_changed, # pyright: ignore - self._on_cos_data_changed, - ) - self.framework.observe( - self._cos.on.validation_error, self._on_cos_validation_error # pyright: ignore - ) - self.framework.observe(self.on["juju_info"].relation_joined, self._on_juju_info_joined) - self.framework.observe(self.on.install, self.on_install) - self.framework.observe(self.on.start, self._on_start) - self.framework.observe(self.on.stop, self._on_stop) - self.framework.observe(self.on.remove, self._on_remove) - - def _on_juju_info_joined(self, _event): - """Update the config when Juju info is joined.""" - self._update_config() - self._update_status() - - def _on_cos_data_changed(self, event): - """Trigger renewals of all data if there is a change.""" - try: - self._connect_logging_snap_endpoints() - self._update_config() - self._update_status() - self._update_metrics_alerts() - self._update_loki_alerts() - self._update_grafana_dashboards() - except MultiplePrincipalsError: - logger.error( - "Multiple applications claiming to be principle. Update the cos-agent library in the client application charms." - ) - self.unit.status = BlockedStatus("Multiple Principal Applications") - event.defer() - - def _on_cos_validation_error(self, event): - msg_text = "Validation errors for cos-agent relation - check juju debug-log." - self.status.validation_error = BlockedStatus(msg_text) - - messages = event.message.split("\n") - logger.error("%s:", messages[0]) - - for msg in messages[1:]: - logger.error(msg) - - self._update_status() - - def on_install(self, _event) -> None: - """Install the Grafana Agent snap.""" - # Check if Grafana Agent is installed - self.unit.status = MaintenanceStatus("Installing grafana-agent snap") - try: - self.snap.ensure(state=snap.SnapState.Latest) - except snap.SnapError as e: - raise GrafanaAgentInstallError("Failed to install grafana-agent.") from e - - def _on_start(self, _event) -> None: - # Ensure the config is up-to-date before we start to avoid racy relation - # changes and starting with a "bare" config in ActiveStatus - self._update_config() - self.unit.status = MaintenanceStatus("Starting grafana-agent snap") - - try: - self.snap.start(enable=True) - except snap.SnapError as e: - raise GrafanaAgentServiceError("Failed to start grafana-agent") from e - - self._update_status() - - def _on_stop(self, _event) -> None: - self.unit.status = MaintenanceStatus("Stopping grafana-agent snap") - try: - self.snap.stop(disable=True) - except snap.SnapError as e: - raise GrafanaAgentServiceError("Failed to stop grafana-agent") from e - - self._update_status() - - def _on_remove(self, _event) -> None: - """Uninstall the Grafana Agent snap.""" - self.unit.status = MaintenanceStatus("Uninstalling grafana-agent snap") - try: - self.snap.ensure(state=snap.SnapState.Absent) - except snap.SnapError as e: - raise GrafanaAgentInstallError("Failed to uninstall grafana-agent") from e - - @property - def is_k8s(self) -> bool: - """Is this a k8s charm.""" - return False - - def metrics_rules(self) -> Dict[str, Any]: - """Return a list of metrics rules.""" - rules = self._cos.metrics_alerts - - # Determine the principal topology. - principal_topology = self.principal_topology - if principal_topology: - topology = JujuTopology( - model=principal_topology["juju_model"], - model_uuid=principal_topology["juju_model_uuid"], - application=principal_topology["juju_application"], - unit=principal_topology["juju_unit"], - ) - else: - return {} - - # Replace any existing topology labels with those from the principal. - for identifier in rules: - for group in rules[identifier]["groups"]: - for rule in group["rules"]: - rule["labels"]["juju_model"] = principal_topology["juju_model"] - rule["labels"]["juju_model_uuid"] = principal_topology["juju_model_uuid"] - rule["labels"]["juju_application"] = principal_topology["juju_application"] - - # Get the rules defined by Grafana Agent itself. - own_rules = AlertRules(query_type="promql", topology=topology) - own_rules.add_path(METRICS_RULES_SRC_PATH) - if topology.identifier in rules: - rules[topology.identifier]["groups"] += own_rules.as_dict()["groups"] - else: - rules[topology.identifier] = own_rules.as_dict() - - return rules - - def metrics_jobs(self) -> list: - """Return a list of metrics scrape jobs.""" - jobs = self._cos.metrics_jobs - for job in jobs: - static_configs = job.get("static_configs", []) - for static_config in static_configs: - static_config["labels"] = { - # Be sure to keep labels from static_config - **static_config.get("labels", {}), - **self._principal_labels, - } - return jobs - - def logs_rules(self) -> Dict[str, Any]: - """Return a list of logging rules.""" - return self._cos.logs_alerts - - @property - def dashboards(self) -> list: - """Return a list of dashboards.""" - return self._cos.dashboards - - @property - def is_ready(self) -> bool: - """Checks if the charm is ready for configuration.""" - return self._is_installed - - def agent_version_output(self) -> str: - """Runs `agent -version` and returns the output. - - Returns: - Output of `agent -version` - """ - return subprocess.run(["/bin/agent", "-version"], capture_output=True, text=True).stdout - - def read_file(self, filepath: Union[str, Path]): - """Read a file's contents. - - Returns: - A string with the file's contents - """ - with open(filepath) as f: - return f.read() - - def write_file(self, path: Union[str, Path], text: str) -> None: - """Write text to a file. - - Args: - path: file path to write to - text: text to write to the file - """ - with open(path, "w") as f: - f.write(text) - - def delete_file(self, path: Union[str, Path]): - """Delete a file. - - Args: - path: file to be deleted - """ - os.remove(path) - - def stop(self) -> None: - """Stop grafana agent.""" - try: - self.snap.stop() - except snap.SnapError as e: - raise GrafanaAgentServiceError("Failed to restart grafana-agent") from e - - def restart(self) -> None: - """Restart grafana agent.""" - try: - self.snap.restart() - except snap.SnapError as e: - raise GrafanaAgentServiceError("Failed to restart grafana-agent") from e - - def run(self, cmd: List[str]): - """Run cmd on the workload. - - Args: - cmd: Command to be run. - """ - subprocess.run(cmd) - - @property - def _is_installed(self) -> bool: - """Check if the Grafana Agent snap is installed.""" - return self.snap.present - - @property - def _additional_integrations(self) -> Dict[str, Any]: - """Additional integrations for machine charms.""" - node_exporter_job_name = ( - f"juju_{self.model.name}_{self.model.uuid}_{self.model.app.name}_node-exporter" - ) - return { - "node_exporter": { - "enabled": True, - "enable_collectors": [ - "logind", - "systemd", - "mountstats", - "processes", - "sysctl", - ], - "sysctl_include": [ - "net.ipv4.neigh.default.gc_thresh3", - ], - "relabel_configs": [ - # Align the "job" name with those of prometheus_scrape - { - "target_label": "job", - "regex": "(.*)", - "replacement": node_exporter_job_name, - }, - ] - + self._principal_relabeling_config, - } - } - - @property - def _additional_log_configs(self) -> List[Dict[str, Any]]: - """Additional logging configuration for machine charms.""" - _, loki_endpoints = self._enrich_endpoints() - return [ - { - "name": "log_file_scraper", - "clients": loki_endpoints, - "scrape_configs": [ - { - "job_name": "varlog", - "pipeline_stages": [ - { - "drop": { - "expression": ".*file is a directory.*", - }, - }, - ], - "static_configs": [ - { - "targets": ["localhost"], - "labels": { - "__path__": "/var/log/*log", - **self._principal_labels, - }, - } - ], - }, - { - "job_name": "syslog", - "journal": {"labels": self._principal_labels}, - "pipeline_stages": [ - { - "drop": { - "expression": ".*file is a directory.*", - }, - }, - ], - }, - ] - + self._snap_plugs_logging_configs, - } - ] - - @property - def _agent_relations(self) -> List[Relation]: - """Return all relations from botih cos-agent and juju-info.""" - return self.model.relations["cos-agent"] + self.model.relations["juju-info"] - - @property - def _principal_relation(self) -> Optional[Relation]: - """The cos-agent relation, if the charm we're related to supports it, else juju-info.""" - # juju relate will do "the right thing" and default to cos-agent, falling back to - # juju-info if no cos-agent endpoint is available on the principal. - # Technically, if the charm is executing, there MUST be one of these two relations - # (otherwise, the subordinate won't even execute). However, for the sake of juju maybe not - # showing us the relation until after the first few install/start/config-changed, we err on - # the safe side and type this as Optional. - principal_relations = [] - for relation in self._agent_relations: - if not relation.units: - continue - if relation.name == "juju-info": - principal_relations.append(relation) - continue - relation_data = json.loads( - relation.data[next(iter(relation.units))].get("config", "{}") - ) - if not relation_data: - continue - if not relation_data.get("subordinate", False): - principal_relations.append(relation) - if len(principal_relations) > 1: - raise MultiplePrincipalsError("Multiple Principle Applications") - if len(principal_relations) == 1: - return principal_relations[0] - return None - - @property - def principal_unit(self) -> Optional[Unit]: - """Return the principal unit this charm is subordinated to.""" - relation = self._principal_relation - if relation and relation.units: - # Here, we could have popped the set and put the unit back or - # memoized the function, but in the interest of backwards compatibility - # with older python versions and avoiding adding temporary state to - # the charm instance, we choose this somewhat unsightly option. - return next(iter(relation.units)) - return None - - @property - def principal_topology( - self, - ) -> Dict[str, str]: - """Return the topology of the principal unit.""" - unit = self.principal_unit - if unit: - # Note we can't include juju_charm as that information is not available to us. - return { - "juju_model": self.model.name, - "juju_model_uuid": self.model.uuid, - "juju_application": unit.app.name, - "juju_unit": unit.name, - } - return {} - - @property - def _instance_topology(self) -> Dict[str, str]: - return self.principal_topology - - @property - def _principal_labels(self) -> Dict[str, str]: - """Return a dict with labels from the topology of the principal charm.""" - return { - # Dict ordering will give the appropriate result here - "instance": self._instance_name, - **self._instance_topology, - } - - @property - def _principal_relabeling_config(self) -> list: - """Return a relabelling config with labels from the topology of the principal charm.""" - topology_relabels = ( - [ - { - "source_labels": ["__address__"], - "target_label": key, - "replacement": value, - } - for key, value in self._instance_topology.items() - ] - if self._principal_labels - else [] - ) - - return [ - { - "target_label": "instance", - "regex": "(.*)", - "replacement": self._instance_name, - } - ] + topology_relabels # type: ignore - - @property - def _snap_plugs_logging_configs(self) -> List[Dict[str, Any]]: - """One logging config for each separate snap connected over the "logs" endpoint.""" - agent_fstab = SnapFstab(Path("/var/lib/snapd/mount/snap.grafana-agent.fstab")) - - shared_logs_configs = [] - for endpoint in self._cos.snap_log_endpoints: - fstab_entry = agent_fstab.entry(endpoint.owner, endpoint.name) - target_path = ( - f"{fstab_entry.target}/**" - if fstab_entry - else "/snap/grafana-agent/current/shared-logs/**" - ) - job = { - "job_name": endpoint.owner, - "static_configs": [ - { - "targets": ["localhost"], - "labels": { - "job": endpoint.owner, - "__path__": target_path, - **{ - k: v - for k, v in self._instance_topology.items() - if k not in ["juju_unit", "juju_application"] - }, - }, - } - ], - "pipeline_stages": [ - { - "drop": { - "expression": ".*file is a directory.*", - }, - }, - ], - } - - if fstab_entry: - job["relabel_configs"] = [ - { - "source_labels": ["__path__"], - "target_label": "path", - "replacement": fstab_entry.relative_target, - } - ] - - shared_logs_configs.append(job) - - return shared_logs_configs - - def _connect_logging_snap_endpoints(self): - for plug in self._cos.snap_log_endpoints: - try: - self.snap.connect("logs", service=plug.owner, slot=plug.name) - except snap.SnapError as e: - logger.error(f"error connecting plug {plug} to grafana-agent:logs") - logger.error(e.message) - - def positions_dir(self) -> str: - """Return the positions directory.""" - return "${SNAP_DATA}" - - -if __name__ == "__main__": - main(GrafanaAgentMachineCharm) diff --git a/tests/integration-machine/conftest.py b/tests/integration-machine/conftest.py deleted file mode 100644 index 6abee52e..00000000 --- a/tests/integration-machine/conftest.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2021 Canonical Ltd. -# See LICENSE file for licensing details. -import functools -import logging -from collections import defaultdict -from datetime import datetime - -import pytest -from pytest_operator.plugin import OpsTest - -logger = logging.getLogger(__name__) - - -class Store(defaultdict): - def __init__(self): - super(Store, self).__init__(Store) - - def __getattr__(self, key): - """Override __getattr__ so dot syntax works on keys.""" - try: - return self[key] - except KeyError: - raise AttributeError(key) - - def __setattr__(self, key, value): - """Override __setattr__ so dot syntax works on keys.""" - self[key] = value - - -store = Store() - - -def timed_memoizer(func): - @functools.wraps(func) - async def wrapper(*args, **kwargs): - fname = func.__qualname__ - logger.info("Started: %s" % fname) - start_time = datetime.now() - if fname in store.keys(): - ret = store[fname] - else: - logger.info("Return for {} not cached".format(fname)) - ret = await func(*args, **kwargs) - store[fname] = ret - logger.info("Finished: {} in: {} seconds".format(fname, datetime.now() - start_time)) - return ret - - return wrapper - - -@pytest.fixture(scope="module") -@timed_memoizer -async def grafana_agent_charm(ops_test: OpsTest): - """Grafana-agent charm used for integration testing.""" - charm = await ops_test.build_charm(".") - return charm diff --git a/tests/integration-machine/test_juju_info.py b/tests/integration-machine/test_juju_info.py deleted file mode 100644 index cb300278..00000000 --- a/tests/integration-machine/test_juju_info.py +++ /dev/null @@ -1,138 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -import asyncio -import json -import logging -from types import SimpleNamespace -from typing import List - -import pytest -from juju.errors import JujuError -from pytest_operator.plugin import OpsTest - -agent = SimpleNamespace(name="agent") -principal = SimpleNamespace(charm="ubuntu", name="principal") - -logger = logging.getLogger(__name__) - -topology_labels = { - "juju_application", - # "juju_charm", # juju_charm is present in the grafana agent's self scrape only - "juju_model", - "juju_model_uuid", - "juju_unit", -} - - -@pytest.mark.abort_on_fail -async def test_build_and_deploy(ops_test: OpsTest, grafana_agent_charm): - await asyncio.gather( - # Principal - ops_test.model.deploy( - principal.charm, application_name=principal.name, num_units=2, series="jammy" - ), - # Subordinate - ops_test.model.deploy( - grafana_agent_charm, application_name=agent.name, num_units=0, series="jammy" - ), - ) - - # grafana agent is in 'unknown' status until related, so wait only for the principal - await ops_test.model.wait_for_idle(apps=[principal.name]) - - -@pytest.mark.abort_on_fail -async def test_service(ops_test: OpsTest): - # WHEN the charm is related to a principal over `juju-info` - await ops_test.model.add_relation("agent:juju-info", principal.name) - await ops_test.model.wait_for_idle(status="active") - - # THEN all units of the principal have the charm in 'enabled/active' state - # $ juju ssh agent/0 snap services grafana-agent - # Service Startup Current Notes - # grafana-agent.grafana-agent enabled active - - machines: List[str] = await ops_test.model.get_machines() - for machine_id in machines: - try: - await ops_test.model.machines[machine_id].ssh( - "snap services grafana-agent | grep 'enabled.*active'" - ) - except JujuError as e: - pytest.fail(f"snap is not enabled/active in unit {machine_id}: {e.message}") - - -@pytest.mark.abort_on_fail -async def test_metrics(ops_test: OpsTest): - # Wait the scrape interval to make sure all "state" keys turned from unknown to up (or down). - await asyncio.sleep(60) - - machines: List[str] = await ops_test.model.get_machines() - - # AND juju topology labels are present for all targets and all targets are 'up' - machine_targets = { - machine_id: await ops_test.model.machines[machine_id].ssh( - "curl localhost:12345/agent/api/v1/metrics/targets" - ) - for machine_id in machines - } - machine_targets = {k: json.loads(v)["data"] for k, v in machine_targets.items()} - assert len(machine_targets) > 1 # Self-scrape + node-exporter - for targets in machine_targets.values(): - for target in targets: - target_labels = target["labels"].keys() - assert topology_labels.issubset(target_labels) - assert target["state"] == "up" - - # $ juju ssh agent/0 curl localhost:12345/agent/api/v1/metrics/targets - # { - # "status": "success", - # "data": [ - # { - # "instance": "243a344db344241f404868d04272fc76", - # "target_group": "integrations/agent", - # "endpoint": "http://127.0.0.1:12345/integrations/agent/metrics", - # "state": "up", - # "labels": { - # "agent_hostname": "juju-f48d37-1", - # "instance": "test-charm-hz7v_8df47ec8-0c18-..._principal_principal/1", - # "job": "juju_test-charm-hz7v_8df47ec8-0c18-..._agent_self-monitoring", - # "juju_application": "agent", - # "juju_charm": "grafana-agent", - # "juju_model": "test-charm-hz7v", - # "juju_model_uuid": "8df47ec8-0c18-465a-8b68-a07188f48d37", - # "juju_unit": "agent/0" - # }, - # "discovered_labels": { - # "__address__": "127.0.0.1:12345", - # "__metrics_path__": "/integrations/agent/metrics", - # "__scheme__": "http", - # "__scrape_interval__": "1m", - # "__scrape_timeout__": "10s", - # "agent_hostname": "juju-f48d37-1", - # "job": "integrations/agent" - # }, - # "last_scrape": "2023-03-09T22:31:16.5693783Z", - # "scrape_duration_ms": 2, - # "scrape_error": "" - # }, - # ... - - -@pytest.mark.xfail # agent return an empty reply (bug) -async def test_logs(ops_test: OpsTest): - machines: List[str] = await ops_test.model.get_machines() - - # AND juju topology labels are present for all targets - machine_targets = { - machine_id: await ops_test.model.machines[machine_id].ssh( - "curl localhost:12345/agent/api/v1/logs/targets" - ) - for machine_id in machines - } - machine_targets = {k: json.loads(v)["data"] for k, v in machine_targets.items()} - assert len(machine_targets) > 1 # Self-scrape + node-exporter - for targets in machine_targets.values(): - for target in targets: - target_labels = target["labels"].keys() - assert topology_labels.issubset(target_labels) diff --git a/tests/scenario/helpers.py b/tests/scenario/helpers.py index 5c3ca66c..1e23b17c 100644 --- a/tests/scenario/helpers.py +++ b/tests/scenario/helpers.py @@ -1,25 +1,12 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. -import inspect from pathlib import Path -import k8s_charm -import machine_charm import yaml CHARM_ROOT = Path(__file__).parent.parent.parent def get_charm_meta(charm_type) -> dict: - if charm_type is machine_charm.GrafanaAgentMachineCharm: - fname = "machine_metadata" - elif charm_type is k8s_charm.GrafanaAgentK8sCharm: - fname = "k8s_metadata" - else: - raise TypeError(charm_type) - - charm_source_path = Path(inspect.getfile(charm_type)) - charm_root = charm_source_path.parent.parent - - raw_meta = (charm_root / fname).with_suffix(".yaml").read_text() + raw_meta = (CHARM_ROOT / "metadata").with_suffix(".yaml").read_text() return yaml.safe_load(raw_meta) diff --git a/tests/scenario/test_k8s/conftest.py b/tests/scenario/test_k8s/conftest.py index 00c6e9bd..e0a1786b 100644 --- a/tests/scenario/test_k8s/conftest.py +++ b/tests/scenario/test_k8s/conftest.py @@ -7,5 +7,5 @@ @pytest.fixture(autouse=True) def patch_all(): - with patch("k8s_charm.KubernetesServicePatch", lambda x, y: None): + with patch("charm.KubernetesServicePatch", lambda x, y: None): yield diff --git a/tests/scenario/test_k8s/test_dashboard_transfer.py b/tests/scenario/test_k8s/test_dashboard_transfer.py index c3b48b19..2e37c282 100644 --- a/tests/scenario/test_k8s/test_dashboard_transfer.py +++ b/tests/scenario/test_k8s/test_dashboard_transfer.py @@ -2,8 +2,8 @@ # See LICENSE file for licensing details. import json +from charm import GrafanaAgentK8sCharm from cosl import GrafanaDashboard -from k8s_charm import GrafanaAgentK8sCharm from scenario import Container, Context, Relation, State diff --git a/tests/scenario/test_machine_charm/conftest.py b/tests/scenario/test_machine_charm/conftest.py deleted file mode 100644 index 1ef2c1ea..00000000 --- a/tests/scenario/test_machine_charm/conftest.py +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright 2022 Canonical Ltd. -# See LICENSE file for licensing details. -import pytest - - -@pytest.fixture -def placeholder_cfg_path(tmp_path): - return tmp_path / "foo.yaml" diff --git a/tests/scenario/test_machine_charm/helpers.py b/tests/scenario/test_machine_charm/helpers.py deleted file mode 100644 index 14bb852c..00000000 --- a/tests/scenario/test_machine_charm/helpers.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. -from unittest.mock import MagicMock - - -def set_run_out(mock_run, returncode: int = 0, stdout: str = "", stderr: str = ""): - mock_stdout = MagicMock() - mock_stdout.configure_mock( - **{ - "returncode": returncode, - "stdout.decode.return_value": stdout, - "stderr.decode.return_value": stderr, - } - ) - mock_run.return_value = mock_stdout diff --git a/tests/scenario/test_machine_charm/test_alert_labels.py b/tests/scenario/test_machine_charm/test_alert_labels.py deleted file mode 100644 index d0c7e5b3..00000000 --- a/tests/scenario/test_machine_charm/test_alert_labels.py +++ /dev/null @@ -1,121 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -import json - -import machine_charm -from scenario import Context, PeerRelation, Relation, State, SubordinateRelation - -from tests.scenario.helpers import get_charm_meta - - -def test_metrics_alert_rule_labels(vroot): - """Check that metrics alert rules are labeled with principal topology.""" - cos_agent_primary_data = { - "config": json.dumps( - { - "subordinate": False, - "metrics_alert_rules": { - "groups": [ - { - "name": "alertgroup", - "rules": [ - { - "alert": "Missing", - "expr": "up == 0", - "for": "0m", - "labels": { - "juju_model": "machine", - "juju_model_uuid": "74a5690b-89c9-44dd-984b-f69f26a6b751", - "juju_application": "primary", - }, - } - ], - } - ] - }, - "log_alert_rules": {}, - "dashboards": [ - "/Td6WFoAAATm1rRGAgAhARYAAAB0L+WjAQAmCnsKICAidGl0bGUiOiAi" - "Zm9vIiwKICAiYmFyIiA6ICJiYXoiCn0KAACkcc0YFt15xAABPyd8KlLdH7bzfQEAAAAABFla" - ], - "metrics_scrape_jobs": [ - {"job_name": "primary_0", "path": "/metrics", "port": "8080"} - ], - "log_slots": ["foo:bar"], - } - ) - } - - cos_agent_subordinate_data = { - "config": json.dumps( - { - "subordinate": True, - "metrics_alert_rules": { - "groups": [ - { - "name": "alertgroup", - "rules": [ - { - "alert": "Missing", - "expr": "up == 0", - "for": "0m", - "labels": { - "juju_model": "machine", - "juju_model_uuid": "74a5690b-89c9-44dd-984b-f69f26a6b751", - "juju_application": "subordinate", - }, - } - ], - } - ] - }, - "log_alert_rules": {}, - "dashboards": [ - "/Td6WFoAAATm1rRGAgAhARYAAAB0L+WjAQAmCnsKICAidGl0bGUiOiAi" - "Zm9vIiwKICAiYmFyIiA6ICJiYXoiCn0KAACkcc0YFt15xAABPyd8KlLdH7bzfQEAAAAABFla" - ], - "metrics_scrape_jobs": [ - {"job_name": "subordinate_0", "path": "/metrics", "port": "8081"} - ], - "log_slots": ["oh:snap"], - } - ) - } - - cos_agent_primary_relation = SubordinateRelation( - "cos-agent", remote_app_name="primary", remote_unit_data=cos_agent_primary_data - ) - cos_agent_subordinate_relation = SubordinateRelation( - "cos-agent", remote_app_name="subordinate", remote_unit_data=cos_agent_subordinate_data - ) - remote_write_relation = Relation("send-remote-write", remote_app_name="prometheus") - - context = Context( - charm_type=machine_charm.GrafanaAgentMachineCharm, - meta=get_charm_meta(machine_charm.GrafanaAgentMachineCharm), - charm_root=vroot, - ) - state = State( - leader=True, - relations=[ - cos_agent_primary_relation, - cos_agent_subordinate_relation, - remote_write_relation, - PeerRelation("peers"), - ], - ) - state_0 = context.run(event=cos_agent_primary_relation.changed_event, state=state) - (vroot / "metadata.yaml").unlink() - (vroot / "config.yaml").unlink() - (vroot / "actions.yaml").unlink() - state_1 = context.run(event=cos_agent_subordinate_relation.changed_event, state=state_0) - (vroot / "metadata.yaml").unlink() - (vroot / "config.yaml").unlink() - (vroot / "actions.yaml").unlink() - state_2 = context.run(event=remote_write_relation.joined_event, state=state_1) - - alert_rules = json.loads(state_2.relations[2].local_app_data["alert_rules"]) - for group in alert_rules["groups"]: - for rule in group["rules"]: - assert rule["labels"]["juju_application"] == "primary" diff --git a/tests/scenario/test_machine_charm/test_cos_agent_e2e.py b/tests/scenario/test_machine_charm/test_cos_agent_e2e.py deleted file mode 100644 index 6ca20c32..00000000 --- a/tests/scenario/test_machine_charm/test_cos_agent_e2e.py +++ /dev/null @@ -1,218 +0,0 @@ -# Copyright 2021 Canonical Ltd. -# See LICENSE file for licensing details. -import json -import os -import tempfile -from pathlib import Path -from unittest.mock import MagicMock, PropertyMock, patch - -import pytest -from charms.grafana_agent.v0.cos_agent import ( - CosAgentPeersUnitData, - COSAgentProvider, - COSAgentRequirer, -) -from ops.charm import CharmBase -from ops.framework import Framework -from scenario import Context, PeerRelation, State, SubordinateRelation - - -@pytest.fixture -def placeholder_cfg_path(tmp_path): - return tmp_path / "foo.yaml" - - -PROM_RULE = """alert: HostCpuHighIowait -expr: avg by (instance) (rate(node_cpu_seconds_total{mode="iowait"}[5m])) * 100 > 10 -for: 0m -labels: - severity: warning -annotations: - summary: Host CPU high iowait (instance {{ $labels.instance }}) - description: "CPU iowait > 10%. A high iowait means that you are disk or network bound.\n VALUE = {{ $value }}\n LABELS = {{ $labels }}" -""" -LOKI_RULE = """groups: - - name: grafana-agent-high-log-volume - rules: - - alert: HighLogVolume - expr: | - count_over_time(({%%juju_topology%%})[30s]) > 100 - labels: - severity: high - annotations: - summary: Log rate is too high! -""" -GRAFANA_DASH = """ -{ - "title": "foo", - "bar" : "baz" -} -""" - - -@pytest.fixture(autouse=True) -def patch_all(placeholder_cfg_path): - with patch("subprocess.run", MagicMock()), patch( - "grafana_agent.CONFIG_PATH", placeholder_cfg_path - ): - yield - - -@pytest.fixture(autouse=True) -def vroot(placeholder_cfg_path): - with tempfile.TemporaryDirectory() as vroot: - vroot = Path(vroot) - promroot = vroot / "src/prometheus_alert_rules" - lokiroot = vroot / "src/loki_alert_rules" - grafroot = vroot / "src/grafana_dashboards" - - promroot.mkdir(parents=True) - lokiroot.mkdir(parents=True) - grafroot.mkdir(parents=True) - - (promroot / "prom.rule").write_text(PROM_RULE) - (lokiroot / "loki.rule").write_text(LOKI_RULE) - (grafroot / "grafana_dashboard.json").write_text(GRAFANA_DASH) - - old_cwd = os.getcwd() - os.chdir(str(vroot)) - yield vroot - os.chdir(old_cwd) - - -@pytest.fixture(autouse=True) -def snap_is_installed(): - with patch( - "machine_charm.GrafanaAgentMachineCharm._is_installed", new_callable=PropertyMock - ) as mock_foo: - mock_foo.return_value = True - yield - - -@pytest.fixture -def provider_charm(): - class MyPrincipal(CharmBase): - META = { - "name": "mock-principal", - "provides": { - "cos-agent": {"interface": "cos_agent", "scope": "container"}, - }, - } - _log_slots = ["charmed-kafka:logs"] - - def __init__(self, framework: Framework): - super().__init__(framework) - self.gagent = COSAgentProvider( - self, - metrics_endpoints=[ - {"path": "/metrics", "port": "8080"}, - ], - metrics_rules_dir="./src/alert_rules/prometheus", - logs_rules_dir="./src/alert_rules/loki", - log_slots=self._log_slots, - refresh_events=[self.on.cos_agent_relation_changed], - ) - - return MyPrincipal - - -@pytest.fixture -def requirer_charm(): - class MySubordinate(CharmBase): - META = { - "name": "mock-subordinate", - "requires": { - "cos-agent": {"interface": "cos_agent", "scope": "container"}, - }, - "peers": {"peers": {"interface": "grafana_agent_replica"}}, - } - - def __init__(self, framework: Framework): - super().__init__(framework) - self.gagent = COSAgentRequirer( - self, - refresh_events=[self.on.cos_agent_relation_changed], - ) - - return MySubordinate - - -@pytest.fixture -def provider_ctx(provider_charm, vroot): - return Context(charm_type=provider_charm, meta=provider_charm.META, charm_root=vroot) - - -@pytest.fixture -def requirer_ctx(requirer_charm, vroot): - return Context(charm_type=requirer_charm, meta=requirer_charm.META, charm_root=vroot) - - -def test_cos_agent_changed_no_remote_data(provider_ctx): - cos_agent = SubordinateRelation("cos-agent") - state_out = provider_ctx.run( - cos_agent.changed_event(remote_unit_id=1), State(relations=[cos_agent]) - ) - - config = json.loads(state_out.relations[0].local_unit_data[CosAgentPeersUnitData.KEY]) - assert config["metrics_alert_rules"] == {} - assert config["log_alert_rules"] == {} - assert len(config["dashboards"]) == 1 - assert len(config["metrics_scrape_jobs"]) == 1 - assert config["log_slots"] == ["charmed-kafka:logs"] - - -def test_subordinate_update(requirer_ctx): - # step 2: gagent is notified that the principal has touched its relation data - peer = PeerRelation("peers") - config = { - "metrics_alert_rules": {}, - "log_alert_rules": {}, - "dashboards": [ - "/Td6WFoAAATm1rRGAgAhARYAAAB0L+WjAQAmCnsKICAidGl0bGUiOiAi" - "Zm9vIiwKICAiYmFyIiA6ICJiYXoiCn0KAACkcc0YFt15xAABPyd8KlLdH7bzfQEAAAAABFla" - ], - "metrics_scrape_jobs": [ - {"job_name": "mock-principal_0", "path": "/metrics", "port": "8080"} - ], - "log_slots": ["charmed-kafka:logs"], - } - - cos_agent1 = SubordinateRelation( - "cos-agent", - remote_app_name="mock-principal", - remote_unit_data={"config": json.dumps(config)}, - ) - state_out1 = requirer_ctx.run( - cos_agent1.changed_event(remote_unit_id=0), State(relations=[cos_agent1, peer]) - ) - peer_out = state_out1.get_relations("peers")[0] - peer_out_data = json.loads( - peer_out.local_unit_data[f"{CosAgentPeersUnitData.KEY}-mock-principal/0"] - ) - assert peer_out_data["principal_unit_name"] == "mock-principal/0" - assert peer_out_data["principal_relation_id"] == str(cos_agent1.relation_id) - assert peer_out_data["principal_relation_name"] == cos_agent1.endpoint - - # passthrough as-is - assert peer_out_data["metrics_alert_rules"] == config["metrics_alert_rules"] - assert peer_out_data["log_alert_rules"] == config["log_alert_rules"] - assert peer_out_data["dashboards"] == config["dashboards"] - - -def test_cos_agent_wrong_rel_data(vroot, snap_is_installed, provider_ctx): - # Step 1: principal charm is deployed and ends in "unknown" state - provider_ctx.charm_spec.charm_type._log_slots = ( - "charmed:frogs" # Set wrong type, must be a list - ) - cos_agent_rel = SubordinateRelation("cos-agent") - state = State(relations=[cos_agent_rel]) - state_out = provider_ctx.run(cos_agent_rel.changed_event(remote_unit_id=1), state=state) - assert state_out.unit_status.name == "unknown" - - found = False - for log in provider_ctx.juju_log: - if "ERROR" in log[0] and "Invalid relation data provided:" in log[1]: - found = True - break - - assert found is True diff --git a/tests/scenario/test_machine_charm/test_models.py b/tests/scenario/test_machine_charm/test_models.py deleted file mode 100644 index 2de3fd3a..00000000 --- a/tests/scenario/test_machine_charm/test_models.py +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright 2021 Canonical Ltd. -# See LICENSE file for licensing details. -import json -from typing import List - -import pydantic -import pytest -from charms.grafana_agent.v0.cos_agent import CosAgentProviderUnitData, GrafanaDashboard - - -class Foo(pydantic.BaseModel): - dash: List[GrafanaDashboard] - - -def test_dashboard_validation(): - raw_dash = {"totl": "foo", "bar": "baz"} - with pytest.raises(pydantic.ValidationError): - Foo(dash=[raw_dash]) - - -def test_dashboard_serialization(): - raw_dash = {"title": "foo", "bar": "baz"} - encoded_dashboard = GrafanaDashboard._serialize(json.dumps(raw_dash)) - data = Foo(dash=[encoded_dashboard]) - assert data.json() == '{"dash": ["{encoded_dashboard}"]}'.replace( - "{encoded_dashboard}", encoded_dashboard - ) - - -def test_cos_agent_provider_unit_data_dashboard_serialization(): - raw_dash = {"title": "title", "foo": "bar"} - encoded_dashboard = GrafanaDashboard()._serialize(json.dumps(raw_dash)) - data = CosAgentProviderUnitData( - metrics_alert_rules={}, - log_alert_rules={}, - metrics_scrape_jobs=[], - log_slots=[], - dashboards=[encoded_dashboard], - subordinate=True, - ) - assert json.loads(data.json()) == { - "metrics_alert_rules": {}, - "log_alert_rules": {}, - "dashboards": [encoded_dashboard], - "metrics_scrape_jobs": [], - "log_slots": [], - "subordinate": True, - } - - -def test_dashboard_deserialization_roundtrip(): - raw_dash = {"title": "title", "foo": "bar"} - encoded_dashboard = GrafanaDashboard()._serialize(json.dumps(raw_dash)) - raw = { - "metrics_alert_rules": {}, - "log_alert_rules": {}, - "metrics_scrape_jobs": [], - "log_slots": [], - "dashboards": [encoded_dashboard], - } - data = CosAgentProviderUnitData(**raw) - assert GrafanaDashboard(data.dashboards[0])._deserialize() == raw_dash diff --git a/tests/scenario/test_machine_charm/test_multiple_subordinates.py b/tests/scenario/test_machine_charm/test_multiple_subordinates.py deleted file mode 100644 index ef7f3291..00000000 --- a/tests/scenario/test_machine_charm/test_multiple_subordinates.py +++ /dev/null @@ -1,194 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -import json - -import machine_charm -import pytest -from charms.grafana_agent.v0.cos_agent import MultiplePrincipalsError -from scenario import Context, PeerRelation, State, SubordinateRelation - -from tests.scenario.helpers import get_charm_meta - - -def test_juju_info_and_cos_agent(vroot): - def post_event(charm: machine_charm.GrafanaAgentMachineCharm): - assert len(charm._cos.dashboards) == 1 - assert len(charm._cos.snap_log_endpoints) == 1 - assert not charm._cos.logs_alerts - assert not charm._cos.metrics_alerts - assert len(charm._cos.metrics_jobs) == 1 - assert charm._principal_relation.name == "juju-info" - - cos_agent_data = { - "config": json.dumps( - { - "subordinate": True, - "metrics_alert_rules": {}, - "log_alert_rules": {}, - "dashboards": [ - "/Td6WFoAAATm1rRGAgAhARYAAAB0L+WjAQAmCnsKICAidGl0bGUiOiAi" - "Zm9vIiwKICAiYmFyIiA6ICJiYXoiCn0KAACkcc0YFt15xAABPyd8KlLdH7bzfQEAAAAABFla" - ], - "metrics_scrape_jobs": [ - {"job_name": "hardware-observer_0", "path": "/metrics", "port": "8080"} - ], - "log_slots": ["foo:bar"], - } - ) - } - - cos_agent_relation = SubordinateRelation( - "cos-agent", remote_app_name="hardware-observer", remote_unit_data=cos_agent_data - ) - - context = Context( - charm_type=machine_charm.GrafanaAgentMachineCharm, - meta=get_charm_meta(machine_charm.GrafanaAgentMachineCharm), - charm_root=vroot, - ) - state = State( - relations=[ - cos_agent_relation, - SubordinateRelation("juju-info", remote_app_name="remote-juju-info"), - PeerRelation("peers"), - ] - ) - context.run(event=cos_agent_relation.changed_event, state=state, post_event=post_event) - - -def test_two_cos_agent_relations(vroot): - def post_event(charm: machine_charm.GrafanaAgentMachineCharm): - assert len(charm._cos.dashboards) == 2 - assert len(charm._cos.snap_log_endpoints) == 2 - assert not charm._cos.logs_alerts - assert not charm._cos.metrics_alerts - assert len(charm._cos.metrics_jobs) == 2 - assert charm._principal_relation.name == "cos-agent" - assert charm._principal_relation.app.name == "primary" - - cos_agent_primary_data = { - "config": json.dumps( - { - "subordinate": False, - "metrics_alert_rules": {}, - "log_alert_rules": {}, - "dashboards": [ - "/Td6WFoAAATm1rRGAgAhARYAAAB0L+WjAQAmCnsKICAidGl0bGUiOiAi" - "Zm9vIiwKICAiYmFyIiA6ICJiYXoiCn0KAACkcc0YFt15xAABPyd8KlLdH7bzfQEAAAAABFla" - ], - "metrics_scrape_jobs": [ - {"job_name": "primary_0", "path": "/metrics", "port": "8080"} - ], - "log_slots": ["foo:bar"], - } - ) - } - - cos_agent_subordinate_data = { - "config": json.dumps( - { - "subordinate": True, - "metrics_alert_rules": {}, - "log_alert_rules": {}, - "dashboards": [ - "/Td6WFoAAATm1rRGAgAhARYAAAB0L+WjAQAmCnsKICAidGl0bGUiOiAi" - "Zm9vIiwKICAiYmFyIiA6ICJiYXoiCn0KAACkcc0YFt15xAABPyd8KlLdH7bzfQEAAAAABFla" - ], - "metrics_scrape_jobs": [ - {"job_name": "subordinate_0", "path": "/metrics", "port": "8081"} - ], - "log_slots": ["oh:snap"], - } - ) - } - - cos_agent_primary_relation = SubordinateRelation( - "cos-agent", remote_app_name="primary", remote_unit_data=cos_agent_primary_data - ) - cos_agent_subordinate_relation = SubordinateRelation( - "cos-agent", remote_app_name="subordinate", remote_unit_data=cos_agent_subordinate_data - ) - - context = Context( - charm_type=machine_charm.GrafanaAgentMachineCharm, - meta=get_charm_meta(machine_charm.GrafanaAgentMachineCharm), - charm_root=vroot, - ) - state = State( - relations=[ - cos_agent_primary_relation, - cos_agent_subordinate_relation, - PeerRelation("peers"), - ] - ) - out_state = context.run(event=cos_agent_primary_relation.changed_event, state=state) - vroot.clean() - context.run( - event=cos_agent_subordinate_relation.changed_event, state=out_state, post_event=post_event - ) - - -def test_two_cos_primary_relations(vroot): - def post_event(charm: machine_charm.GrafanaAgentMachineCharm): - with pytest.raises(MultiplePrincipalsError): - charm._principal_relation - - cos_agent_primary_data = { - "config": json.dumps( - { - "subordinate": False, - "metrics_alert_rules": {}, - "log_alert_rules": {}, - "dashboards": [ - "/Td6WFoAAATm1rRGAgAhARYAAAB0L+WjAQAmCnsKICAidGl0bGUiOiAi" - "Zm9vIiwKICAiYmFyIiA6ICJiYXoiCn0KAACkcc0YFt15xAABPyd8KlLdH7bzfQEAAAAABFla" - ], - "metrics_scrape_jobs": [ - {"job_name": "primary_0", "path": "/metrics", "port": "8080"} - ], - "log_slots": ["foo:bar"], - } - ) - } - - cos_agent_subordinate_data = { - "config": json.dumps( - { - "subordinate": False, - "metrics_alert_rules": {}, - "log_alert_rules": {}, - "dashboards": [ - "/Td6WFoAAATm1rRGAgAhARYAAAB0L+WjAQAmCnsKICAidGl0bGUiOiAi" - "Zm9vIiwKICAiYmFyIiA6ICJiYXoiCn0KAACkcc0YFt15xAABPyd8KlLdH7bzfQEAAAAABFla" - ], - "metrics_scrape_jobs": [ - {"job_name": "subordinate_0", "path": "/metrics", "port": "8081"} - ], - "log_slots": ["oh:snap"], - } - ) - } - - cos_agent_primary_relation = SubordinateRelation( - "cos-agent", remote_app_name="primary", remote_unit_data=cos_agent_primary_data - ) - cos_agent_subordinate_relation = SubordinateRelation( - "cos-agent", remote_app_name="subordinate", remote_unit_data=cos_agent_subordinate_data - ) - - context = Context( - charm_type=machine_charm.GrafanaAgentMachineCharm, - meta=get_charm_meta(machine_charm.GrafanaAgentMachineCharm), - charm_root=vroot, - ) - state = State( - relations=[ - cos_agent_primary_relation, - cos_agent_subordinate_relation, - PeerRelation("peers"), - ] - ) - context.run( - event=cos_agent_subordinate_relation.changed_event, state=state, post_event=post_event - ) diff --git a/tests/scenario/test_machine_charm/test_peer_relation.py b/tests/scenario/test_machine_charm/test_peer_relation.py deleted file mode 100644 index 0b2bac18..00000000 --- a/tests/scenario/test_machine_charm/test_peer_relation.py +++ /dev/null @@ -1,385 +0,0 @@ -# Copyright 2021 Canonical Ltd. -# See LICENSE file for licensing details. -import json -from unittest.mock import MagicMock - -import pytest -from charms.grafana_agent.v0.cos_agent import ( - CosAgentPeersUnitData, - CosAgentProviderUnitData, - COSAgentRequirer, -) -from charms.prometheus_k8s.v1.prometheus_remote_write import ( - PrometheusRemoteWriteConsumer, -) -from cosl import GrafanaDashboard -from ops.charm import CharmBase -from ops.framework import Framework -from scenario import Context, PeerRelation, State, SubordinateRelation - - -def encode_as_dashboard(dct: dict): - return GrafanaDashboard._serialize(json.dumps(dct).encode("utf-8")) - - -def test_fetch_data_from_relation(): - relation = MagicMock() - unit = MagicMock() - app = MagicMock() - py_dash = {"title": "title", "foo": "bar"} - - relation.units = [] # there should be remote units in here, presumably - config = { - "principal_unit_name": "principal/0", - "principal_relation_id": "0", - "principal_relation_name": "foo", - "dashboards": [encode_as_dashboard(py_dash)], - } - relation.app = app - relation.data = {unit: {CosAgentPeersUnitData.KEY: json.dumps(config)}, app: {}} - - obj = MagicMock() - obj._charm.unit = unit - - obj.peer_relation = relation - data = COSAgentRequirer._gather_peer_data(obj) - assert len(data) == 1 - - data_peer_1 = data[0] - assert len(data_peer_1.dashboards) == 1 - dash_out_raw = data_peer_1.dashboards[0] - assert GrafanaDashboard(dash_out_raw)._deserialize() == py_dash - - -class MyRequirerCharm(CharmBase): - META = { - "name": "test", - "requires": { - "cos-agent": {"interface": "cos_agent", "scope": "container"}, - "send-remote-write": {"interface": "prometheus_remote_write"}, - }, - "peers": {"peers": {"interface": "grafana_agent_replica"}}, - } - - def __init__(self, framework: Framework): - super().__init__(framework) - self.cosagent = COSAgentRequirer(self) - self.prom = PrometheusRemoteWriteConsumer(self) - framework.observe(self.cosagent.on.data_changed, self._on_cosagent_data_changed) - - def _on_cosagent_data_changed(self, _): - pass - - -def test_no_dashboards(): - state = State() - - def post_event(charm: MyRequirerCharm): - assert not charm.cosagent.dashboards - - ctx = Context( - charm_type=MyRequirerCharm, - meta=MyRequirerCharm.META, - ) - state = State() - ctx.run(state=state, event="update-status", post_event=post_event) - - -def test_no_dashboards_peer(): - peer_relation = PeerRelation(endpoint="peers", interface="grafana_agent_replica") - - state = State(relations=[peer_relation]) - - def post_event(charm: MyRequirerCharm): - assert not charm.cosagent.dashboards - - ctx = Context( - charm_type=MyRequirerCharm, - meta=MyRequirerCharm.META, - ) - ctx.run(state=state, event="update-status", post_event=post_event) - - -def test_no_dashboards_peer_cosagent(): - cos_agent = SubordinateRelation( - endpoint="cos-agent", interface="cos_agent", remote_app_name="primary" - ) - peer_relation = PeerRelation(endpoint="peers", interface="grafana_agent_replica") - - state = State(relations=[peer_relation, cos_agent]) - - def post_event(charm: MyRequirerCharm): - assert not charm.cosagent.dashboards - - ctx = Context( - charm_type=MyRequirerCharm, - meta=MyRequirerCharm.META, - ) - ctx.run(state=state, event=cos_agent.changed_event(remote_unit_id=0), post_event=post_event) - - -@pytest.mark.parametrize("leader", (True, False)) -def test_cosagent_to_peer_data_flow_dashboards(leader): - # This test verifies that if the charm receives via cos-agent a dashboard, - # it is correctly transferred to peer relation data. - - raw_dashboard_1 = {"title": "title", "foo": "bar"} - raw_data_1 = CosAgentProviderUnitData( - metrics_alert_rules={}, - log_alert_rules={}, - metrics_scrape_jobs=[], - log_slots=[], - dashboards=[encode_as_dashboard(raw_dashboard_1)], - ) - cos_agent = SubordinateRelation( - endpoint="cos-agent", - interface="cos_agent", - remote_app_name="primary", - remote_unit_data={raw_data_1.KEY: raw_data_1.json()}, - ) - peer_relation = PeerRelation(endpoint="peers", interface="grafana_agent_replica") - - state = State(relations=[peer_relation, cos_agent], leader=leader) - - def post_event(charm: MyRequirerCharm): - assert charm.cosagent.dashboards - - ctx = Context( - charm_type=MyRequirerCharm, - meta=MyRequirerCharm.META, - ) - state_out = ctx.run( - state=state, event=cos_agent.changed_event(remote_unit_id=0), post_event=post_event - ) - - peer_relation_out = next(filter(lambda r: r.endpoint == "peers", state_out.relations)) - print(peer_relation_out.local_unit_data) - peer_data = peer_relation_out.local_unit_data[f"{CosAgentPeersUnitData.KEY}-primary/0"] - assert json.loads(peer_data)["dashboards"] == [encode_as_dashboard(raw_dashboard_1)] - - -@pytest.mark.parametrize("leader", (True, False)) -def test_cosagent_to_peer_data_flow_relation(leader): - # dump the data the same way the provider would - raw_dashboard_1 = {"title": "title", "foo": "bar"} - data_1 = CosAgentProviderUnitData( - metrics_alert_rules={}, - log_alert_rules={}, - metrics_scrape_jobs=[], - log_slots=[], - dashboards=[encode_as_dashboard(raw_dashboard_1)], - ) - - cos_agent_1 = SubordinateRelation( - endpoint="cos-agent", - interface="cos_agent", - remote_app_name="primary", - remote_unit_data={data_1.KEY: data_1.json()}, - ) - - raw_dashboard_2 = {"title": "other_title", "foo": "other bar (would that be a pub?)"} - data_2 = CosAgentProviderUnitData( - metrics_alert_rules={}, - log_alert_rules={}, - metrics_scrape_jobs=[], - log_slots=[], - dashboards=[encode_as_dashboard(raw_dashboard_2)], - ) - - cos_agent_2 = SubordinateRelation( - endpoint="cos-agent", - interface="cos_agent", - remote_app_name="other_primary", - remote_unit_data={data_2.KEY: data_2.json()}, - ) - - # now the peer relation already contains the primary/0 information - # i.e. we've already seen cos_agent_1-relation-changed before - peer_relation = PeerRelation( - endpoint="peers", - interface="grafana_agent_replica", - peers_data={ - 1: { - f"{CosAgentPeersUnitData.KEY}-primary/0": CosAgentPeersUnitData( - principal_unit_name="primary/0", - principal_relation_id="42", - principal_relation_name="foobar-relation", - dashboards=[encode_as_dashboard(raw_dashboard_1)], - ).json() - } - }, - ) - - state = State( - leader=leader, - relations=[ - peer_relation, - cos_agent_1, - cos_agent_2, - ], - ) - - def pre_event(charm: MyRequirerCharm): - dashboards = charm.cosagent.dashboards - assert len(dashboards) == 1 - - dash = dashboards[0] - assert dash["title"] == "title" - assert dash["content"] == raw_dashboard_1 - - def post_event(charm: MyRequirerCharm): - dashboards = charm.cosagent.dashboards - assert len(dashboards) == 2 - - other_dash, dash = dashboards - assert dash["title"] == "title" - assert dash["content"] == raw_dashboard_1 - - assert other_dash["title"] == "other_title" - assert other_dash["content"] == raw_dashboard_2 - - ctx = Context( - charm_type=MyRequirerCharm, - meta=MyRequirerCharm.META, - ) - state_out = ctx.run( - state=state, - event=cos_agent_2.changed_event(remote_unit_id=0), - pre_event=pre_event, - post_event=post_event, - ) - - peer_relation_out: PeerRelation = next( - filter(lambda r: r.endpoint == "peers", state_out.relations) - ) - # the dashboard we just received via cos-agent is now in our local peer databag - peer_data_local = peer_relation_out.local_unit_data[ - f"{CosAgentPeersUnitData.KEY}-other_primary/0" - ] - assert json.loads(peer_data_local)["dashboards"] == [encode_as_dashboard(raw_dashboard_2)] - - # the dashboard we previously had via peer data is still there. - peer_data_peer = peer_relation_out.peers_data[1][f"{CosAgentPeersUnitData.KEY}-primary/0"] - assert json.loads(peer_data_peer)["dashboards"] == [encode_as_dashboard(raw_dashboard_1)] - - -@pytest.mark.parametrize("leader", (True, False)) -def test_cosagent_to_peer_data_app_vs_unit(leader): - # this test verifies that if multiple units (belonging to different apps) all publish their own - # CosAgentProviderUnitData via `cos-agent`, then the `peers` peer relation will be populated - # with the right data. - # This means: - # - The per-app data is only collected once per application (dedup'ed). - # - The per-unit data is collected across all units. - - # dump the data the same way the provider would - raw_dashboard_1 = {"title": "title", "foo": "bar"} - data_1 = CosAgentProviderUnitData( - dashboards=[encode_as_dashboard(raw_dashboard_1)], - metrics_alert_rules={"a": "b", "c": 1}, - log_alert_rules={"a": "b", "c": 2}, - metrics_scrape_jobs=[{"1": 2, "2": 3}], - log_slots=["foo:bar", "bax:qux"], - ) - - # there's an "other_primary" app also relating over `cos-agent` - raw_dashboard_2 = {"title": "other_title", "foo": "other bar (would that be a pub?)"} - data_2 = CosAgentProviderUnitData( - dashboards=[encode_as_dashboard(raw_dashboard_2)], - metrics_alert_rules={"a": "h", "c": 1}, - log_alert_rules={"a": "h", "d": 2}, - metrics_scrape_jobs=[{"1": 2, "4": 3}], - log_slots=["dead:beef", "bax:quff"], - ) - - cos_agent_2 = SubordinateRelation( - endpoint="cos-agent", - interface="cos_agent", - remote_app_name="other_primary", - remote_unit_data={data_2.KEY: data_2.json()}, - ) - - # suppose that this unit's primary is 'other_primary/0'. - - # now the peer relation already contains the primary/0 information - # i.e. we've already seen cos_agent_1-relation-changed before - peer_relation = PeerRelation( - endpoint="peers", - interface="grafana_agent_replica", - # one of this unit's peers, who has as primary "primary/23", has already - # logged its part of the data - peers_data={ - 1: { - f"{CosAgentPeersUnitData.KEY}-primary/23": CosAgentPeersUnitData( - principal_unit_name="primary/23", - principal_relation_id="42", - principal_relation_name="cos-agent", - # data coming from `primary` is here: - dashboards=data_1.dashboards, - metrics_alert_rules=data_1.metrics_alert_rules, - log_alert_rules=data_1.log_alert_rules, - ).json() - } - }, - ) - - state = State( - leader=leader, - relations=[ - peer_relation, - cos_agent_2, - ], - ) - - def pre_event(charm: MyRequirerCharm): - # verify that before the event is processed, the charm correctly gathers only 1 dashboard - dashboards = charm.cosagent.dashboards - assert len(dashboards) == 1 - - dash = dashboards[0] - assert dash["title"] == "title" - assert dash["content"] == raw_dashboard_1 - - def post_event(charm: MyRequirerCharm): - # after the event is processed, the charm has copied its primary's 'cos-agent' data into - # its 'peers' peer databag, therefore there are now two dashboards. - # The source of the dashboards is peer data. - - dashboards = charm.cosagent.dashboards - assert len(dashboards) == 2 - - dash = dashboards[0] - assert dash["title"] == "other_title" - assert dash["content"] == raw_dashboard_2 - - dash = dashboards[1] - assert dash["title"] == "title" - assert dash["content"] == raw_dashboard_1 - - ctx = Context( - charm_type=MyRequirerCharm, - meta=MyRequirerCharm.META, - ) - state_out = ctx.run( - state=state, - event=cos_agent_2.changed_event(remote_unit_id=0), - pre_event=pre_event, - post_event=post_event, - ) - - peer_relation_out: PeerRelation = next( - filter(lambda r: r.endpoint == "peers", state_out.relations) - ) - my_databag_peer_data = peer_relation_out.local_unit_data[ - f"{CosAgentPeersUnitData.KEY}-other_primary/0" - ] - assert set(json.loads(my_databag_peer_data)["dashboards"]) == { - encode_as_dashboard(raw_dashboard_2) - } - - peer_databag_peer_data = peer_relation_out.peers_data[1][ - f"{CosAgentPeersUnitData.KEY}-primary/23" - ] - assert json.loads(peer_databag_peer_data)["dashboards"][0] == encode_as_dashboard( - raw_dashboard_1 - ) diff --git a/tests/scenario/test_machine_charm/test_relation_priority.py b/tests/scenario/test_machine_charm/test_relation_priority.py deleted file mode 100644 index d4e74467..00000000 --- a/tests/scenario/test_machine_charm/test_relation_priority.py +++ /dev/null @@ -1,201 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. -import json -from pathlib import Path -from unittest.mock import patch - -import machine_charm -import pytest -from charms.grafana_agent.v0.cos_agent import MultiplePrincipalsError -from cosl import GrafanaDashboard -from scenario import Context, PeerRelation, State, SubordinateRelation - -from tests.scenario.helpers import get_charm_meta -from tests.scenario.test_machine_charm.helpers import set_run_out - - -def trigger(evt: str, state: State, vroot: Path = None, **kwargs): - context = Context( - charm_type=machine_charm.GrafanaAgentMachineCharm, - meta=get_charm_meta(machine_charm.GrafanaAgentMachineCharm), - charm_root=vroot, - ) - return context.run(event=evt, state=state, **kwargs) - - -@pytest.fixture -def mock_cfg_path(tmp_path): - return tmp_path / "foo.yaml" - - -@pytest.fixture(autouse=True) -def patch_all(placeholder_cfg_path): - with patch("grafana_agent.CONFIG_PATH", placeholder_cfg_path): - yield - - -@patch("machine_charm.subprocess.run") -def test_no_relations(mock_run, vroot): - def post_event(charm: machine_charm.GrafanaAgentMachineCharm): - assert not charm._cos.dashboards - assert not charm._cos.logs_alerts - assert not charm._cos.metrics_alerts - assert not charm._cos.metrics_jobs - assert not charm._cos.snap_log_endpoints - - assert not charm._principal_relation - assert not charm.principal_unit - - set_run_out(mock_run, 0) - trigger("start", State(), post_event=post_event, vroot=vroot) - - -@patch("machine_charm.subprocess.run") -def test_juju_info_relation(mock_run, vroot): - def post_event(charm: machine_charm.GrafanaAgentMachineCharm): - assert not charm._cos.dashboards - assert not charm._cos.logs_alerts - assert not charm._cos.metrics_alerts - assert not charm._cos.metrics_jobs - assert not charm._cos.snap_log_endpoints - - assert charm._principal_relation - assert charm.principal_unit - - set_run_out(mock_run, 0) - trigger( - "start", - State( - relations=[ - SubordinateRelation( - "juju-info", remote_unit_data={"config": json.dumps({"subordinate": True})} - ) - ] - ), - post_event=post_event, - vroot=vroot, - ) - - -@patch("machine_charm.subprocess.run") -def test_cos_machine_relation(mock_run, vroot): - def post_event(charm: machine_charm.GrafanaAgentMachineCharm): - assert charm._cos.dashboards - assert charm._cos.snap_log_endpoints - assert not charm._cos.logs_alerts - assert not charm._cos.metrics_alerts - assert charm._cos.metrics_jobs - - assert charm._principal_relation.name == "cos-agent" - assert charm.principal_unit.name == "mock-principal/0" - - set_run_out(mock_run, 0) - - cos_agent_data = { - "config": json.dumps( - { - "metrics_alert_rules": {}, - "log_alert_rules": {}, - "dashboards": [ - "/Td6WFoAAATm1rRGAgAhARYAAAB0L+WjAQAmCnsKICAidGl0bGUiOiAi" - "Zm9vIiwKICAiYmFyIiA6ICJiYXoiCn0KAACkcc0YFt15xAABPyd8KlLdH7bzfQEAAAAABFla" - ], - "metrics_scrape_jobs": [ - {"job_name": "mock-principal_0", "path": "/metrics", "port": "8080"} - ], - "log_slots": ["charmed-kafka:logs"], - } - ) - } - - peer_data = { - "config": json.dumps( - { - "principal_unit_name": "foo", - "principal_relation_id": "2", - "principal_relation_name": "peers", - "metrics_alert_rules": {}, - "log_alert_rules": {}, - "dashboards": [GrafanaDashboard._serialize('{"very long": "dashboard"}')], - } - ) - } - trigger( - "start", - State( - relations=[ - SubordinateRelation( - "cos-agent", - remote_app_name="mock-principal", - remote_unit_data=cos_agent_data, - ), - PeerRelation("peers", peers_data={1: peer_data}), - ] - ), - post_event=post_event, - vroot=vroot, - ) - - -@patch("machine_charm.subprocess.run") -def test_both_relations(mock_run, vroot): - def post_event(charm: machine_charm.GrafanaAgentMachineCharm): - assert charm._cos.dashboards - assert charm._cos.snap_log_endpoints - assert not charm._cos.logs_alerts - assert not charm._cos.metrics_alerts - assert charm._cos.metrics_jobs - - # Trying to get the principal should raise an exception. - with pytest.raises(MultiplePrincipalsError): - assert charm._principal_relation - - set_run_out(mock_run, 0) - - cos_agent_data = { - "config": json.dumps( - { - "metrics_alert_rules": {}, - "log_alert_rules": {}, - "dashboards": [ - "/Td6WFoAAATm1rRGAgAhARYAAAB0L+WjAQAmCnsKICAidGl0bGUiOiAi" - "Zm9vIiwKICAiYmFyIiA6ICJiYXoiCn0KAACkcc0YFt15xAABPyd8KlLdH7bzfQEAAAAABFla" - ], - "metrics_scrape_jobs": [ - {"job_name": "mock-principal_0", "path": "/metrics", "port": "8080"} - ], - "log_slots": ["charmed-kafka:logs"], - } - ) - } - - peer_data = { - "config": json.dumps( - { - "principal_unit_name": "foo", - "principal_relation_id": "2", - "principal_relation_name": "peers", - "metrics_alert_rules": {}, - "log_alert_rules": {}, - "dashboards": [GrafanaDashboard._serialize('{"very long": "dashboard"}')], - } - ) - } - - context = Context( - charm_type=machine_charm.GrafanaAgentMachineCharm, - meta=get_charm_meta(machine_charm.GrafanaAgentMachineCharm), - charm_root=vroot, - ) - state = State( - relations=[ - SubordinateRelation( - "cos-agent", - remote_app_name="remote-cos-agent", - remote_unit_data=cos_agent_data, - ), - SubordinateRelation("juju-info", remote_app_name="remote-juju-info"), - PeerRelation("peers", peers_data={1: peer_data}), - ] - ) - context.run(event="start", state=state, post_event=post_event) diff --git a/tests/scenario/test_machine_charm/test_scrape_configs.py b/tests/scenario/test_machine_charm/test_scrape_configs.py deleted file mode 100644 index ef9174b0..00000000 --- a/tests/scenario/test_machine_charm/test_scrape_configs.py +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright 2022 Canonical Ltd. -# See LICENSE file for licensing details. -import inspect -import json -import tempfile -import uuid -from pathlib import Path -from unittest.mock import patch - -import machine_charm -import pytest -import yaml -from charms.grafana_agent.v0.cos_agent import CosAgentProviderUnitData -from scenario import Context, Model, PeerRelation, Relation, State, SubordinateRelation - -machine_meta = yaml.safe_load( - ( - Path(inspect.getfile(machine_charm.GrafanaAgentMachineCharm)).parent.parent - / "machine_metadata.yaml" - ).read_text() -) - - -@pytest.fixture(autouse=True) -def patch_all(placeholder_cfg_path): - with patch("grafana_agent.CONFIG_PATH", placeholder_cfg_path): - yield - - -def test_snap_endpoints(placeholder_cfg_path): - written_path, written_text = "", "" - - def mock_write(_, path, text): - nonlocal written_path, written_text - written_path = path - written_text = text - - loki_relation = Relation( - "logging-consumer", - remote_app_name="loki", - remote_units_data={ - 1: {"endpoint": json.dumps({"url": "http://loki1:3100/loki/api/v1/push"})} - }, - ) - - data = CosAgentProviderUnitData( - dashboards=[], - metrics_alert_rules={}, - log_alert_rules={}, - metrics_scrape_jobs=[], - log_slots=["foo:bar", "oh:snap", "shameless-plug"], - ) - cos_relation = SubordinateRelation( - "cos-agent", remote_app_name="principal", remote_unit_data={data.KEY: data.json()} - ) - - vroot = tempfile.TemporaryDirectory() - vroot_path = Path(vroot.name) - vroot_path.joinpath("src", "loki_alert_rules").mkdir(parents=True) - vroot_path.joinpath("src", "prometheus_alert_rules").mkdir(parents=True) - vroot_path.joinpath("src", "grafana_dashboards").mkdir(parents=True) - - my_uuid = str(uuid.uuid4()) - - with patch("charms.operator_libs_linux.v2.snap.SnapCache"), patch( - "machine_charm.GrafanaAgentMachineCharm.write_file", new=mock_write - ), patch("machine_charm.GrafanaAgentMachineCharm.is_ready", return_value=True): - state = State( - relations=[cos_relation, loki_relation, PeerRelation("peers")], - model=Model(name="my-model", uuid=my_uuid), - ) - - ctx = Context( - charm_type=machine_charm.GrafanaAgentMachineCharm, - meta=machine_meta, - charm_root=vroot.name, - ) - ctx.run(state=state, event=cos_relation.changed_event) - - assert written_path == placeholder_cfg_path - written_config = yaml.safe_load(written_text) - logs_configs = written_config["logs"]["configs"] - for config in logs_configs: - if config["name"] == "log_file_scraper": - scrape_job_names = [job["job_name"] for job in config["scrape_configs"]] - assert "foo" in scrape_job_names - assert "oh" in scrape_job_names - assert "shameless_plug" not in scrape_job_names diff --git a/tests/scenario/test_setup_statuses.py b/tests/scenario/test_setup_statuses.py index 87e61dee..c18ed2d6 100644 --- a/tests/scenario/test_setup_statuses.py +++ b/tests/scenario/test_setup_statuses.py @@ -4,9 +4,8 @@ from typing import Type from unittest.mock import patch +import charm import grafana_agent -import k8s_charm -import machine_charm import pytest from ops import BlockedStatus, UnknownStatus, WaitingStatus, pebble from ops.testing import CharmType @@ -15,16 +14,14 @@ from tests.scenario.helpers import get_charm_meta -@pytest.fixture(params=["k8s", "lxd"]) +@pytest.fixture(params=["k8s"]) def substrate(request): return request.param @pytest.fixture def charm_type(substrate) -> Type[CharmType]: - return {"lxd": machine_charm.GrafanaAgentMachineCharm, "k8s": k8s_charm.GrafanaAgentK8sCharm}[ - substrate - ] + return {"k8s": charm.GrafanaAgentK8sCharm}[substrate] @pytest.fixture @@ -50,7 +47,7 @@ def patch_all(substrate, mock_cfg_path): yield else: - with patch("k8s_charm.KubernetesServicePatch", lambda x, y: None): + with patch("charm.KubernetesServicePatch", lambda x, y: None): yield @@ -85,7 +82,7 @@ def test_start(charm_type, substrate, vroot): assert out.unit_status == UnknownStatus() -def test_k8s_charm_start_with_container(charm_type, substrate, vroot): +def test_charm_start_with_container(charm_type, substrate, vroot): if substrate == "lxd": pytest.skip("k8s-only test") diff --git a/tests/scenario/test_start_statuses.py b/tests/scenario/test_start_statuses.py index 8a867811..40e1cf81 100644 --- a/tests/scenario/test_start_statuses.py +++ b/tests/scenario/test_start_statuses.py @@ -6,27 +6,24 @@ from typing import Type from unittest.mock import patch -import k8s_charm -import machine_charm +import charm import pytest import yaml from ops import pebble from ops.testing import CharmType -from scenario import Container, Context, ExecOutput, State, SubordinateRelation +from scenario import Container, Context, ExecOutput, State CHARM_ROOT = Path(__file__).parent.parent.parent -@pytest.fixture(params=["k8s", "lxd"]) +@pytest.fixture(params=["k8s"]) def substrate(request): return request.param @pytest.fixture def charm_type(substrate) -> Type[CharmType]: - return {"lxd": machine_charm.GrafanaAgentMachineCharm, "k8s": k8s_charm.GrafanaAgentK8sCharm}[ - substrate - ] + return {"k8s": charm.GrafanaAgentK8sCharm}[substrate] @pytest.fixture @@ -53,13 +50,13 @@ def patch_all(substrate, placeholder_cfg_path): yield else: - with patch("k8s_charm.KubernetesServicePatch", lambda x, y: None): + with patch("charm.KubernetesServicePatch", lambda x, y: None): yield @pytest.fixture def charm_meta(substrate, charm_type) -> dict: - fname = {"lxd": "machine_metadata", "k8s": "k8s_metadata"}[substrate] + fname = {"k8s": "metadata"}[substrate] charm_source_path = Path(inspect.getfile(charm_type)) charm_root = charm_source_path.parent.parent @@ -83,50 +80,17 @@ def test_install(charm_type, charm_meta, substrate, vroot): assert out.unit_status == ("unknown", "") -def test_start_not_ready(charm_type, charm_meta, substrate, vroot, placeholder_cfg_path): - if substrate != "lxd": - pytest.skip(reason="machine-only test") - - def post_event(charm: machine_charm.GrafanaAgentMachineCharm): - assert not charm.is_ready - - juju_info = SubordinateRelation("juju-info") - with patch("machine_charm.GrafanaAgentMachineCharm.is_ready", False): - ctx = Context( - charm_type=charm_type, - meta=charm_meta, - charm_root=vroot, - ) - out = ctx.run( - state=State(relations=[juju_info]), event=juju_info.joined_event, post_event=post_event - ) - - assert out.unit_status == ("waiting", "waiting for agent to start") - - def test_start(charm_type, charm_meta, substrate, vroot, placeholder_cfg_path): - with patch("machine_charm.GrafanaAgentMachineCharm.is_ready", True): - ctx = Context( - charm_type=charm_type, - meta=charm_meta, - charm_root=vroot, - ) - out = ctx.run(state=State(), event="start") - - if substrate == "lxd": - written_cfg = placeholder_cfg_path.read_text() - assert written_cfg # check nonempty - - assert out.unit_status.name == "blocked" - - else: - assert out.unit_status.name == "unknown" - + ctx = Context( + charm_type=charm_type, + meta=charm_meta, + charm_root=vroot, + ) + out = ctx.run(state=State(), event="start") + assert out.unit_status.name == "unknown" -def test_k8s_charm_start_with_container(charm_type, charm_meta, substrate, vroot): - if substrate == "lxd": - pytest.skip("k8s-only test") +def test_charm_start_with_container(charm_type, charm_meta, substrate, vroot): agent = Container( name="agent", can_connect=True, diff --git a/tests/unit/k8s/helpers.py b/tests/unit/helpers.py similarity index 100% rename from tests/unit/k8s/helpers.py rename to tests/unit/helpers.py diff --git a/tests/unit/machine/test_fstab_parsing.py b/tests/unit/machine/test_fstab_parsing.py deleted file mode 100644 index 930a9973..00000000 --- a/tests/unit/machine/test_fstab_parsing.py +++ /dev/null @@ -1,129 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. -import unittest -from pathlib import Path - -from fs.tempfs import TempFS -from machine_charm import SnapFstab - - -class TestFstabParsing(unittest.TestCase): - """Verify that fstab handling behaves appropriately.""" - - def setUp(self): - self.sandbox = TempFS("fstab_samples", auto_clean=True) - self.sandbox_root = self.sandbox.getsyspath("/") - self.addCleanup(self.sandbox.close) - - def test_single_plug_parses(self): - fstab = "/var/snap/charmed-kafka/common/log /snap/grafana-agent/7/shared-logs/log none bind,ro 0 0\n" - fstab_file = Path(self.sandbox_root) / "single-plug-fstab" - fstab_file.write_text(fstab) - - fstab = SnapFstab(fstab_file) - entry = fstab.entry("charmed-kafka", "logs") - self.assertEqual(entry.owner, "charmed-kafka") - self.assertEqual(entry.endpoint_source, "common/log") - self.assertEqual(entry.target, "/snap/grafana-agent/7/shared-logs/log") - self.assertEqual(entry.relative_target, "/log") - - def test_multiple_owners_parses(self): - fstab = """ - /var/snap/charmed-kafka/common/aei /snap/grafana-agent/7/shared-logs/aei none bind,ro 0 0\n - /var/snap/charmed-kafka/common/log /snap/grafana-agent/7/shared-logs/log none bind,ro 0 0\n - /var/snap/other-snap/logs/shared /snap/grafana-agent/7/shared-logs/shared none bind,ro 0 0\n - """ - fstab_file = Path(self.sandbox_root) / "single-plug-fstab" - fstab_file.write_text(fstab) - - fstab = SnapFstab(fstab_file) - entry = fstab.entry("charmed-kafka", "logs") - self.assertEqual(entry.owner, "charmed-kafka") - self.assertEqual(entry.endpoint_source, "common/log") - self.assertEqual(entry.target, "/snap/grafana-agent/7/shared-logs/log") - self.assertEqual(entry.relative_target, "/log") - - def test_multiple_entries(self): - fstab = """ - /var/snap/charmed-kafka/common/aei /snap/grafana-agent/7/shared-logs/aei none bind,ro 0 0\n - /var/snap/charmed-kafka/common/log /snap/grafana-agent/7/shared-logs/log none bind,ro 0 0\n - /var/snap/other-snap/logs/shared /snap/grafana-agent/8/shared-logs/shared-aei none bind,ro 0 0\n - """ - fstab_file = Path(self.sandbox_root) / "single-plug-fstab" - fstab_file.write_text(fstab) - - fstab = SnapFstab(fstab_file) - entry = fstab.entry("charmed-kafka", "logs") - self.assertEqual(entry.owner, "charmed-kafka") - self.assertEqual(entry.endpoint_source, "common/log") - self.assertEqual(entry.target, "/snap/grafana-agent/7/shared-logs/log") - self.assertEqual(entry.relative_target, "/log") - - entry = fstab.entry("charmed-kafka", "aei") - self.assertEqual(entry.owner, "charmed-kafka") - self.assertEqual(entry.endpoint_source, "common/aei") - self.assertEqual(entry.target, "/snap/grafana-agent/7/shared-logs/aei") - self.assertEqual(entry.relative_target, "/aei") - - entry = fstab.entry("other-snap", "shared") - self.assertEqual(entry.owner, "other-snap") - self.assertEqual(entry.endpoint_source, "logs/shared") - self.assertEqual(entry.target, "/snap/grafana-agent/8/shared-logs/shared-aei") - self.assertEqual(entry.relative_target, "/shared-aei") - - def test_multiple_owners_parses_inverted_order(self): - fstab = """ - /var/snap/charmed-kafka/common/log /snap/grafana-agent/7/shared-logs/log none bind,ro 0 0\n - /var/snap/charmed-kafka/common/aei /snap/grafana-agent/7/shared-logs/aei none bind,ro 0 0\n - /var/snap/other-snap/logs/shared /snap/grafana-agent/7/shared-logs/shared none bind,ro 0 0\n - """ - fstab_file = Path(self.sandbox_root) / "single-plug-fstab" - fstab_file.write_text(fstab) - - fstab = SnapFstab(fstab_file) - entry = fstab.entry("charmed-kafka", "aei") - self.assertEqual(entry.owner, "charmed-kafka") - self.assertEqual(entry.endpoint_source, "common/aei") - self.assertEqual(entry.target, "/snap/grafana-agent/7/shared-logs/aei") - self.assertEqual(entry.relative_target, "/aei") - - def test_multiple_plugs_parses(self): - fstab = """ - /var/snap/charmed-kafka/common/log /snap/grafana-agent/7/shared-logs/log none bind,ro 0 0\n - /var/snap/other-snap/logs/shared /snap/grafana-agent/7/shared-logs/shared none bind,ro 0 0\n - """ - fstab_file = Path(self.sandbox_root) / "single-plug-fstab" - fstab_file.write_text(fstab) - - fstab = SnapFstab(fstab_file) - entry = fstab.entry("charmed-kafka", "logs") - self.assertEqual(entry.owner, "charmed-kafka") - self.assertEqual(entry.endpoint_source, "common/log") - self.assertEqual(entry.target, "/snap/grafana-agent/7/shared-logs/log") - self.assertEqual(entry.relative_target, "/log") - - other_entry = fstab.entry("other-snap", "shared-logs") - self.assertEqual(other_entry.owner, "other-snap") - self.assertEqual(other_entry.endpoint_source, "logs/shared") - self.assertEqual(other_entry.target, "/snap/grafana-agent/7/shared-logs/shared") - self.assertEqual(other_entry.relative_target, "/shared") - - def test_same_slot_plugs_parses(self): - fstab = """ - /var/snap/charmed-kafka/common/log /snap/grafana-agent/7/shared-logs/log none bind,ro 0 0\n - /var/snap/other-snap/common/log /snap/grafana-agent/7/shared-logs/log-1 none bind,ro 0 0\n - """ - fstab_file = Path(self.sandbox_root) / "single-plug-fstab" - fstab_file.write_text(fstab) - - fstab = SnapFstab(fstab_file) - entry = fstab.entry("charmed-kafka", "logs") - self.assertEqual(entry.endpoint_source, "common/log") - self.assertEqual(entry.target, "/snap/grafana-agent/7/shared-logs/log") - self.assertEqual(entry.relative_target, "/log") - - other_entry = fstab.entry("other-snap", "logs") - self.assertEqual(other_entry.owner, "other-snap") - self.assertEqual(other_entry.endpoint_source, "common/log") - self.assertEqual(other_entry.target, "/snap/grafana-agent/7/shared-logs/log-1") - self.assertEqual(other_entry.relative_target, "/log-1") diff --git a/tests/unit/machine/test_relation_status.py b/tests/unit/machine/test_relation_status.py deleted file mode 100644 index 1c1b7b6a..00000000 --- a/tests/unit/machine/test_relation_status.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright 2021 Canonical Ltd. -# See LICENSE file for licensing details. - -import tempfile -import unittest -from unittest.mock import patch - -from charm import GrafanaAgentMachineCharm as GrafanaAgentCharm -from ops.model import ActiveStatus, BlockedStatus -from ops.testing import Harness - - -class TestRelationStatus(unittest.TestCase): - def setUp(self, *unused): - patcher = patch.object(GrafanaAgentCharm, "_agent_version", property(lambda *_: "0.0.0")) - self.mock_version = patcher.start() - self.addCleanup(patcher.stop) - - temp_config_path = tempfile.mkdtemp() + "/grafana-agent.yaml" - # otherwise will attempt to write to /etc/grafana-agent.yaml - patcher = patch("grafana_agent.CONFIG_PATH", temp_config_path) - self.config_path_mock = patcher.start() - self.addCleanup(patcher.stop) - - patcher = patch("charm.snap") - self.mock_snap = patcher.start() - self.addCleanup(patcher.stop) - - self.harness = Harness(GrafanaAgentCharm) - self.harness.set_model_name(self.__class__.__name__) - - self.addCleanup(self.harness.cleanup) - self.harness.set_leader(True) - self.harness.begin_with_initial_hooks() - - def test_no_relations(self): - # GIVEN no relations joined (see SetUp) - # WHEN the charm starts (see SetUp) - # THEN status is "blocked" - self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus) - - # AND WHEN "update-status" fires - self.harness.charm.on.update_status.emit() - # THEN status is still "blocked" - self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus) - - def test_cos_agent_with_relations(self): - # WHEN an incoming relation is added - rel_id = self.harness.add_relation("cos-agent", "grafana-agent") - self.harness.add_relation_unit(rel_id, "grafana-agent/0") - - # THEN the charm goes into blocked status - self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus) - - # AND WHEN at least one of the necessary outgoing relations is added - for outgoing in ["send-remote-write", "logging-consumer", "grafana-dashboards-provider"]: - rel_id = self.harness.add_relation(outgoing, "grafana-agent") - self.harness.add_relation_unit(rel_id, "grafana-agent/0") - - # THEN the charm goes into active status when one mandatory relation is added - self.assertIsInstance(self.harness.charm.unit.status, ActiveStatus) - - # AND WHEN we remove one of the mandatory relations - self.harness.remove_relation(rel_id) - - # THEN the charm keeps into active status - self.assertIsInstance(self.harness.charm.unit.status, ActiveStatus) - - def test_juju_info_with_relations(self): - # WHEN an incoming relation is added - rel_id = self.harness.add_relation("juju-info", "grafana-agent") - self.harness.add_relation_unit(rel_id, "grafana-agent/0") - - # THEN the charm goes into blocked status - self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus) - - # AND WHEN all the necessary outgoing relations are added - for outgoing in ["send-remote-write", "logging-consumer"]: - rel_id = self.harness.add_relation(outgoing, "grafana-agent") - self.harness.add_relation_unit(rel_id, "grafana-agent/0") - - # THEN the charm goes into active status - self.assertIsInstance(self.harness.charm.unit.status, ActiveStatus) diff --git a/tests/unit/machine/test_update_status.py b/tests/unit/machine/test_update_status.py deleted file mode 100644 index fadf8932..00000000 --- a/tests/unit/machine/test_update_status.py +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright 2021 Canonical Ltd. -# See LICENSE file for licensing details. - -import tempfile -import unittest -from unittest.mock import patch - -from charm import GrafanaAgentMachineCharm as GrafanaAgentCharm -from ops.testing import Harness - - -class TestUpdateStatus(unittest.TestCase): - def setUp(self, *unused): - patcher = patch.object(GrafanaAgentCharm, "_agent_version", property(lambda *_: "0.0.0")) - self.mock_version = patcher.start() - self.addCleanup(patcher.stop) - - temp_config_path = tempfile.mkdtemp() + "/grafana-agent.yaml" - # otherwise will attempt to write to /etc/grafana-agent.yaml - patcher = patch("grafana_agent.CONFIG_PATH", temp_config_path) - self.config_path_mock = patcher.start() - self.addCleanup(patcher.stop) - - patcher = patch("charm.snap") - self.mock_snap = patcher.start() - self.addCleanup(patcher.stop) - - self.harness = Harness(GrafanaAgentCharm) - self.harness.set_model_name(self.__class__.__name__) - - self.addCleanup(self.harness.cleanup) - self.harness.set_leader(True) - self.harness.begin_with_initial_hooks() - - def test_no_relations(self): - self.harness.charm.on.update_status.emit() - - def test_with_relations(self): - # self.relation_id = self.harness.add_relation("alerting", "otherapp") - # self.harness.add_relation_unit(self.relation_id, "otherapp/0") - pass diff --git a/tests/unit/k8s/test_alerts.py b/tests/unit/test_alerts.py similarity index 100% rename from tests/unit/k8s/test_alerts.py rename to tests/unit/test_alerts.py diff --git a/tests/unit/k8s/test_relation_status.py b/tests/unit/test_relation_status.py similarity index 100% rename from tests/unit/k8s/test_relation_status.py rename to tests/unit/test_relation_status.py diff --git a/tests/unit/k8s/test_scrape_configuration.py b/tests/unit/test_scrape_configuration.py similarity index 100% rename from tests/unit/k8s/test_scrape_configuration.py rename to tests/unit/test_scrape_configuration.py diff --git a/tests/unit/k8s/test_update_status.py b/tests/unit/test_update_status.py similarity index 100% rename from tests/unit/k8s/test_update_status.py rename to tests/unit/test_update_status.py diff --git a/tox.ini b/tox.ini index c0b5e89d..9f824d2b 100644 --- a/tox.ini +++ b/tox.ini @@ -3,13 +3,12 @@ [tox] isolated_build = True skip_missing_interpreters = True -envlist = lint, static-{charm,lib}, unit, unit-machine, scenario, scenario-{k8s,machine} +envlist = lint, static-{charm,lib}, unit, scenario [vars] src_path = {toxinidir}/src tst_path = {toxinidir}/tests -lib_path = {toxinidir}/lib/charms/grafana_agent/v0 -all_path = {[vars]src_path} {[vars]tst_path} {[vars]lib_path} +all_path = {[vars]src_path} {[vars]tst_path} [testenv] basepython = python3 @@ -37,25 +36,6 @@ commands = ruff --fix {[vars]all_path} black {[vars]all_path} -[testenv:render-k8s] -skip_install=True -description = Render the k8s charm -allowlist_externals = cp -commands = - cp {toxinidir}/src/k8s_charm.py {toxinidir}/src/charm.py - cp {toxinidir}/k8s_metadata.yaml {toxinidir}/metadata.yaml - -[testenv:render-machine] -skip_install=True -description = Render the machine charm -allowlist_externals = - cp - chmod -commands = - cp {toxinidir}/src/machine_charm.py {toxinidir}/src/charm.py - chmod +x {toxinidir}/src/charm.py - cp {toxinidir}/machine_metadata.yaml {toxinidir}/metadata.yaml - [testenv:lint] skip_install=True description = Check code against coding style standards @@ -68,20 +48,17 @@ commands = ruff {[vars]all_path} black --check --diff {[vars]all_path} -[testenv:static-{charm,lib}] +[testenv:static-{charm}] skip_install=True description = Run static analysis checks deps = pyright -r {toxinidir}/requirements.txt - lib: ops commands = charm: pyright {[vars]src_path} - lib: pyright {[vars]lib_path} -[testenv:unit-machine] -skip_install=True -description = Run machine charm unit tests +[testenv:unit] +description = Run unit tests deps = -r{toxinidir}/requirements.txt pytest @@ -91,39 +68,20 @@ deps = fs toml responses -allowlist_externals = cp commands = - # render as machine - cp {[vars]src_path}/machine_charm.py {[vars]src_path}/charm.py - cp machine_metadata.yaml metadata.yaml - coverage run \ --source={[vars]src_path} \ - -m pytest -v --tb native --log-cli-level=INFO -s {posargs} {[vars]tst_path}/unit/machine + -m pytest -v --tb native --log-cli-level=INFO -s {posargs} {[vars]tst_path}/unit coverage report -m -[testenv:unit] -description = Run k8s charm unit tests +[testenv:scenario] +description = Run scenario tests on K8s deps = -r{toxinidir}/requirements.txt pytest - pytest-subtests - coverage[toml] - deepdiff - fs - toml - responses -allowlist_externals = cp + ops-scenario > 4 commands = - # render as k8s - cp {[vars]src_path}/k8s_charm.py {[vars]src_path}/charm.py - cp k8s_metadata.yaml metadata.yaml - - coverage run \ - --source={[vars]src_path} \ - -m pytest -v --tb native --log-cli-level=INFO -s {posargs} {[vars]tst_path}/unit/k8s - coverage report -m - + pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tst_path}/scenario --ignore {[vars]tst_path}/scenario/test_k8s [testenv:integration] skip_install=True @@ -136,64 +94,9 @@ deps = pytest prometheus-api-client pytest-operator -allowlist_externals = cp commands = - # use a better solution when we actually have machine code - cp {[vars]src_path}/k8s_charm.py {[vars]src_path}/charm.py - cp k8s_metadata.yaml metadata.yaml - pytest -v --tb native --log-cli-level=INFO -s {posargs} {[vars]tst_path}/integration -[testenv:scenario] -skip_install=True -allowlist_externals = - tox -commands = - tox -e scenario-k8s - tox -e scenario-machine - -[testenv:scenario-machine] -description = Run scenario tests on LXD -deps = - -r{toxinidir}/requirements.txt - pytest - ops-scenario > 4 -allowlist_externals = cp -commands = - cp {[vars]src_path}/machine_charm.py {[vars]src_path}/charm.py - cp machine_metadata.yaml metadata.yaml - pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tst_path}/scenario/test_machine_charm --ignore {[vars]tst_path}/scenario/test_k8s - -[testenv:scenario-k8s] -description = Run scenario tests on K8s -deps = - -r{toxinidir}/requirements.txt - pytest - ops-scenario > 4 -allowlist_externals = cp -commands = - cp {[vars]src_path}/k8s_charm.py {[vars]src_path}/charm.py - cp k8s_metadata.yaml metadata.yaml - pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tst_path}/scenario --ignore {[vars]tst_path}/scenario/test_machine_charm - -[testenv:integration-machine] -description = Run integration tests (machine charm) -deps = - aiohttp - asyncstdlib - # Libjuju needs to track the juju version - juju ~= 3.0.0 - pytest - prometheus-api-client - pytest-operator -allowlist_externals = cp -commands = - # use a better solution when we actually have machine code - cp {[vars]src_path}/machine_charm.py {[vars]src_path}/charm.py - cp machine_metadata.yaml metadata.yaml - - pytest -v --tb native --log-cli-level=INFO -s {posargs} {[vars]tst_path}/integration-machine - [testenv:check] skip_install=True depends =