From 119168335c883acd8db53e7419634452362fd609 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Wed, 15 Nov 2023 11:53:19 +0100 Subject: [PATCH] removed scripts --- README.md | 23 +- pyproject.toml | 2 - scenario/integrations/darkroom.py | 406 ------- scenario/scripts/errors.py | 17 - scenario/scripts/logger.py | 17 - scenario/scripts/main.py | 57 - scenario/scripts/snapshot.py | 1001 ----------------- scenario/scripts/state_apply.py | 256 ----- scenario/scripts/utils.py | 23 - .../test_darkroom_harness.py | 20 - .../test_install_harness.py | 33 - .../test_integrations_harness.py | 78 -- .../test_darkroom_scenario.py | 20 - .../test_install_scenario.py | 33 - .../test_integrations_scenario.py | 78 -- .../test_darkroom_live.py | 2 - 16 files changed, 2 insertions(+), 2064 deletions(-) delete mode 100644 scenario/integrations/darkroom.py delete mode 100644 scenario/scripts/errors.py delete mode 100644 scenario/scripts/logger.py delete mode 100644 scenario/scripts/main.py delete mode 100644 scenario/scripts/snapshot.py delete mode 100644 scenario/scripts/state_apply.py delete mode 100644 scenario/scripts/utils.py delete mode 100644 tests/test_darkroom/test_harness_integration/test_darkroom_harness.py delete mode 100644 tests/test_darkroom/test_harness_integration/test_install_harness.py delete mode 100644 tests/test_darkroom/test_harness_integration/test_integrations_harness.py delete mode 100644 tests/test_darkroom/test_live_integration/test_darkroom_scenario.py delete mode 100644 tests/test_darkroom/test_live_integration/test_install_scenario.py delete mode 100644 tests/test_darkroom/test_live_integration/test_integrations_scenario.py delete mode 100644 tests/test_darkroom/test_scenario_integration/test_darkroom_live.py diff --git a/README.md b/README.md index 674d038f..6c08886a 100644 --- a/README.md +++ b/README.md @@ -1256,25 +1256,6 @@ If you have a clear false negative, are explicitly testing 'edge', inconsistent checker is in your way, you can set the `SCENARIO_SKIP_CONSISTENCY_CHECKS` envvar and skip it altogether. Hopefully you don't need that. -# Snapshot +# Jhack integrations -Scenario comes with a cli tool called `snapshot`. Assuming you've pip-installed `ops-scenario`, you should be able to -reach the entry point by typing `scenario snapshot` in a shell so long as the install dir is in your `PATH`. - -Snapshot's purpose is to gather the `State` data structure from a real, live charm running in some cloud your local juju -client has access to. This is handy in case: - -- you want to write a test about the state the charm you're developing is currently in -- your charm is bork or in some inconsistent state, and you want to write a test to check the charm will handle it - correctly the next time around (aka regression testing) -- you are new to Scenario and want to quickly get started with a real-life example. - -Suppose you have a Juju model with a `prometheus-k8s` unit deployed as `prometheus-k8s/0`. If you type -`scenario snapshot prometheus-k8s/0`, you will get a printout of the State object. Pipe that out into some file, import -all you need from `scenario`, and you have a working `State` that you can `Context.run` events with. - -You can also pass a `--format` flag to obtain instead: - -- a jsonified `State` data structure, for portability -- a full-fledged pytest test case (with imports and all), where you only have to fill in the charm type and the event - that you wish to trigger. +The [`Jhack scenario`](todo link to jhack) subcommand offers some utilities to work with Scenario. diff --git a/pyproject.toml b/pyproject.toml index b728b128..f548183e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,8 +39,6 @@ classifiers = [ "Homepage" = "https://github.com/canonical/ops-scenario" "Bug Tracker" = "https://github.com/canonical/ops-scenario/issues" -[project.scripts] -scenario = "scenario.scripts.main:main" [tool.setuptools.package-dir] scenario = "scenario" diff --git a/scenario/integrations/darkroom.py b/scenario/integrations/darkroom.py deleted file mode 100644 index 6481de43..00000000 --- a/scenario/integrations/darkroom.py +++ /dev/null @@ -1,406 +0,0 @@ -"""Darkroom.""" - - -import logging -import os -from typing import TYPE_CHECKING, Callable, List, Literal, Sequence, Tuple, Union - -import yaml -from ops import CharmBase, EventBase -from ops.model import ModelError, SecretRotate, StatusBase, _ModelBackend -from ops.testing import _TestingModelBackend - -from scenario import Container, Event, Model, Network, Port, Relation, Secret, State -from scenario.mocking import _MockModelBackend -from scenario.state import _CharmSpec - -if TYPE_CHECKING: - from ops import Framework - -_Trace = Sequence[Tuple[Event, State]] -_SupportedBackends = Union[_TestingModelBackend, _ModelBackend, _MockModelBackend] - -logger = logging.getLogger("darkroom") - -# todo move those to Scenario.State and add an Event._is_framework_event() method. -FRAMEWORK_EVENT_NAMES = {"pre_commit", "commit"} - - -class _Unknown: - def __repr__(self): - return "" - - -UNKNOWN = _Unknown() -# Singleton representing missing information that cannot be retrieved, -# because of e.g. lack of leadership. -del _Unknown - - -class Darkroom: - """Darkroom. - - Can be used to "capture" the current State of a charm, given its backend - (testing, mocked, or live). - - Designed to work with multiple backends. - - "Live model backend": live charm with real juju and pebble backends - - "Harness testing backend": simulated backend provided by ops.testing. - - "Scenario backend": simulated backend provided by ops.scenario - - Usage:: - >>> harness = Harness(MyCharm) - >>> harness.begin_with_initial_hooks() - >>> state: State = Darkroom().capture(harness.model._backend) - - - Can be "attached" to a testing harness or scenario.Context to automatically capture - state and triggering event whenever an event is emitted. Result is a "Trace", i.e. a sequence - of events (and custom events), if the charm emits any. - - Can be "installed" in a testing suite or live charm. This will autoattach it to the - current context. - - >>> l = [] - >>> def register_trace(t): # this will be called with each generated trace - >>> l.append(t) - >>> Darkroom.install(register_trace) - >>> harness = Harness(MyCharm) - >>> h.begin_with_initial_hooks() - >>> assert l[0][0][0].name == "leader_settings_changed" - >>> assert l[0][0][1].unit_status == ActiveStatus("foo") - >>> # now that Darkroom is installed, regardless of the testing backend we use to emit - >>> # events, they will be captured - >>> scenario.Context(MyCharm).run("start") - >>> assert l[1][0][0].name == "start" - >>> assert l[1][0][1].unit_status == WaitingStatus("bar") - - Usage in live charms: - Edit charm.py to read: - >>> if __name__ == '__main__': - >>> from darkroom import Darkroom - >>> Darkroom.install(print, live=True) - >>> ops.main(MyCharm) - """ - - def __init__( - self, - skip_framework_events: bool = True, - skip_custom_events: bool = False, - ): - self._skip_framework_events = skip_framework_events - self._skip_custom_events = skip_custom_events - - def _listen_to(self, event: Event, framework: "Framework") -> bool: - """Whether this event should be captured or not. - - Depends on the init-provided skip config. - """ - if self._skip_framework_events and event.name in FRAMEWORK_EVENT_NAMES: - return False - if not self._skip_custom_events: - return True - - # derive the charmspec from the framework. - # Framework contains pointers to all observers. - # attempt to autoload: - try: - charm_type = next( - filter(lambda o: isinstance(o, CharmBase), framework._objects.values()), - ) - except StopIteration as e: - raise RuntimeError("unable to find charm in framework objects") from e - - try: - charm_root = framework.charm_dir - meta = charm_root / "metadata.yaml" - if not meta.exists(): - raise RuntimeError("metadata.yaml not found") - actions = charm_root / "actions.yaml" - config = charm_root / "config.yaml" - charm_spec = _CharmSpec( - charm_type, - meta=yaml.safe_load(meta.read_text()), - actions=yaml.safe_load(actions.read_text()) - if actions.exists() - else None, - config=yaml.safe_load(config.read_text()) if config.exists() else None, - ) - except Exception as e: - # todo: fall back to generating from framework._meta - raise RuntimeError("cannot autoload charm spec") from e - - if not event._is_builtin_event(charm_spec): - return False - - return True - - @staticmethod - def _get_mode( - backend: _SupportedBackends, - ) -> Literal["harness", "scenario", "live"]: - if isinstance(backend, _TestingModelBackend): - return "harness" - elif isinstance(backend, _MockModelBackend): - return "scenario" - elif isinstance(backend, _TestingModelBackend): - return "live" - else: - raise TypeError(backend) - - def capture(self, backend: _SupportedBackends) -> State: - mode = self._get_mode(backend) - logger.info(f"capturing in mode = `{mode}`.") - - if isinstance(backend, _MockModelBackend): - return backend._state - - state = State( - config=dict(backend.config_get()), - relations=self._get_relations(backend), - containers=self._get_containers(backend), - networks=self._get_networks(backend), - secrets=self._get_secrets(backend), - opened_ports=self._get_opened_ports(backend), - leader=self._get_leader(backend), - unit_id=self._get_unit_id(backend), - app_status=self._get_app_status(backend), - unit_status=self._get_unit_status(backend), - workload_version=self._get_workload_version(backend), - model=self._get_model(backend), - ) - - return state - - @staticmethod - def _get_unit_id(backend: _SupportedBackends) -> int: - return int(backend.unit_name.split("/")[1]) - - @staticmethod - def _get_workload_version(backend: _SupportedBackends) -> int: - # only available in testing: a live charm can't get its own workload version. - return getattr(backend, "_workload_version", UNKNOWN) - - @staticmethod - def _get_unit_status(backend: _SupportedBackends) -> StatusBase: - raw = backend.status_get() - return StatusBase.from_name(message=raw["message"], name=raw["status"]) - - @staticmethod - def _get_app_status(backend: _SupportedBackends) -> StatusBase: - try: - raw = backend.status_get(is_app=True) - return StatusBase.from_name(message=raw["message"], name=raw["status"]) - except ModelError: - return UNKNOWN - - @staticmethod - def _get_model(backend: _SupportedBackends) -> Model: - if backend._meta.containers: - # if we have containers we're definitely k8s. - model_type = "kubernetes" - else: - # guess k8s|lxd from envvars - model_type = "kubernetes" if "KUBERNETES" in os.environ else "lxd" - return Model(name=backend.model_name, uuid=backend.model_uuid, type=model_type) - - @staticmethod - def _get_leader(backend: _SupportedBackends): - return backend.is_leader() - - @staticmethod - def _get_opened_ports(backend: _SupportedBackends) -> List[Port]: - return [Port(p.protocol, p.port) for p in backend.opened_ports()] - - def _get_relations(self, backend: _SupportedBackends) -> List[Relation]: - relations = [] - - local_unit_name = backend.unit_name - local_app_name = backend.unit_name.split("/")[0] - - for endpoint, ids in backend._relation_ids_map.items(): - for r_id in ids: - relations.append( - self._get_relation( - backend, - r_id, - endpoint, - local_app_name, - local_unit_name, - ), - ) - - return relations - - def _get_relation( - self, - backend: _SupportedBackends, - r_id: int, - endpoint: str, - local_app_name: str, - local_unit_name: str, - ): - def get_interface_name(endpoint: str): - return backend._meta.relations[endpoint].interface_name - - def try_get(databag, owner): - try: - return databag[owner] - except ModelError: - return UNKNOWN - - # todo switch between peer and sub - rel_data = backend._relation_data_raw[r_id] - - app_and_units = backend._relation_app_and_units[r_id] - remote_app_name = app_and_units["app"] - return Relation( - endpoint=endpoint, - interface=get_interface_name(endpoint), - relation_id=r_id, - local_app_data=try_get(rel_data, local_app_name), - local_unit_data=try_get(rel_data, local_unit_name), - remote_app_data=try_get(rel_data, remote_app_name), - remote_units_data={ - int(remote_unit_id.split("/")[1]): try_get(rel_data, remote_unit_id) - for remote_unit_id in app_and_units["units"] - }, - remote_app_name=remote_app_name, - ) - - def _get_containers(self, backend: _SupportedBackends) -> List[Container]: - containers = [] - mode = self._get_mode(backend) - - for name, c in backend._meta.containers.items(): - if mode == "live": - # todo: real pebble socket address - pebble = backend.get_pebble("") - else: - # testing backends get the 3rd elem: - path = ["a", "b", "c", name, "bar.socket"] - pebble = backend.get_pebble("/".join(path)) - assert pebble - # todo: complete container snapshot - containers.append(Container(name=name, mounts=c.mounts)) - return containers - - def _get_networks(self, backend: _SupportedBackends) -> List[Network]: - networks = [ - Network(name=nw_name, **nw) for nw_name, nw in backend._networks.items() - ] - return networks - - def _get_secrets(self, backend: _SupportedBackends) -> List[Secret]: - secrets = [] - for s in backend._secrets: - owner_app = s.owner_name.split("/")[0] - relation_id = backend._relation_id_to(owner_app) - grants = s.grants.get(relation_id, set()) - - remote_grants = set() - granted = False - for grant in grants: - if grant in (backend.unit_name, backend.app_name): - granted = grant - else: - remote_grants.add(grant) - - secrets.append( - Secret( - id=s.id, - label=s.label, - contents=backend.secret_get(s.id), - granted=granted, - remote_grants={relation_id: remote_grants}, - description=s.description, - owner=s.owner_name, - rotate=s.rotate_policy or SecretRotate.NEVER, - expire=s.expire_time, - ), - ) - return secrets - - def _get_event(self, event: EventBase) -> Event: - return Event(event.handle.kind) - - def attach(self, listener: Callable[[Event, State], None]): - """Every time an event is emitted, record the event and capture the state after execution.""" - from ops import Framework - - if not getattr(Framework, "__orig_emit__", None): - Framework.__orig_emit__ = Framework._emit # noqa - # do not simply use Framework._emit because if we apply this patch multiple times - # the previous listeners will keep being called. - - def _darkroom_emit(instance: Framework, ops_event): - # proceed with framework._emit() - Framework.__orig_emit__(instance, ops_event) - event: Event = self._get_event(ops_event) - - if not self._listen_to(event, instance): - logger.debug(f"skipping event {ops_event}") - return - - backend = instance.model._backend # noqa - # todo should we automagically event.bind(state)? - state = self.capture(backend) - listener(event, state) - - Framework._emit = _darkroom_emit - - @staticmethod - def install(listener: Callable[[_Trace], None], live: bool = False): - """Patch Harness so that every time a new instance is created, a Darkroom is attached to it. - - Note that the trace will be initially empty and will be filled up as the harness emits events. - So only access the traces when you're sure the harness is done emitting. - """ - Darkroom._install_on_harness(listener) - Darkroom._install_on_scenario(listener) - - if live: - # if we are in a live event context, we attach and register a single trace - trace = [] - listener(trace) - - # we don't do this automatically, but instead do it on an explicit live=True, - # because otherwise listener will be called with an empty trace at the - # beginning of every run. - Darkroom().attach(lambda e, s: trace.append((e, s))) - - @staticmethod - def _install_on_scenario(listener: Callable[[_Trace], None]): - from scenario import Context - - if not getattr(Context, "__orig_init__", None): - Context.__orig__init__ = Context.__init__ - # do not simply use Context.__init__ because - # if we instantiate multiple Contexts we'll keep adding to the older harnesses' traces. - - def patch(context: Context, *args, **kwargs): - trace = [] - listener(trace) - Context.__orig__init__(context, *args, **kwargs) - dr = Darkroom() - dr.attach(listener=lambda event, state: trace.append((event, state))) - - Context.__init__ = patch - - @staticmethod - def _install_on_harness(listener: Callable[[_Trace], None]): - from ops.testing import Harness - - if not getattr(Harness, "__orig_init__", None): - Harness.__orig_init__ = Harness.__init__ - # do not simply use Harness.__init__ because - # if we instantiate multiple harnesses we'll keep adding to the older harnesses' traces. - - def patch(harness: Harness, *args, **kwargs): - trace = [] - listener(trace) - Harness.__orig_init__(harness, *args, **kwargs) - dr = Darkroom() - dr.attach(listener=lambda event, state: trace.append((event, state))) - - Harness.__init__ = patch diff --git a/scenario/scripts/errors.py b/scenario/scripts/errors.py deleted file mode 100644 index f713ef60..00000000 --- a/scenario/scripts/errors.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. -class SnapshotError(RuntimeError): - """Base class for errors raised by snapshot.""" - - -class InvalidTargetUnitName(SnapshotError): - """Raised if the unit name passed to snapshot is invalid.""" - - -class InvalidTargetModelName(SnapshotError): - """Raised if the model name passed to snapshot is invalid.""" - - -class StateApplyError(SnapshotError): - """Raised when the state-apply juju command fails.""" diff --git a/scenario/scripts/logger.py b/scenario/scripts/logger.py deleted file mode 100644 index 98cadfb1..00000000 --- a/scenario/scripts/logger.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -import logging -import os - -logger = logging.getLogger(__file__) - - -def setup_logging(verbosity: int): - base_loglevel = int(os.getenv("LOGLEVEL", 30)) - verbosity = min(verbosity, 2) - loglevel = base_loglevel - (verbosity * 10) - logging.basicConfig(format="%(message)s") - logging.getLogger().setLevel(logging.WARNING) - logger.setLevel(loglevel) diff --git a/scenario/scripts/main.py b/scenario/scripts/main.py deleted file mode 100644 index ebee6084..00000000 --- a/scenario/scripts/main.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. -from importlib import metadata -from importlib.metadata import PackageNotFoundError -from pathlib import Path - -import typer - -from scenario.scripts import logger -from scenario.scripts.snapshot import snapshot -from scenario.scripts.state_apply import state_apply - - -def _version(): - """Print the scenario version and exit.""" - try: - print(metadata.version("ops-scenario")) - return - except PackageNotFoundError: - pass - - pyproject_toml = Path(__file__).parent.parent.parent / "pyproject.toml" - - if not pyproject_toml.exists(): - print("") - return - - for line in pyproject_toml.read_text().split("\n"): - if line.startswith("version"): - print(line.split("=")[1].strip("\"' ")) - return - - -def main(): - app = typer.Typer( - name="scenario", - help="Scenario utilities. " - "For docs, issues and feature requests, visit " - "the github repo --> https://github.com/canonical/ops-scenario", - no_args_is_help=True, - rich_markup_mode="markdown", - ) - - app.command(name="version")(_version) - app.command(name="snapshot", no_args_is_help=True)(snapshot) - app.command(name="state-apply", no_args_is_help=True)(state_apply) - - @app.callback() - def setup_logging(verbose: int = typer.Option(0, "-v", count=True)): - logger.setup_logging(verbose) - - app() - - -if __name__ == "__main__": - main() diff --git a/scenario/scripts/snapshot.py b/scenario/scripts/snapshot.py deleted file mode 100644 index f2c678b6..00000000 --- a/scenario/scripts/snapshot.py +++ /dev/null @@ -1,1001 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -import datetime -import json -import os -import re -import shlex -import sys -import tempfile -from dataclasses import asdict, dataclass -from enum import Enum -from itertools import chain -from pathlib import Path -from subprocess import run -from typing import Any, BinaryIO, Dict, Iterable, List, Optional, TextIO, Tuple, Union - -import ops.pebble -import typer -import yaml -from ops.storage import SQLiteStorage - -from scenario.runtime import UnitStateDB -from scenario.scripts.errors import InvalidTargetModelName, InvalidTargetUnitName -from scenario.scripts.logger import logger as root_scripts_logger -from scenario.scripts.utils import JujuUnitName -from scenario.state import ( - Address, - BindAddress, - BindFailedError, - Container, - Event, - Model, - Mount, - Network, - Port, - Relation, - Secret, - State, - _EntityStatus, -) - -logger = root_scripts_logger.getChild(__file__) - -JUJU_RELATION_KEYS = frozenset({"egress-subnets", "ingress-address", "private-address"}) -JUJU_CONFIG_KEYS = frozenset({}) - -SNAPSHOT_OUTPUT_DIR = (Path(os.getcwd()).parent / "snapshot_storage").absolute() -CHARM_SUBCLASS_REGEX = re.compile(r"class (\D+)\(CharmBase\):") - - -def _try_format(string: str): - try: - import black - - try: - return black.format_str(string, mode=black.Mode()) - except black.parsing.InvalidInput as e: - logger.error(f"error parsing {string}: {e}") - return string - except ModuleNotFoundError: - logger.warning("install black for formatting") - return string - - -def format_state(state: State): - """Stringify this State as nicely as possible.""" - return _try_format(repr(state)) - - -PYTEST_TEST_TEMPLATE = """ -from scenario import * -from charm import {ct} - -def test_case(): - # Arrange: prepare the state - state = {state} - - #Act: trigger an event on the state - ctx = Context( - {ct}, - juju_version="{jv}") - - out = ctx.run( - {en} - state, - ) - - # Assert: verify that the output state is the way you want it to be - # TODO: add assertions -""" - - -def format_test_case( - state: State, - charm_type_name: str = None, - event_name: str = None, - juju_version: str = None, -): - """Format this State as a pytest test case.""" - ct = charm_type_name or "CHARM_TYPE, # TODO: replace with charm type name" - en = "EVENT_NAME, # TODO: replace with event name" - if event_name: - try: - en = Event(event_name).bind(state) - except BindFailedError: - logger.error( - f"Failed to bind {event_name} to {state}; leaving placeholder instead", - ) - - jv = juju_version or "3.0, # TODO: check juju version is correct" - state_fmt = repr(state) - return _try_format( - PYTEST_TEST_TEMPLATE.format(state=state_fmt, ct=ct, en=en, jv=jv), - ) - - -def _juju_run(cmd: str, model=None) -> Dict[str, Any]: - """Execute juju {command} in a given model.""" - _model = f" -m {model}" if model else "" - cmd = f"juju {cmd}{_model} --format json" - raw = run(shlex.split(cmd), capture_output=True, text=True).stdout - return json.loads(raw) - - -def _juju_ssh(target: JujuUnitName, cmd: str, model: Optional[str] = None) -> str: - _model = f" -m {model}" if model else "" - command = f"juju ssh{_model} {target.unit_name} {cmd}" - raw = run(shlex.split(command), capture_output=True, text=True).stdout - return raw - - -def _juju_exec(target: JujuUnitName, model: Optional[str], cmd: str) -> str: - """Execute a juju command. - - Notes: - Visit the Juju documentation to view all possible Juju commands: - https://juju.is/docs/olm/juju-cli-commands - """ - _model = f" -m {model}" if model else "" - _target = f" -u {target}" if target else "" - return run( - shlex.split(f"juju exec{_model}{_target} -- {cmd}"), - capture_output=True, - text=True, - ).stdout - - -def get_leader(target: JujuUnitName, model: Optional[str]): - # could also get it from _juju_run('status')... - logger.info("getting leader...") - return _juju_exec(target, model, "is-leader") == "True" - - -def get_network(target: JujuUnitName, model: Optional[str], endpoint: str) -> Network: - """Get the Network data structure for this endpoint.""" - raw = _juju_exec(target, model, f"network-get {endpoint}") - json_data = yaml.safe_load(raw) - - bind_addresses = [] - for raw_bind in json_data["bind-addresses"]: - addresses = [] - for raw_adds in raw_bind["addresses"]: - addresses.append( - Address( - hostname=raw_adds["hostname"], - value=raw_adds["value"], - cidr=raw_adds["cidr"], - address=raw_adds.get("address", ""), - ), - ) - - bind_addresses.append( - BindAddress( - interface_name=raw_bind.get("interface-name", ""), - addresses=addresses, - ), - ) - return Network( - name=endpoint, - bind_addresses=bind_addresses, - egress_subnets=json_data.get("egress-subnets", None), - ingress_addresses=json_data.get("ingress-addresses", None), - ) - - -def get_secrets( - target: JujuUnitName, # noqa: U100 - model: Optional[str], # noqa: U100 - metadata: Dict, # noqa: U100 - relations: Tuple[str, ...] = (), # noqa: U100 -) -> List[Secret]: - """Get Secret list from the charm.""" - logger.warning("Secrets snapshotting not implemented yet. Also, are you *sure*?") - return [] - - -def get_networks( - target: JujuUnitName, - model: Optional[str], - metadata: Dict, - include_dead: bool = False, - relations: Tuple[str, ...] = (), -) -> List[Network]: - """Get all Networks from this unit.""" - logger.info("getting networks...") - networks = [get_network(target, model, "juju-info")] - - endpoints = relations # only alive relations - if include_dead: - endpoints = chain( - metadata.get("provides", ()), - metadata.get("requires", ()), - metadata.get("peers", ()), - ) - - for endpoint in endpoints: - logger.debug(f" getting network for endpoint {endpoint!r}") - networks.append(get_network(target, model, endpoint)) - return networks - - -def get_metadata(target: JujuUnitName, model: Model): - """Get metadata.yaml from this target.""" - logger.info("fetching metadata...") - - meta_path = target.remote_charm_root / "metadata.yaml" - - raw_meta = _juju_ssh( - target, - f"cat {meta_path}", - model=model.name, - ) - return yaml.safe_load(raw_meta) - - -class RemotePebbleClient: - """Clever little class that wraps calls to a remote pebble client.""" - - def __init__( - self, - container: str, - target: JujuUnitName, - model: Optional[str] = None, - ): - self.socket_path = f"/charm/containers/{container}/pebble.socket" - self.container = container - self.target = target - self.model = model - - def _run(self, cmd: str) -> str: - _model = f" -m {self.model}" if self.model else "" - command = ( - f"juju ssh{_model} --container {self.container} {self.target.unit_name} " - f"/charm/bin/pebble {cmd}" - ) - proc = run(shlex.split(command), capture_output=True, text=True) - if proc.returncode == 0: - return proc.stdout - raise RuntimeError( - f"error wrapping pebble call with {command}: " - f"process exited with {proc.returncode}; " - f"stdout = {proc.stdout}; " - f"stderr = {proc.stderr}", - ) - - def can_connect(self) -> bool: - try: - version = self.get_system_info() - except Exception: - return False - return bool(version) - - def get_system_info(self): - return self._run("version") - - def get_plan(self) -> dict: - plan_raw = self._run("plan") - return yaml.safe_load(plan_raw) - - def pull( - self, - path: str, # noqa: U100 - *, - encoding: Optional[str] = "utf-8", # noqa: U100 - ) -> Union[BinaryIO, TextIO]: - raise NotImplementedError() - - def list_files( - self, - path: str, # noqa: U100 - *, - pattern: Optional[str] = None, # noqa: U100 - itself: bool = False, # noqa: U100 - ) -> List[ops.pebble.FileInfo]: - raise NotImplementedError() - - def get_checks( - self, - level: Optional[ops.pebble.CheckLevel] = None, - names: Optional[Iterable[str]] = None, - ) -> List[ops.pebble.CheckInfo]: - _level = f" --level={level}" if level else "" - _names = (" " + " ".join(names)) if names else "" - out = self._run(f"checks{_level}{_names}") - if out == "Plan has no health checks.": - return [] - raise NotImplementedError() - - -def fetch_file( - target: JujuUnitName, - remote_path: Union[Path, str], - container_name: str, - local_path: Union[Path, str], - model: Optional[str] = None, -) -> None: - """Download a file from a live unit to a local path.""" - model_arg = f" -m {model}" if model else "" - scp_cmd = ( - f"juju scp --container {container_name}{model_arg} " - f"{target.unit_name}:{remote_path} {local_path}" - ) - run(shlex.split(scp_cmd)) - - -def get_mounts( - target: JujuUnitName, - model: Optional[str], - container_name: str, - container_meta: Dict, - fetch_files: Optional[List[Path]] = None, - temp_dir_base_path: Path = SNAPSHOT_OUTPUT_DIR, -) -> Dict[str, Mount]: - """Get named Mounts from a container's metadata, and download specified files from the unit.""" - mount_meta = container_meta.get("mounts") - - if fetch_files and not mount_meta: - logger.error( - f"No mounts defined for container {container_name} in metadata.yaml. " - f"Cannot fetch files {fetch_files} for this container.", - ) - return {} - - mount_spec = {} - for mt in mount_meta or (): - if name := mt.get("storage"): - mount_spec[name] = mt["location"] - else: - logger.error(f"unknown mount type: {mt}") - - mounts = {} - for remote_path in fetch_files or (): - found = None - for mn, mt in mount_spec.items(): - if str(remote_path).startswith(mt): - found = mn, mt - - if not found: - logger.error( - "could not find mount corresponding to requested remote_path " - f"{remote_path}: skipping...", - ) - continue - - mount_name, src = found - mount = mounts.get(mount_name) - if not mount: - # create the mount obj and tempdir - location = tempfile.TemporaryDirectory(prefix=str(temp_dir_base_path)).name - mount = Mount(src=src, location=location) - mounts[mount_name] = mount - - # populate the local tempdir - filepath = Path(mount.location).joinpath(*remote_path.parts[1:]) - os.makedirs(os.path.dirname(filepath), exist_ok=True) - try: - fetch_file( - target, - container_name=container_name, - model=model, - remote_path=remote_path, - local_path=filepath, - ) - - except RuntimeError as e: - logger.error(e) - - return mounts - - -def get_container( - target: JujuUnitName, - model: Optional[str], - container_name: str, - container_meta: Dict, - fetch_files: Optional[List[Path]] = None, - temp_dir_base_path: Path = SNAPSHOT_OUTPUT_DIR, -) -> Container: - """Get container data structure from the target.""" - remote_client = RemotePebbleClient(container_name, target, model) - plan = remote_client.get_plan() - - return Container( - name=container_name, - _base_plan=plan, - can_connect=remote_client.can_connect(), - mounts=get_mounts( - target, - model, - container_name, - container_meta, - fetch_files, - temp_dir_base_path=temp_dir_base_path, - ), - ) - - -def get_containers( - target: JujuUnitName, - model: Optional[str], - metadata: Optional[Dict], - fetch_files: Dict[str, List[Path]] = None, - temp_dir_base_path: Path = SNAPSHOT_OUTPUT_DIR, -) -> List[Container]: - """Get all containers from this unit.""" - fetch_files = fetch_files or {} - logger.info("getting containers...") - - if not metadata: - logger.warning("no metadata: unable to get containers") - return [] - - containers = [] - for container_name, container_meta in metadata.get("containers", {}).items(): - container = get_container( - target, - model, - container_name, - container_meta, - fetch_files=fetch_files.get(container_name), - temp_dir_base_path=temp_dir_base_path, - ) - containers.append(container) - return containers - - -def get_juju_status(model: Optional[str]) -> Dict: - """Return juju status as json.""" - logger.info("getting status...") - return _juju_run("status --relations", model=model) - - -@dataclass -class Status: - app: _EntityStatus - unit: _EntityStatus - workload_version: str - - -def get_status(juju_status: Dict, target: JujuUnitName) -> Status: - """Parse `juju status` to get the Status data structure and some relation information.""" - app = juju_status["applications"][target.app_name] - - app_status_raw = app["application-status"] - app_status = app_status_raw["current"], app_status_raw.get("message", "") - - unit_status_raw = app["units"][target]["workload-status"] - unit_status = unit_status_raw["current"], unit_status_raw.get("message", "") - - workload_version = app.get("version", "") - return Status( - app=_EntityStatus(*app_status), - unit=_EntityStatus(*unit_status), - workload_version=workload_version, - ) - - -def get_endpoints(juju_status: Dict, target: JujuUnitName) -> Tuple[str, ...]: - """Parse `juju status` to get the relation names owned by the target.""" - app = juju_status["applications"][target.app_name] - relations_raw = app.get("relations", None) - if not relations_raw: - return () - relations = tuple(relations_raw.keys()) - return relations - - -def get_opened_ports( - target: JujuUnitName, - model: Optional[str], -) -> List[Port]: - """Get opened ports list from target.""" - logger.info("getting opened ports...") - - opened_ports_raw = _juju_exec( - target, - model, - "opened-ports --format json", - ) - ports = [] - - for raw_port in json.loads(opened_ports_raw): - _port_n, _proto = raw_port.split("/") - ports.append(Port(_proto, int(_port_n))) - - return ports - - -def get_config( - target: JujuUnitName, - model: Optional[str], -) -> Dict[str, Union[str, int, float, bool]]: - """Get config dict from target.""" - - logger.info("getting config...") - json_data = _juju_run(f"config {target.app_name}", model=model) - - # dispatch table for builtin config options - converters = { - "string": str, - "int": int, - "integer": int, # fixme: which one is it? - "number": float, - "boolean": lambda x: x == "true", - "attrs": lambda x: x, # fixme: wot? - } - - cfg = {} - for name, option in json_data.get("settings", ()).items(): - if value := option.get("value"): - try: - converter = converters[option["type"]] - except KeyError: - raise ValueError(f'unrecognized type {option["type"]}') - cfg[name] = converter(value) - - else: - logger.debug(f"skipped {name}: no value.") - - return cfg - - -def _get_interface_from_metadata(endpoint: str, metadata: Dict) -> Optional[str]: - """Get the name of the interface used by endpoint.""" - for role in ["provides", "requires"]: - for ep, ep_meta in metadata.get(role, {}).items(): - if ep == endpoint: - return ep_meta["interface"] - - logger.error(f"No interface for endpoint {endpoint} found in charm metadata.") - return None - - -def get_relations( - target: JujuUnitName, - model: Optional[str], - metadata: Dict, - include_juju_relation_data=False, -) -> List[Relation]: - """Get the list of relations active for this target.""" - logger.info("getting relations...") - - try: - json_data = _juju_run(f"show-unit {target}", model=model) - except json.JSONDecodeError as e: - raise InvalidTargetUnitName(target) from e - - def _clean(relation_data: dict): - if include_juju_relation_data: - return relation_data - else: - for key in JUJU_RELATION_KEYS: - del relation_data[key] - return relation_data - - relations = [] - for raw_relation in json_data[target].get("relation-info", ()): - logger.debug( - f" getting relation data for endpoint {raw_relation.get('endpoint')!r}", - ) - related_units = raw_relation.get("related-units") - if not related_units: - continue - # related-units: - # owner/0: - # in-scope: true - # data: - # egress-subnets: 10.152.183.130/32 - # ingress-address: 10.152.183.130 - # private-address: 10.152.183.130 - - relation_id = raw_relation["relation-id"] - - local_unit_data_raw = _juju_exec( - target, - model, - f"relation-get -r {relation_id} - {target} --format json", - ) - local_unit_data = json.loads(local_unit_data_raw) - local_app_data_raw = _juju_exec( - target, - model, - f"relation-get -r {relation_id} - {target} --format json --app", - ) - local_app_data = json.loads(local_app_data_raw) - - some_remote_unit_id = JujuUnitName(next(iter(related_units))) - - # fixme: at the moment the juju CLI offers no way to see what type of relation this is; - # if it's a peer relation or a subordinate, we should use the corresponding - # scenario.state types instead of a regular Relation. - - relations.append( - Relation( - endpoint=raw_relation["endpoint"], - interface=_get_interface_from_metadata( - raw_relation["endpoint"], - metadata, - ), - relation_id=relation_id, - remote_app_data=raw_relation["application-data"], - remote_app_name=some_remote_unit_id.app_name, - remote_units_data={ - JujuUnitName(tgt).unit_id: _clean(val["data"]) - for tgt, val in related_units.items() - }, - local_app_data=local_app_data, - local_unit_data=_clean(local_unit_data), - ), - ) - return relations - - -def get_model(name: str = None) -> Model: - """Get the Model data structure.""" - logger.info("getting model...") - - json_data = _juju_run("models") - model_name = name or json_data["current-model"] - try: - model_info = next( - filter(lambda m: m["short-name"] == model_name, json_data["models"]), - ) - except StopIteration as e: - raise InvalidTargetModelName(name) from e - - model_uuid = model_info["model-uuid"] - model_type = model_info["type"] - - return Model(name=model_name, uuid=model_uuid, type=model_type) - - -def try_guess_charm_type_name() -> Optional[str]: - """If we are running this from a charm project root, get the charm type name from charm.py.""" - try: - charm_path = Path(os.getcwd()) / "src" / "charm.py" - if charm_path.exists(): - source = charm_path.read_text() - charms = CHARM_SUBCLASS_REGEX.findall(source) - if len(charms) < 1: - raise RuntimeError(f"Not enough charms at {charm_path}.") - elif len(charms) > 1: - raise RuntimeError(f"Too many charms at {charm_path}.") - return charms[0] - except Exception as e: - logger.warning(f"unable to guess charm type: {e}") - return None - - -class FormatOption( - str, - Enum, -): # Enum for typer support, str for native comparison and ==. - """Output formatting options for snapshot.""" - - state = "state" # the default: will print the python repr of the State dataclass. - json = "json" - pytest = "pytest" - - -def get_juju_version(juju_status: Dict) -> str: - """Get juju agent version from juju status output.""" - return juju_status["model"]["version"] - - -def get_charm_version(target: JujuUnitName, juju_status: Dict) -> str: - """Get charm version info from juju status output.""" - app_info = juju_status["applications"][target.app_name] - channel = app_info.get("charm-channel", "") - charm_name = app_info.get("charm-name", "n/a") - workload_version = app_info.get("version", "n/a") - charm_rev = app_info.get("charm-rev", "n/a") - charm_origin = app_info.get("charm-origin", "n/a") - return ( - f"charm {charm_name!r} ({channel}/{charm_rev}); " - f"origin := {charm_origin}; app version := {workload_version}." - ) - - -class RemoteUnitStateDB(UnitStateDB): - """Represents a remote unit's state db.""" - - def __init__(self, model: Optional[str], target: JujuUnitName): - self._tempfile = tempfile.NamedTemporaryFile() - super().__init__(self._tempfile.name) - - self._model = model - self._target = target - - def _fetch_state(self): - fetch_file( - self._target, - remote_path=self._target.remote_charm_root / ".unit-state.db", - container_name="charm", - local_path=self._state_file, - model=self._model, - ) - - @property - def _has_state(self): - """Whether the state file exists.""" - return self._state_file.exists() and self._state_file.read_bytes() - - def _open_db(self) -> SQLiteStorage: - if not self._has_state: - self._fetch_state() - return super()._open_db() - - -def _snapshot( - target: str, - model: Optional[str] = None, - pprint: bool = True, - include: Optional[str] = None, - include_juju_relation_data=False, - include_dead_relation_networks=False, - format: FormatOption = "state", - event_name: Optional[str] = None, - fetch_files: Optional[Dict[str, List[Path]]] = None, - temp_dir_base_path: Path = SNAPSHOT_OUTPUT_DIR, -): - """see snapshot's docstring""" - - try: - target = JujuUnitName(target) - except InvalidTargetUnitName: - logger.critical( - f"invalid target: {target!r} is not a valid unit name. Should be formatted like so:" - f"`foo/1`, or `database/0`, or `myapp-foo-bar/42`.", - ) - sys.exit(1) - - logger.info(f'beginning snapshot of {target} in model {model or ""}...') - - def if_include(key, fn, default): - if include is None or key in include: - return fn() - return default - - try: - state_model = get_model(model) - except InvalidTargetModelName: - logger.critical(f"unable to get Model from name {model}.", exc_info=True) - sys.exit(1) - - # todo: what about controller? - model = state_model.name - - metadata = get_metadata(target, state_model) - if not metadata: - logger.critical(f"could not fetch metadata from {target}.") - sys.exit(1) - - try: - unit_state_db = RemoteUnitStateDB(model, target) - juju_status = get_juju_status(model) - endpoints = get_endpoints(juju_status, target) - status = get_status(juju_status, target=target) - - state = State( - leader=get_leader(target, model), - unit_status=status.unit, - app_status=status.app, - workload_version=status.workload_version, - model=state_model, - config=if_include("c", lambda: get_config(target, model), {}), - opened_ports=if_include( - "p", - lambda: get_opened_ports(target, model), - [], - ), - relations=if_include( - "r", - lambda: get_relations( - target, - model, - metadata=metadata, - include_juju_relation_data=include_juju_relation_data, - ), - [], - ), - containers=if_include( - "k", - lambda: get_containers( - target, - model, - metadata, - fetch_files=fetch_files, - temp_dir_base_path=temp_dir_base_path, - ), - [], - ), - networks=if_include( - "n", - lambda: get_networks( - target, - model, - metadata, - include_dead=include_dead_relation_networks, - relations=endpoints, - ), - [], - ), - secrets=if_include( - "S", - lambda: get_secrets( - target, - model, - metadata, - relations=endpoints, - ), - [], - ), - deferred=if_include( - "d", - unit_state_db.get_deferred_events, - [], - ), - stored_state=if_include( - "t", - unit_state_db.get_stored_state, - [], - ), - ) - - # todo: these errors should surface earlier. - except InvalidTargetUnitName: - _model = f"model {model}" or "the current model" - logger.critical(f"invalid target: {target!r} not found in {_model}") - sys.exit(1) - except InvalidTargetModelName: - logger.critical(f"invalid model: {model!r} not found.") - sys.exit(1) - - logger.info("snapshot done.") - - if pprint: - charm_version = get_charm_version(target, juju_status) - juju_version = get_juju_version(juju_status) - if format == FormatOption.pytest: - charm_type_name = try_guess_charm_type_name() - txt = format_test_case( - state, - event_name=event_name, - charm_type_name=charm_type_name, - juju_version=juju_version, - ) - elif format == FormatOption.state: - txt = format_state(state) - elif format == FormatOption.json: - txt = json.dumps(asdict(state), indent=2) - else: - raise ValueError(f"unknown format {format}") - - # json does not support comments, so it would be invalid output. - if format != FormatOption.json: - # print out some metadata - controller_timestamp = juju_status["controller"]["timestamp"] - local_timestamp = datetime.datetime.now().strftime("%m/%d/%Y, %H:%M:%S") - print( - f"# Generated by scenario.snapshot. \n" - f"# Snapshot of {state_model.name}:{target.unit_name} at {local_timestamp}. \n" - f"# Controller timestamp := {controller_timestamp}. \n" - f"# Juju version := {juju_version} \n" - f"# Charm fingerprint := {charm_version} \n", - ) - - print(txt) - - return state - - -def snapshot( - target: str = typer.Argument(..., help="Target unit."), - model: Optional[str] = typer.Option( - None, - "-m", - "--model", - help="Which model to look at.", - ), - format: FormatOption = typer.Option( - "state", - "-f", - "--format", - help="How to format the output. " - "``state``: Outputs a black-formatted repr() of the State object (if black is installed! " - "else it will be ugly but valid python code). All you need to do then is import the " - "necessary objects from scenario.state, and you should have a valid State object. " - "``json``: Outputs a Jsonified State object. Perfect for storage. " - "``pytest``: Outputs a full-blown pytest scenario test based on this State. " - "Pipe it to a file and fill in the blanks.", - ), - event_name: str = typer.Option( - None, - "--event_name", - "-e", - help="Event to include in the generate test file; only applicable " - "if the output format is 'pytest'.", - ), - include: str = typer.Option( - "rckndtp", - "--include", - "-i", - help="What data to include in the state. " - "``r``: relation, ``c``: config, ``k``: containers, " - "``n``: networks, ``S``: secrets(!), ``p``: opened ports, " - "``d``: deferred events, ``t``: stored state.", - ), - include_dead_relation_networks: bool = typer.Option( - False, - "--include-dead-relation-networks", - help="Whether to gather networks of inactive relation endpoints.", - is_flag=True, - ), - include_juju_relation_data: bool = typer.Option( - False, - "--include-juju-relation-data", - help="Whether to include in the relation data the default juju keys (egress-subnets," - "ingress-address, private-address).", - is_flag=True, - ), - fetch: Path = typer.Option( - None, - "--fetch", - help="Path to a local file containing a json spec of files to be fetched from the unit. " - "For k8s units, it's supposed to be a {container_name: List[Path]} mapping listing " - "the files that need to be fetched from the existing containers.", - ), - # TODO: generalize "fetch" to allow passing '.' for the 'charm' container or 'the machine'. - output_dir: Path = typer.Option( - SNAPSHOT_OUTPUT_DIR, - "--output-dir", - help="Directory in which to store any files fetched as part of the state. In the case " - "of k8s charms, this might mean files obtained through Mounts,", - ), -) -> State: - """Gather and output the State of a remote target unit. - - If black is available, the output will be piped through it for formatting. - - Usage: snapshot myapp/0 > ./tests/scenario/case1.py - """ - - fetch_files = json.loads(fetch.read_text()) if fetch else None - - return _snapshot( - target=target, - model=model, - format=format, - event_name=event_name, - include=include, - include_juju_relation_data=include_juju_relation_data, - include_dead_relation_networks=include_dead_relation_networks, - temp_dir_base_path=output_dir, - fetch_files=fetch_files, - ) - - -# for the benefit of script usage -_snapshot.__doc__ = snapshot.__doc__ - -if __name__ == "__main__": - # print(_snapshot("zookeeper/0", model="foo", format=FormatOption.pytest)) - - print( - _snapshot( - "traefik/0", - format=FormatOption.state, - include="r", - # fetch_files={ - # "traefik": [ - # Path("/opt/traefik/juju/certificates.yaml"), - # Path("/opt/traefik/juju/certificate.cert"), - # Path("/opt/traefik/juju/certificate.key"), - # Path("/etc/traefik/traefik.yaml"), - # ] - # }, - ), - ) diff --git a/scenario/scripts/state_apply.py b/scenario/scripts/state_apply.py deleted file mode 100644 index f864b141..00000000 --- a/scenario/scripts/state_apply.py +++ /dev/null @@ -1,256 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -import json -import logging -import os -import sys -from pathlib import Path -from subprocess import CalledProcessError, run -from typing import Dict, Iterable, List, Optional - -import typer - -from scenario.scripts.errors import InvalidTargetUnitName, StateApplyError -from scenario.scripts.utils import JujuUnitName -from scenario.state import ( - Container, - DeferredEvent, - Port, - Relation, - Secret, - State, - StoredState, - _EntityStatus, -) - -SNAPSHOT_DATA_DIR = (Path(os.getcwd()).parent / "snapshot_storage").absolute() - -logger = logging.getLogger("snapshot") - - -def set_relations(relations: Iterable[Relation]) -> List[str]: # noqa: U100 - logger.info("preparing relations...") - logger.warning("set_relations not implemented yet") - return [] - - -def set_status( - unit_status: _EntityStatus, - app_status: _EntityStatus, - app_version: str, -) -> List[str]: - logger.info("preparing status...") - cmds = [] - - cmds.append(f"status-set {unit_status.name} {unit_status.message}") - cmds.append(f"status-set --application {app_status.name} {app_status.message}") - cmds.append(f"application-version-set {app_version}") - - return cmds - - -def set_config(config: Dict[str, str]) -> List[str]: # noqa: U100 - logger.info("preparing config...") - logger.warning("set_config not implemented yet") - return [] - - -def set_opened_ports(opened_ports: List[Port]) -> List[str]: - logger.info("preparing opened ports...") - # fixme: this will only open new ports, it will not close all already-open ports. - - cmds = [] - - for port in opened_ports: - cmds.append(f"open-port {port.port}/{port.protocol}") - - return cmds - - -def set_containers(containers: Iterable[Container]) -> List[str]: # noqa: U100 - logger.info("preparing containers...") - logger.warning("set_containers not implemented yet") - return [] - - -def set_secrets(secrets: Iterable[Secret]) -> List[str]: # noqa: U100 - logger.info("preparing secrets...") - logger.warning("set_secrets not implemented yet") - return [] - - -def set_deferred_events( - deferred_events: Iterable[DeferredEvent], # noqa: U100 -) -> List[str]: - logger.info("preparing deferred_events...") - logger.warning("set_deferred_events not implemented yet") - return [] - - -def set_stored_state(stored_state: Iterable[StoredState]) -> List[str]: # noqa: U100 - logger.info("preparing stored_state...") - logger.warning("set_stored_state not implemented yet") - return [] - - -def exec_in_unit(target: JujuUnitName, model: str, cmds: List[str]): - logger.info("Running juju exec...") - - _model = f" -m {model}" if model else "" - cmd_fmt = "; ".join(cmds) - try: - run(f'juju exec -u {target}{_model} -- "{cmd_fmt}"') - except CalledProcessError as e: - raise StateApplyError( - f"Failed to apply state: process exited with {e.returncode}; " - f"stdout = {e.stdout}; " - f"stderr = {e.stderr}.", - ) - - -def run_commands(cmds: List[str]): - logger.info("Applying remaining state...") - for cmd in cmds: - try: - run(cmd) - except CalledProcessError as e: - # todo: should we log and continue instead? - raise StateApplyError( - f"Failed to apply state: process exited with {e.returncode}; " - f"stdout = {e.stdout}; " - f"stderr = {e.stderr}.", - ) - - -def _state_apply( - target: str, - state: State, - model: Optional[str] = None, - include: str = None, - include_juju_relation_data=False, # noqa: U100 - push_files: Dict[str, List[Path]] = None, # noqa: U100 - snapshot_data_dir: Path = SNAPSHOT_DATA_DIR, # noqa: U100 -): - """see state_apply's docstring""" - logger.info("Starting state-apply...") - - try: - target = JujuUnitName(target) - except InvalidTargetUnitName: - logger.critical( - f"invalid target: {target!r} is not a valid unit name. Should be formatted like so:" - f"`foo/1`, or `database/0`, or `myapp-foo-bar/42`.", - ) - sys.exit(1) - - logger.info(f'beginning snapshot of {target} in model {model or ""}...') - - def if_include(key, fn): - if include is None or key in include: - return fn() - return [] - - j_exec_cmds: List[str] = [] - - j_exec_cmds += if_include( - "s", - lambda: set_status(state.unit_status, state.app_status, state.workload_version), - ) - j_exec_cmds += if_include("p", lambda: set_opened_ports(state.opened_ports)) - j_exec_cmds += if_include("r", lambda: set_relations(state.relations)) - j_exec_cmds += if_include("S", lambda: set_secrets(state.secrets)) - - cmds: List[str] = [] - - # todo: config is a bit special because it's not owned by the unit but by the cloud admin. - # should it be included in state-apply? - # if_include("c", lambda: set_config(state.config)) - cmds += if_include("k", lambda: set_containers(state.containers)) - cmds += if_include("d", lambda: set_deferred_events(state.deferred)) - cmds += if_include("t", lambda: set_stored_state(state.stored_state)) - - # we gather juju-exec commands to run them all at once in the unit. - exec_in_unit(target, model, j_exec_cmds) - # non-juju-exec commands are ran one by one, individually - run_commands(cmds) - - logger.info("Done!") - - -def state_apply( - target: str = typer.Argument(..., help="Target unit."), - state: Path = typer.Argument( - ..., - help="Source State to apply. Json file containing a State data structure; " - "the same you would obtain by running snapshot.", - ), - model: Optional[str] = typer.Option( - None, - "-m", - "--model", - help="Which model to look at.", - ), - include: str = typer.Option( - "scrkSdt", - "--include", - "-i", - help="What parts of the state to apply. Defaults to: all of them. " - "``r``: relation, ``c``: config, ``k``: containers, " - "``s``: status, ``S``: secrets(!), " - "``d``: deferred events, ``t``: stored state.", - ), - include_juju_relation_data: bool = typer.Option( - False, - "--include-juju-relation-data", - help="Whether to include in the relation data the default juju keys (egress-subnets," - "ingress-address, private-address).", - is_flag=True, - ), - push_files: Path = typer.Option( - None, - "--push-files", - help="Path to a local file containing a json spec of files to be fetched from the unit. " - "For k8s units, it's supposed to be a {container_name: List[Path]} mapping listing " - "the files that need to be pushed to the each container.", - ), - # TODO: generalize "push_files" to allow passing '.' for the 'charm' container or 'the machine'. - data_dir: Path = typer.Option( - SNAPSHOT_DATA_DIR, - "--data-dir", - help="Directory in which to any files associated with the state are stored. In the case " - "of k8s charms, this might mean files obtained through Mounts,", - ), -): - """Apply a State to a remote target unit. - - If black is available, the output will be piped through it for formatting. - - Usage: state-apply myapp/0 > ./tests/scenario/case1.py - """ - push_files_ = json.loads(push_files.read_text()) if push_files else None - state_json = json.loads(state.read_text()) - - # TODO: state_json to State - raise NotImplementedError("WIP: implement State.from_json") - state_: State = State.from_json(state_json) - - return _state_apply( - target=target, - state=state_, - model=model, - include=include, - include_juju_relation_data=include_juju_relation_data, - snapshot_data_dir=data_dir, - push_files=push_files_, - ) - - -# for the benefit of scripted usage -_state_apply.__doc__ = state_apply.__doc__ - -if __name__ == "__main__": - from scenario import State - - _state_apply("zookeeper/0", model="foo", state=State()) diff --git a/scenario/scripts/utils.py b/scenario/scripts/utils.py deleted file mode 100644 index de9dc01e..00000000 --- a/scenario/scripts/utils.py +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. -from pathlib import Path - -from scenario.scripts.errors import InvalidTargetUnitName - - -class JujuUnitName(str): - """This class represents the name of a juju unit that can be snapshotted.""" - - def __init__(self, unit_name: str): - super().__init__() - app_name, _, unit_id = unit_name.rpartition("/") - if not app_name or not unit_id: - raise InvalidTargetUnitName(f"invalid unit name: {unit_name!r}") - self.unit_name = unit_name - self.app_name = app_name - self.unit_id = int(unit_id) - self.normalized = f"{app_name}-{unit_id}" - self.remote_charm_root = Path( - f"/var/lib/juju/agents/unit-{self.normalized}/charm", - ) diff --git a/tests/test_darkroom/test_harness_integration/test_darkroom_harness.py b/tests/test_darkroom/test_harness_integration/test_darkroom_harness.py deleted file mode 100644 index 1e179862..00000000 --- a/tests/test_darkroom/test_harness_integration/test_darkroom_harness.py +++ /dev/null @@ -1,20 +0,0 @@ -import yaml -from ops import CharmBase -from ops.testing import Harness - -from scenario.integrations.darkroom import Darkroom - - -class MyCharm(CharmBase): - META = {"name": "joseph", "requires": {"foo": {"interface": "bar"}}} - - -def test_attach(): - h = Harness(MyCharm, meta=yaml.safe_dump(MyCharm.META)) - l = [] - d = Darkroom().attach(lambda e, s: l.append((e, s))) - h.begin() - h.add_relation("foo", "remote") - - assert len(l) == 1 - assert l[0][0].name == "foo_relation_created" diff --git a/tests/test_darkroom/test_harness_integration/test_install_harness.py b/tests/test_darkroom/test_harness_integration/test_install_harness.py deleted file mode 100644 index 950ab5e4..00000000 --- a/tests/test_darkroom/test_harness_integration/test_install_harness.py +++ /dev/null @@ -1,33 +0,0 @@ -def test_install(): - from scenario.integrations.darkroom import Darkroom - - l = [] - - def register_trace(t): - l.append(t) - - Darkroom.install(register_trace) - - import yaml - from ops import CharmBase - from ops.testing import Harness - - class MyCharm(CharmBase): - META = {"name": "joseph", "requires": {"foo": {"interface": "bar"}}} - - h = Harness(MyCharm, meta=yaml.safe_dump(MyCharm.META)) - h.begin_with_initial_hooks() - - h = Harness(MyCharm, meta=yaml.safe_dump(MyCharm.META)) - h.begin_with_initial_hooks() - h.add_relation("foo", "remote") - - h = Harness(MyCharm, meta=yaml.safe_dump(MyCharm.META)) - h.begin_with_initial_hooks() - h.add_relation("foo", "remote2") - - assert len(l) == 3 - assert [len(x) for x in l] == [4, 5, 5] - assert l[0][1][0].name == "leader_settings_changed" - assert l[1][-1][0].name == "foo_relation_created" - assert l[2][-1][0].name == "foo_relation_created" diff --git a/tests/test_darkroom/test_harness_integration/test_integrations_harness.py b/tests/test_darkroom/test_harness_integration/test_integrations_harness.py deleted file mode 100644 index fd017f49..00000000 --- a/tests/test_darkroom/test_harness_integration/test_integrations_harness.py +++ /dev/null @@ -1,78 +0,0 @@ -import ops -import pytest -import yaml -from ops import CharmBase, BlockedStatus, WaitingStatus -from ops.testing import Harness - -import scenario -from scenario import Model -from scenario.integrations.darkroom import Darkroom - - -class MyCharm(CharmBase): - META = {"name": "joseph"} - - -@pytest.fixture -def harness(): - return Harness(MyCharm, meta=yaml.safe_dump(MyCharm.META)) - - -def test_base(harness): - harness.begin() - state = Darkroom().capture(harness.model._backend) - assert state.unit_id == 0 - - -@pytest.mark.parametrize("leader", (True, False)) -@pytest.mark.parametrize("model_name", ("foo", "bar-baz")) -@pytest.mark.parametrize("model_uuid", ("qux", "fiz")) -def test_static_attributes(harness, leader, model_name, model_uuid): - harness.set_model_info(model_name, model_uuid) - harness.begin() - harness.charm.unit.set_workload_version("42.42") - harness.set_leader(leader) - - state = Darkroom().capture(harness.model._backend) - - assert state.leader is leader - assert state.model == Model(name=model_name, uuid=model_uuid, type="lxd") - assert state.workload_version == "42.42" - - -def test_status(harness): - harness.begin() - harness.set_leader(True) # so we can set app status - harness.charm.app.status = BlockedStatus("foo") - harness.charm.unit.status = WaitingStatus("hol' up") - - state = Darkroom().capture(harness.model._backend) - - assert state.unit_status == WaitingStatus("hol' up") - assert state.app_status == BlockedStatus("foo") - - -@pytest.mark.parametrize( - "ports", - ( - [ - ops.Port("tcp", 2032), - ops.Port("udp", 2033), - ], - [ - ops.Port("tcp", 2032), - ops.Port("tcp", 2035), - ops.Port("icmp", None), - ], - ), -) -def test_opened_ports(harness, ports): - harness.begin() - harness.charm.unit.set_ports(*ports) - state = Darkroom().capture(harness.model._backend) - assert set(state.opened_ports) == set( - scenario.Port(port.protocol, port.port) for port in ports - ) - - -# todo add tests for all other State components diff --git a/tests/test_darkroom/test_live_integration/test_darkroom_scenario.py b/tests/test_darkroom/test_live_integration/test_darkroom_scenario.py deleted file mode 100644 index 1e179862..00000000 --- a/tests/test_darkroom/test_live_integration/test_darkroom_scenario.py +++ /dev/null @@ -1,20 +0,0 @@ -import yaml -from ops import CharmBase -from ops.testing import Harness - -from scenario.integrations.darkroom import Darkroom - - -class MyCharm(CharmBase): - META = {"name": "joseph", "requires": {"foo": {"interface": "bar"}}} - - -def test_attach(): - h = Harness(MyCharm, meta=yaml.safe_dump(MyCharm.META)) - l = [] - d = Darkroom().attach(lambda e, s: l.append((e, s))) - h.begin() - h.add_relation("foo", "remote") - - assert len(l) == 1 - assert l[0][0].name == "foo_relation_created" diff --git a/tests/test_darkroom/test_live_integration/test_install_scenario.py b/tests/test_darkroom/test_live_integration/test_install_scenario.py deleted file mode 100644 index 950ab5e4..00000000 --- a/tests/test_darkroom/test_live_integration/test_install_scenario.py +++ /dev/null @@ -1,33 +0,0 @@ -def test_install(): - from scenario.integrations.darkroom import Darkroom - - l = [] - - def register_trace(t): - l.append(t) - - Darkroom.install(register_trace) - - import yaml - from ops import CharmBase - from ops.testing import Harness - - class MyCharm(CharmBase): - META = {"name": "joseph", "requires": {"foo": {"interface": "bar"}}} - - h = Harness(MyCharm, meta=yaml.safe_dump(MyCharm.META)) - h.begin_with_initial_hooks() - - h = Harness(MyCharm, meta=yaml.safe_dump(MyCharm.META)) - h.begin_with_initial_hooks() - h.add_relation("foo", "remote") - - h = Harness(MyCharm, meta=yaml.safe_dump(MyCharm.META)) - h.begin_with_initial_hooks() - h.add_relation("foo", "remote2") - - assert len(l) == 3 - assert [len(x) for x in l] == [4, 5, 5] - assert l[0][1][0].name == "leader_settings_changed" - assert l[1][-1][0].name == "foo_relation_created" - assert l[2][-1][0].name == "foo_relation_created" diff --git a/tests/test_darkroom/test_live_integration/test_integrations_scenario.py b/tests/test_darkroom/test_live_integration/test_integrations_scenario.py deleted file mode 100644 index fd017f49..00000000 --- a/tests/test_darkroom/test_live_integration/test_integrations_scenario.py +++ /dev/null @@ -1,78 +0,0 @@ -import ops -import pytest -import yaml -from ops import CharmBase, BlockedStatus, WaitingStatus -from ops.testing import Harness - -import scenario -from scenario import Model -from scenario.integrations.darkroom import Darkroom - - -class MyCharm(CharmBase): - META = {"name": "joseph"} - - -@pytest.fixture -def harness(): - return Harness(MyCharm, meta=yaml.safe_dump(MyCharm.META)) - - -def test_base(harness): - harness.begin() - state = Darkroom().capture(harness.model._backend) - assert state.unit_id == 0 - - -@pytest.mark.parametrize("leader", (True, False)) -@pytest.mark.parametrize("model_name", ("foo", "bar-baz")) -@pytest.mark.parametrize("model_uuid", ("qux", "fiz")) -def test_static_attributes(harness, leader, model_name, model_uuid): - harness.set_model_info(model_name, model_uuid) - harness.begin() - harness.charm.unit.set_workload_version("42.42") - harness.set_leader(leader) - - state = Darkroom().capture(harness.model._backend) - - assert state.leader is leader - assert state.model == Model(name=model_name, uuid=model_uuid, type="lxd") - assert state.workload_version == "42.42" - - -def test_status(harness): - harness.begin() - harness.set_leader(True) # so we can set app status - harness.charm.app.status = BlockedStatus("foo") - harness.charm.unit.status = WaitingStatus("hol' up") - - state = Darkroom().capture(harness.model._backend) - - assert state.unit_status == WaitingStatus("hol' up") - assert state.app_status == BlockedStatus("foo") - - -@pytest.mark.parametrize( - "ports", - ( - [ - ops.Port("tcp", 2032), - ops.Port("udp", 2033), - ], - [ - ops.Port("tcp", 2032), - ops.Port("tcp", 2035), - ops.Port("icmp", None), - ], - ), -) -def test_opened_ports(harness, ports): - harness.begin() - harness.charm.unit.set_ports(*ports) - state = Darkroom().capture(harness.model._backend) - assert set(state.opened_ports) == set( - scenario.Port(port.protocol, port.port) for port in ports - ) - - -# todo add tests for all other State components diff --git a/tests/test_darkroom/test_scenario_integration/test_darkroom_live.py b/tests/test_darkroom/test_scenario_integration/test_darkroom_live.py deleted file mode 100644 index e837ea18..00000000 --- a/tests/test_darkroom/test_scenario_integration/test_darkroom_live.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_live_charm(): - pass