Skip to content

Commit

Permalink
Externalize ModuleHandler subclasses
Browse files Browse the repository at this point in the history
Signed-off-by: Tobias Wolf <[email protected]>
  • Loading branch information
NotTheEvilOne committed Jun 5, 2024
1 parent f693cc5 commit 4dcc214
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 186 deletions.
30 changes: 30 additions & 0 deletions src/rookify/modules/ceph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-

import json
import rados
from typing import Any, Dict, List
from .exception import ModuleException


class Ceph:
def __init__(self, config: Dict[str, Any]):
try:
self.__ceph = rados.Rados(
conffile=config["config"], conf={"keyring": config["keyring"]}
)
self.__ceph.connect()
except rados.ObjectNotFound as err:
raise ModuleException(f"Could not connect to ceph: {err}")

def __getattr__(self, name: str) -> Any:
return getattr(self.__ceph, name)

def mon_command(self, command: str, **kwargs: str) -> Dict[str, Any] | List[Any]:
cmd = {"prefix": command, "format": "json"}
cmd.update(**kwargs)
result = self.__ceph.mon_command(json.dumps(cmd), b"")
if result[0] != 0:
raise ModuleException(f"Ceph did return an error: {result}")
data = json.loads(result[1])
assert isinstance(data, dict) or isinstance(data, list)
return data
3 changes: 2 additions & 1 deletion src/rookify/modules/cephx_auth_config/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# -*- coding: utf-8 -*-

from typing import Any
from ..exception import ModuleException
from ..machine import Machine
from ..module import ModuleException, ModuleHandler
from ..module import ModuleHandler


class CephXAuthHandler(ModuleHandler):
Expand Down
44 changes: 19 additions & 25 deletions src/rookify/modules/create_cluster/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# -*- coding: utf-8 -*-

import kubernetes
from typing import Any
from ..exception import ModuleException
from ..machine import Machine
from ..module import ModuleHandler, ModuleException
from ..module import ModuleHandler


class CreateClusterHandler(ModuleHandler):
Expand All @@ -16,15 +16,15 @@ class CreateClusterHandler(ModuleHandler):

@property
def __mon_placement_label(self) -> str:
return (
return ( # type: ignore
self._config["rook"]["cluster"]["mon_placement_label"]
if "mon_placement_label" in self._config["rook"]["cluster"]
else f"placement-{self._config["rook"]["cluster"]["name"]}-mon"
)

@property
def __mgr_placement_label(self) -> str:
return (
return ( # type: ignore
self._config["rook"]["cluster"]["mgr_placement_label"]
if "mgr_placement_label" in self._config["rook"]["cluster"]
else f"placement-{self._config["rook"]["cluster"]["name"]}-mgr"
Expand Down Expand Up @@ -104,13 +104,9 @@ def execute(self) -> None:

self.k8s.crd_api_apply(cluster_definition)

cluster_name = self._config["rook"]["cluster"]["name"]

# Wait for CephCluster to get into Progressing phase
result = None
watcher = kubernetes.watch.Watch()

stream = watcher.stream(
result = self.k8s.watch_events(
self._watch_cluster_phase_callback,
self.k8s.custom_objects_api.list_namespaced_custom_object,
"ceph.rook.io",
"v1",
Expand All @@ -119,23 +115,21 @@ def execute(self) -> None:
timeout_seconds=60,
)

for event in stream:
event_object = event["object"]

if event_object["metadata"]["name"] != cluster_name:
continue

try:
if event_object["status"]["phase"] == "Progressing":
result = event_object
break
except KeyError:
pass
if result is None:
raise ModuleException("CephCluster did not come up")

watcher.stop()
def _watch_cluster_phase_callback(self, event_object: Any) -> Any:
try:
if (
event_object["metadata"]["name"]
== self._config["rook"]["cluster"]["name"]
and event_object["status"]["phase"] == "Progressing"
):
return event_object
except KeyError:
pass

if result == None:
raise ModuleException("CephCluster did not come up")
return None

@staticmethod
def register_preflight_state(
Expand Down
6 changes: 3 additions & 3 deletions src/rookify/modules/create_configmap/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# -*- coding: utf-8 -*-

import kubernetes
from ..machine import Machine
from ..module import ModuleHandler, ModuleException

from typing import Any, Dict
from ..exception import ModuleException
from ..machine import Machine
from ..module import ModuleHandler


class CreateConfigMapHandler(ModuleHandler):
Expand Down
3 changes: 2 additions & 1 deletion src/rookify/modules/example/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-

from typing import Any
from ..module import ModuleHandler, ModuleException
from ..exception import ModuleException
from ..module import ModuleHandler


class ExampleHandler(ModuleHandler):
Expand Down
5 changes: 5 additions & 0 deletions src/rookify/modules/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-


class ModuleException(Exception):
pass
83 changes: 83 additions & 0 deletions src/rookify/modules/k8s.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-

import kubernetes
from typing import Any, Callable, Dict, Optional


class K8s:
def __init__(self, config: Dict[str, Any]):
k8s_config = kubernetes.config.load_kube_config(config_file=config["config"])
self.__client = kubernetes.client.ApiClient(k8s_config)
self.__dynamic_client: Optional[kubernetes.dynamic.DynamicClient] = None

@property
def core_v1_api(self) -> kubernetes.client.CoreV1Api:
return kubernetes.client.CoreV1Api(self.__client)

@property
def apps_v1_api(self) -> kubernetes.client.AppsV1Api:
return kubernetes.client.AppsV1Api(self.__client)

@property
def node_v1_api(self) -> kubernetes.client.NodeV1Api:
return kubernetes.client.NodeV1Api(self.__client)

@property
def custom_objects_api(self) -> kubernetes.client.CustomObjectsApi:
return kubernetes.client.CustomObjectsApi(self.__client)

@property
def dynamic_client(self) -> kubernetes.dynamic.DynamicClient:
if not self.__dynamic_client:
self.__dynamic_client = kubernetes.dynamic.DynamicClient(self.__client)
return self.__dynamic_client

def crd_api(
self, api_version: str, kind: str
) -> kubernetes.dynamic.resource.Resource:
return self.dynamic_client.resources.get(api_version=api_version, kind=kind)

def crd_api_apply(
self, manifest: Dict[Any, Any]
) -> kubernetes.dynamic.resource.ResourceInstance:
"""
This applies a manifest for custom CRDs
See https://github.com/kubernetes-client/python/issues/1792 for more information
:param manifest: Dict of the kubernetes manifest
"""
api_version = manifest["apiVersion"]
kind = manifest["kind"]
resource_name = manifest["metadata"]["name"]
namespace = manifest["metadata"]["namespace"]
crd_api = self.crd_api(api_version=api_version, kind=kind)

try:
crd_api.get(namespace=namespace, name=resource_name)
return crd_api.patch(
body=manifest, content_type="application/merge-patch+json"
)
except kubernetes.dynamic.exceptions.NotFoundError:
return crd_api.create(body=manifest, namespace=namespace)

def watch_events(
self,
callback_func: Callable[[Any], Any],
func: Callable[[Any], Any],
*args: Any,
**kwargs: Any,
) -> Any:
watcher = kubernetes.watch.Watch()

stream = watcher.stream(func, *args, **kwargs)

try:
for event in stream:
try:
result = callback_func(event["object"])
except StopIteration:
continue

if result is not None:
return result
finally:
watcher.stop()
3 changes: 2 additions & 1 deletion src/rookify/modules/k8s_prerequisites_check/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-

from ..module import ModuleException, ModuleHandler
from ..exception import ModuleException
from ..module import ModuleHandler


class K8sPrerequisitesCheckHandler(ModuleHandler):
Expand Down
Loading

0 comments on commit 4dcc214

Please sign in to comment.