diff --git a/src/rookify/modules/ceph.py b/src/rookify/modules/ceph.py new file mode 100644 index 0000000..64e0eed --- /dev/null +++ b/src/rookify/modules/ceph.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- + +import json +import rados +from typing import Any, Dict, List +from .exception import ModuleException + + +class Ceph: + def __init__(self, config: Dict[str, Any]): + try: + self.__ceph = rados.Rados( + conffile=config["config"], conf={"keyring": config["keyring"]} + ) + self.__ceph.connect() + except rados.ObjectNotFound as err: + raise ModuleException(f"Could not connect to ceph: {err}") + + def __getattr__(self, name: str) -> Any: + return getattr(self.__ceph, name) + + def mon_command( + self, command: str, **kwargs: str + ) -> Dict[str, Any] | List[Any]: + cmd = {"prefix": command, "format": "json"} + cmd.update(**kwargs) + result = self.__ceph.mon_command(json.dumps(cmd), b"") + if result[0] != 0: + raise ModuleException(f"Ceph did return an error: {result}") + data = json.loads(result[1]) + assert isinstance(data, dict) or isinstance(data, list) + return data diff --git a/src/rookify/modules/cephx_auth_config/main.py b/src/rookify/modules/cephx_auth_config/main.py index c7d8c58..acc1dfd 100644 --- a/src/rookify/modules/cephx_auth_config/main.py +++ b/src/rookify/modules/cephx_auth_config/main.py @@ -1,8 +1,9 @@ # -*- coding: utf-8 -*- from typing import Any +from ..exception import ModuleException from ..machine import Machine -from ..module import ModuleException, ModuleHandler +from ..module import ModuleHandler class CephXAuthHandler(ModuleHandler): diff --git a/src/rookify/modules/create_cluster/main.py b/src/rookify/modules/create_cluster/main.py index b744546..25d5bc2 100644 --- a/src/rookify/modules/create_cluster/main.py +++ b/src/rookify/modules/create_cluster/main.py @@ -1,9 +1,9 @@ # -*- coding: utf-8 -*- -import kubernetes from typing import Any +from ..exception import ModuleException from ..machine import Machine -from ..module import ModuleHandler, ModuleException +from ..module import ModuleHandler class CreateClusterHandler(ModuleHandler): @@ -16,7 +16,7 @@ class CreateClusterHandler(ModuleHandler): @property def __mon_placement_label(self) -> str: - return ( + return ( # type: ignore self._config["rook"]["cluster"]["mon_placement_label"] if "mon_placement_label" in self._config["rook"]["cluster"] else f"placement-{self._config["rook"]["cluster"]["name"]}-mon" @@ -24,7 +24,7 @@ def __mon_placement_label(self) -> str: @property def __mgr_placement_label(self) -> str: - return ( + return ( # type: ignore self._config["rook"]["cluster"]["mgr_placement_label"] if "mgr_placement_label" in self._config["rook"]["cluster"] else f"placement-{self._config["rook"]["cluster"]["name"]}-mgr" @@ -107,10 +107,8 @@ def execute(self) -> None: cluster_name = self._config["rook"]["cluster"]["name"] # Wait for CephCluster to get into Progressing phase - result = None - watcher = kubernetes.watch.Watch() - - stream = watcher.stream( + result = self.k8s.watch_events( + self._watch_cluster_phase_callback, self.k8s.custom_objects_api.list_namespaced_custom_object, "ceph.rook.io", "v1", @@ -119,24 +117,22 @@ def execute(self) -> None: timeout_seconds=60, ) - for event in stream: - event_object = event["object"] - - if event_object["metadata"]["name"] != cluster_name: - continue - - try: - if event_object["status"]["phase"] == "Progressing": - result = event_object - break - except KeyError: - pass - - watcher.stop() - if result == None: raise ModuleException("CephCluster did not come up") + def _watch_cluster_phase_callback(self, event_object: Any) -> Any: + try: + if ( + event_object["metadata"]["name"] + == self._config["rook"]["cluster"]["name"] + and event_object["status"]["phase"] == "Progressing" + ): + return event_object + except KeyError: + pass + + return None + @staticmethod def register_preflight_state( machine: Machine, state_name: str, handler: ModuleHandler, **kwargs: Any diff --git a/src/rookify/modules/create_configmap/main.py b/src/rookify/modules/create_configmap/main.py index 028f9bf..4a3136f 100644 --- a/src/rookify/modules/create_configmap/main.py +++ b/src/rookify/modules/create_configmap/main.py @@ -1,10 +1,10 @@ # -*- coding: utf-8 -*- import kubernetes -from ..machine import Machine -from ..module import ModuleHandler, ModuleException - from typing import Any, Dict +from ..exception import ModuleException +from ..machine import Machine +from ..module import ModuleHandler class CreateConfigMapHandler(ModuleHandler): diff --git a/src/rookify/modules/example/main.py b/src/rookify/modules/example/main.py index cddba34..fbbeb9a 100644 --- a/src/rookify/modules/example/main.py +++ b/src/rookify/modules/example/main.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- from typing import Any -from ..module import ModuleHandler, ModuleException +from ..exception import ModuleException +from ..module import ModuleHandler class ExampleHandler(ModuleHandler): diff --git a/src/rookify/modules/exception.py b/src/rookify/modules/exception.py new file mode 100644 index 0000000..43508d3 --- /dev/null +++ b/src/rookify/modules/exception.py @@ -0,0 +1,4 @@ +# -*- coding: utf-8 -*- + +class ModuleException(Exception): + pass diff --git a/src/rookify/modules/k8s.py b/src/rookify/modules/k8s.py new file mode 100644 index 0000000..25a3ea8 --- /dev/null +++ b/src/rookify/modules/k8s.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- + +import kubernetes +from typing import Any, Callable, Dict, List, Optional + +class K8s: + def __init__(self, config: Dict[str, Any]): + k8s_config = kubernetes.config.load_kube_config( + config_file=config["config"] + ) + self.__client = kubernetes.client.ApiClient(k8s_config) + self.__dynamic_client: Optional[kubernetes.dynamic.DynamicClient] = None + + @property + def core_v1_api(self) -> kubernetes.client.CoreV1Api: + return kubernetes.client.CoreV1Api(self.__client) + + @property + def apps_v1_api(self) -> kubernetes.client.AppsV1Api: + return kubernetes.client.AppsV1Api(self.__client) + + @property + def node_v1_api(self) -> kubernetes.client.NodeV1Api: + return kubernetes.client.NodeV1Api(self.__client) + + @property + def custom_objects_api(self) -> kubernetes.client.CustomObjectsApi: + return kubernetes.client.CustomObjectsApi(self.__client) + + @property + def dynamic_client(self) -> kubernetes.dynamic.DynamicClient: + if not self.__dynamic_client: + self.__dynamic_client = kubernetes.dynamic.DynamicClient(self.__client) + return self.__dynamic_client + + def crd_api( + self, api_version: str, kind: str + ) -> kubernetes.dynamic.resource.Resource: + return self.dynamic_client.resources.get(api_version=api_version, kind=kind) + + def crd_api_apply( + self, manifest: Dict[Any, Any] + ) -> kubernetes.dynamic.resource.ResourceInstance: + """ + This applies a manifest for custom CRDs + See https://github.com/kubernetes-client/python/issues/1792 for more information + :param manifest: Dict of the kubernetes manifest + """ + api_version = manifest["apiVersion"] + kind = manifest["kind"] + resource_name = manifest["metadata"]["name"] + namespace = manifest["metadata"]["namespace"] + crd_api = self.crd_api(api_version=api_version, kind=kind) + + try: + crd_api.get(namespace=namespace, name=resource_name) + return crd_api.patch( + body=manifest, content_type="application/merge-patch+json" + ) + except kubernetes.dynamic.exceptions.NotFoundError: + return crd_api.create(body=manifest, namespace=namespace) + + def watch_events(self, callback_func: Callable[[Any], Any], func: Callable[[Any], Any], *args: Any, **kwargs: Any) -> Any: + watcher = kubernetes.watch.Watch() + + stream = watcher.stream(func, *args, **kwargs) + + try: + for event in stream: + try: result = callback_func(event["object"]) + except StopIteration: continue + + if result is not None: return result + finally: watcher.stop() diff --git a/src/rookify/modules/k8s_prerequisites_check/main.py b/src/rookify/modules/k8s_prerequisites_check/main.py index 0ac4c9f..86e2b45 100644 --- a/src/rookify/modules/k8s_prerequisites_check/main.py +++ b/src/rookify/modules/k8s_prerequisites_check/main.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- -from ..module import ModuleException, ModuleHandler +from ..exception import ModuleException +from ..module import ModuleHandler class K8sPrerequisitesCheckHandler(ModuleHandler): diff --git a/src/rookify/modules/module.py b/src/rookify/modules/module.py index a02e743..2499916 100644 --- a/src/rookify/modules/module.py +++ b/src/rookify/modules/module.py @@ -1,22 +1,15 @@ # -*- coding: utf-8 -*- import os -import yaml -import json import abc -import rados -import kubernetes -import fabric -import jinja2 import structlog -from typing import Any, Dict, List, Optional - +from typing import Any, Dict, Optional from ..logger import get_logger +from .ceph import Ceph +from .k8s import K8s from .machine import Machine - - -class ModuleException(Exception): - pass +from .ssh import SSH +from .template import Template class ModuleHandler: @@ -24,136 +17,6 @@ class ModuleHandler: ModuleHandler is an abstract class that modules have to extend. """ - class __Ceph: - def __init__(self, config: Dict[str, Any]): - try: - self.__ceph = rados.Rados( - conffile=config["config"], conf={"keyring": config["keyring"]} - ) - self.__ceph.connect() - except rados.ObjectNotFound as err: - raise ModuleException(f"Could not connect to ceph: {err}") - - def __getattr__(self, name: str) -> Any: - return getattr(self.__ceph, name) - - def mon_command( - self, command: str, **kwargs: str - ) -> Dict[str, Any] | List[Any]: - cmd = {"prefix": command, "format": "json"} - cmd.update(**kwargs) - result = self.__ceph.mon_command(json.dumps(cmd), b"") - if result[0] != 0: - raise ModuleException(f"Ceph did return an error: {result}") - data = json.loads(result[1]) - assert isinstance(data, dict) or isinstance(data, list) - return data - - class __K8s: - def __init__(self, config: Dict[str, Any]): - k8s_config = kubernetes.config.load_kube_config( - config_file=config["config"] - ) - self.__client = kubernetes.client.ApiClient(k8s_config) - self.__dynamic_client: Optional[kubernetes.dynamic.DynamicClient] = None - - @property - def core_v1_api(self) -> kubernetes.client.CoreV1Api: - return kubernetes.client.CoreV1Api(self.__client) - - @property - def apps_v1_api(self) -> kubernetes.client.AppsV1Api: - return kubernetes.client.AppsV1Api(self.__client) - - @property - def node_v1_api(self) -> kubernetes.client.NodeV1Api: - return kubernetes.client.NodeV1Api(self.__client) - - @property - def custom_objects_api(self) -> kubernetes.client.CustomObjectsApi: - return kubernetes.client.CustomObjectsApi(self.__client) - - @property - def dynamic_client(self) -> kubernetes.dynamic.DynamicClient: - if not self.__dynamic_client: - self.__dynamic_client = kubernetes.dynamic.DynamicClient(self.__client) - return self.__dynamic_client - - def crd_api( - self, api_version: str, kind: str - ) -> kubernetes.dynamic.resource.Resource: - return self.dynamic_client.resources.get(api_version=api_version, kind=kind) - - def crd_api_apply( - self, manifest: Dict[Any, Any] - ) -> kubernetes.dynamic.resource.ResourceInstance: - """ - This applies a manifest for custom CRDs - See https://github.com/kubernetes-client/python/issues/1792 for more information - :param manifest: Dict of the kubernetes manifest - """ - api_version = manifest["apiVersion"] - kind = manifest["kind"] - resource_name = manifest["metadata"]["name"] - namespace = manifest["metadata"]["namespace"] - crd_api = self.crd_api(api_version=api_version, kind=kind) - - try: - crd_api.get(namespace=namespace, name=resource_name) - return crd_api.patch( - body=manifest, content_type="application/merge-patch+json" - ) - except kubernetes.dynamic.exceptions.NotFoundError: - return crd_api.create(body=manifest, namespace=namespace) - - class __SSH: - def __init__(self, config: Dict[str, Any]): - self.__config = config - - def command(self, host: str, command: str) -> fabric.runners.Result: - try: - address = self.__config["hosts"][host]["address"] - user = self.__config["hosts"][host]["user"] - port = ( - self.__config["hosts"][host]["port"] - if "port" in self.__config["hosts"][host] - else 22 - ) - private_key = self.__config["private_key"] - except KeyError as err: - raise ModuleException( - f"Could not find settings for {host} in config: {err}" - ) - connect_kwargs = {"key_filename": private_key} - result = fabric.Connection( - address, user=user, port=port, connect_kwargs=connect_kwargs - ).run(command, hide=True) - return result - - class __Template: - def __init__(self, template_path: str): - self.__result_raw: Optional[str] = None - self.__result_yaml: Optional[Any] = None - self.__template_path: str = template_path - with open(template_path) as file: - self.__template = jinja2.Template(file.read()) - - def render(self, **variables: Any) -> None: - self.__result_raw = self.__template.render(**variables) - self.__result_yaml = None - - @property - def raw(self) -> str: - if not self.__result_raw: - raise ModuleException("Template was not rendered") - return self.__result_raw - - @property - def yaml(self) -> Any: - if not self.__result_yaml: - self.__result_yaml = yaml.safe_load(self.raw) - return self.__result_yaml - def __init__(self, machine: Machine, config: Dict[str, Any]): """ Construct a new 'ModuleHandler' object. @@ -165,15 +28,15 @@ def __init__(self, machine: Machine, config: Dict[str, Any]): self._config = config self._machine = machine - self.__ceph: Optional[ModuleHandler.__Ceph] = None - self.__k8s: Optional[ModuleHandler.__K8s] = None - self.__ssh: Optional[ModuleHandler.__SSH] = None + self.__ceph: Optional[Ceph] = None + self.__k8s: Optional[K8s] = None + self.__ssh: Optional[SSH] = None self.__logger = get_logger() @property - def ceph(self) -> __Ceph: + def ceph(self) -> Ceph: if self.__ceph is None: - self.__ceph = ModuleHandler.__Ceph(self._config["ceph"]) + self.__ceph = Ceph(self._config["ceph"]) return self.__ceph @property @@ -181,9 +44,9 @@ def machine(self) -> Machine: return self._machine @property - def k8s(self) -> __K8s: + def k8s(self) -> K8s: if self.__k8s is None: - self.__k8s = ModuleHandler.__K8s(self._config["kubernetes"]) + self.__k8s = K8s(self._config["kubernetes"]) return self.__k8s @property @@ -191,9 +54,9 @@ def logger(self) -> structlog.getLogger: return self.__logger @property - def ssh(self) -> __SSH: + def ssh(self) -> SSH: if self.__ssh is None: - self.__ssh = ModuleHandler.__SSH(self._config["ssh"]) + self.__ssh = SSH(self._config["ssh"]) return self.__ssh @abc.abstractmethod @@ -212,14 +75,14 @@ def execute(self) -> None: """ pass - def load_template(self, filename: str, **variables: Any) -> __Template: + def load_template(self, filename: str, **variables: Any) -> Template: template_path = os.path.join( os.path.dirname(__file__), self.__class__.__module__.rsplit(".", 2)[1], "templates", filename, ) - template = ModuleHandler.__Template(template_path) + template = Template(template_path) template.render(**variables) return template diff --git a/src/rookify/modules/ssh.py b/src/rookify/modules/ssh.py new file mode 100644 index 0000000..da2d796 --- /dev/null +++ b/src/rookify/modules/ssh.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- + +import fabric +from typing import Any, Dict +from .exception import ModuleException + + +class SSH: + def __init__(self, config: Dict[str, Any]): + self.__config = config + + def command(self, host: str, command: str) -> fabric.runners.Result: + try: + address = self.__config["hosts"][host]["address"] + user = self.__config["hosts"][host]["user"] + port = ( + self.__config["hosts"][host]["port"] + if "port" in self.__config["hosts"][host] + else 22 + ) + private_key = self.__config["private_key"] + except KeyError as err: + raise ModuleException( + f"Could not find settings for {host} in config: {err}" + ) + connect_kwargs = {"key_filename": private_key} + result = fabric.Connection( + address, user=user, port=port, connect_kwargs=connect_kwargs + ).run(command, hide=True) + return result diff --git a/src/rookify/modules/template.py b/src/rookify/modules/template.py new file mode 100644 index 0000000..ed79f5b --- /dev/null +++ b/src/rookify/modules/template.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- + +import yaml +import jinja2 +from typing import Any, Optional +from .exception import ModuleException + + +class Template: + def __init__(self, template_path: str): + self.__result_raw: Optional[str] = None + self.__result_yaml: Optional[Any] = None + self.__template_path: str = template_path + with open(template_path) as file: + self.__template = jinja2.Template(file.read()) + + def render(self, **variables: Any) -> None: + self.__result_raw = self.__template.render(**variables) + self.__result_yaml = None + + @property + def raw(self) -> str: + if not self.__result_raw: + raise ModuleException("Template was not rendered") + return self.__result_raw + + @property + def yaml(self) -> Any: + if not self.__result_yaml: + self.__result_yaml = yaml.safe_load(self.raw) + return self.__result_yaml diff --git a/tests/mock_ceph.py b/tests/mock_ceph.py index 1f0d647..ce359d1 100644 --- a/tests/mock_ceph.py +++ b/tests/mock_ceph.py @@ -2,7 +2,7 @@ import json from collections.abc import Callable -from rookify.modules.module import ModuleException +from rookify.modules.exception import ModuleException from typing import Any, Dict, List, Tuple diff --git a/tests/modules/test_example.py b/tests/modules/test_example.py index 3615560..8f67449 100644 --- a/tests/modules/test_example.py +++ b/tests/modules/test_example.py @@ -3,8 +3,8 @@ import pytest from rookify.modules.example.main import ExampleHandler +from rookify.modules.exception import ModuleException from rookify.modules.machine import Machine -from rookify.modules.module import ModuleException def test_preflight() -> None: