diff --git a/acto/deploy.py b/acto/deploy.py index a2debb4dfe..c03f11bf42 100644 --- a/acto/deploy.py +++ b/acto/deploy.py @@ -1,47 +1,36 @@ +import logging import time -import kubernetes import yaml import acto.utils as utils -from acto.common import * +from acto.common import kubernetes_client, print_event from acto.kubectl_client.kubectl import KubectlClient from acto.lib.operator_config import DELEGATED_NAMESPACE, DeployConfig from acto.utils import get_thread_logger from acto.utils.preprocess import add_acto_label -def wait_for_pod_ready(apiclient: kubernetes.client.ApiClient): - logger = get_thread_logger(with_prefix=True) - logger.debug("Waiting for all pods to be ready") - time.sleep(5) - pod_ready = False - for tick in range(600): - # check if all pods are ready - pods = kubernetes.client.CoreV1Api( - apiclient).list_pod_for_all_namespaces().items - - all_pods_ready = True - for pod in pods: - if pod.status.phase == "Succeeded": - continue - if not utils.is_pod_ready(pod): - all_pods_ready = False - - if all_pods_ready: - logger.info("Operator ready") - pod_ready = True - break - time.sleep(5) - logger.info("All pods took %d seconds to get ready" % (tick * 5)) - if not pod_ready: - logger.error("Some pods failed to be ready within timeout") +def wait_for_pod_ready(kubectl_client: KubectlClient) -> bool: + """Wait for all pods to be ready""" + now = time.time() + p = kubectl_client.wait_for_all_pods(timeout=600) + if p.returncode != 0: + logging.error( + "Failed to wait for all pods to be ready due to error from kubectl" + + f" (returncode={p.returncode})" + + f" (stdout={p.stdout})" + + f" (stderr={p.stderr})" + ) return False - else: - return True + logging.info( + "Waited for all pods to be ready for %d seconds", time.time() - now + ) + return True -class Deploy(): +class Deploy: + """Deploy the operator using the deploy config""" def __init__(self, deploy_config: DeployConfig) -> None: self._deploy_config = deploy_config @@ -52,24 +41,30 @@ def __init__(self, deploy_config: DeployConfig) -> None: self._operator_yaml = step.apply.file break else: - raise Exception("No operator yaml found in deploy config") - + raise RuntimeError("No operator yaml found in deploy config") + # Extract the operator_container_name from config self._operator_container_name = None for step in self._deploy_config.steps: if step.apply and step.apply.operator: - self._operator_container_name = step.apply.operator_container_name + self._operator_container_name = ( + step.apply.operator_container_name + ) break @property def operator_yaml(self) -> str: + """Get the operator yaml file path""" return self._operator_yaml - def deploy(self, - kubeconfig: str, - context_name: str, - kubectl_client: KubectlClient, - namespace: str): + def deploy( + self, + kubeconfig: str, + context_name: str, + kubectl_client: KubectlClient, + namespace: str, + ): + """Deploy the operator using the deploy config""" logger = get_thread_logger(with_prefix=True) print_event("Deploying operator...") api_client = kubernetes_client(kubeconfig, context_name) @@ -97,14 +92,16 @@ def deploy(self, p = kubectl_client.kubectl(args, capture_output=True) if p.returncode != 0: logger.error( - "Failed to deploy operator due to error from kubectl" + - f" (returncode={p.returncode})" + - f" (stdout={p.stdout})" + - f" (stderr={p.stderr})") + "Failed to deploy operator due to error from kubectl" + + f" (returncode={p.returncode})" + + f" (stdout={p.stdout})" + + f" (stderr={p.stderr})" + ) return False - elif not wait_for_pod_ready(api_client): + elif not wait_for_pod_ready(kubectl_client): logger.error( - "Failed to deploy operator due to timeout waiting for pod to be ready") + "Failed to deploy operator due to timeout waiting for pod to be ready" + ) return False elif step.wait: # Simply wait for the specified duration @@ -112,7 +109,7 @@ def deploy(self, # Add acto label to the operator pod add_acto_label(api_client, namespace) - if not wait_for_pod_ready(api_client): + if not wait_for_pod_ready(kubectl_client): logger.error("Failed to deploy operator") return False @@ -121,14 +118,17 @@ def deploy(self, print_event("Operator deployed") return True - def deploy_with_retry(self, - kubeconfig: str, - context_name: str, - kubectl_client: KubectlClient, - namespace: str, - retry_count: int = 3): + def deploy_with_retry( + self, + kubeconfig: str, + context_name: str, + kubectl_client: KubectlClient, + namespace: str, + retry_count: int = 3, + ): + """Deploy the operator with retry""" logger = get_thread_logger(with_prefix=True) - for i in range(retry_count): + for _ in range(retry_count): if self.deploy(kubeconfig, context_name, kubectl_client, namespace): return True else: @@ -136,7 +136,8 @@ def deploy_with_retry(self, return False def operator_name(self) -> str: - with open(self._operator_yaml) as f: + """Get the name of the operator deployment""" + with open(self._operator_yaml, "r", encoding="utf-8") as f: operator_yamls = yaml.load_all(f, Loader=yaml.FullLoader) for yaml_ in operator_yamls: if yaml_["kind"] == "Deployment": @@ -145,4 +146,5 @@ def operator_name(self) -> str: @property def operator_container_name(self) -> str: + """Get the name of the operator container""" return self._operator_container_name diff --git a/acto/kubectl_client/helm.py b/acto/kubectl_client/helm.py new file mode 100644 index 0000000000..21d8f15b5f --- /dev/null +++ b/acto/kubectl_client/helm.py @@ -0,0 +1,46 @@ +import subprocess +from typing import Optional + + +class Helm: + """Helm client class""" + + def __init__(self, kubeconfig: str, context_name: str) -> None: + self.kubeconfig = kubeconfig + self.context_name = context_name + + def helm(self, args: list) -> subprocess.CompletedProcess: + """Executes a helm command""" + cmd = ["helm"] + cmd.extend(args) + cmd.extend(["--kubeconfig", self.kubeconfig]) + cmd.extend(["--kube-context", self.context_name]) + return subprocess.run(cmd, capture_output=True, text=True, check=False) + + def repo_add(self, name: str, url: str) -> subprocess.CompletedProcess: + """Adds a helm repository""" + cmd = ["repo", "add", name, url] + return self.helm(cmd) + + def install( + self, + release_name: str, + chart: str, + namespace: str, + repo: Optional[str] = None, + args: Optional[list] = None, + ) -> subprocess.CompletedProcess: + """Installs a helm chart""" + cmd = [ + "install", + release_name, + chart, + "--namespace", + namespace, + "--create-namespace", + ] + if repo: + cmd.extend(["--repo", repo]) + if args: + cmd.extend(args) + return self.helm(cmd) diff --git a/acto/kubectl_client/kubectl.py b/acto/kubectl_client/kubectl.py index 8f9ffa805f..da4a8181c8 100644 --- a/acto/kubectl_client/kubectl.py +++ b/acto/kubectl_client/kubectl.py @@ -1,44 +1,95 @@ +import logging import subprocess +from typing import Optional class KubectlClient: + """Kubectl client class""" def __init__(self, kubeconfig: str, context_name: str): if not kubeconfig: - raise ValueError('kubeconfig is required') + raise ValueError("kubeconfig is required") if not context_name: - raise ValueError('context_name is required') + raise ValueError("context_name is required") self.kubeconfig = kubeconfig self.context_name = context_name - def exec(self, - pod: str, - namespace: str, - commands: list, - capture_output=False, - text=False) -> subprocess.CompletedProcess: - '''Executes a command in a pod''' - cmd = ['exec'] + def exec( + self, + pod: str, + namespace: str, + commands: list, + capture_output=False, + text=False, + ) -> subprocess.CompletedProcess: + """Executes a command in a pod""" + cmd = ["exec"] cmd.extend([pod]) - cmd.extend(['--namespace', namespace]) - cmd.extend(['--']) + cmd.extend(["--namespace", namespace]) + cmd.extend(["--"]) cmd.extend(commands) return self.kubectl(cmd, capture_output, text) - def kubectl(self, - args: list, - capture_output=False, - text=False, - timeout: int = 600) -> subprocess.CompletedProcess: - '''Executes a kubectl command''' - cmd = ['kubectl'] - cmd.extend(['--kubeconfig', self.kubeconfig]) - cmd.extend(['--context', self.context_name]) + def kubectl( + self, args: list, capture_output=False, text=False, timeout: int = 600 + ) -> subprocess.CompletedProcess: + """Executes a kubectl command""" + cmd = ["kubectl"] + cmd.extend(["--kubeconfig", self.kubeconfig]) + cmd.extend(["--context", self.context_name]) cmd.extend(args) - p = subprocess.run(cmd, capture_output=capture_output, text=text, timeout=timeout) - return p \ No newline at end of file + logging.info("Running kubectl command: %s", " ".join(cmd)) + p = subprocess.run( + cmd, + capture_output=capture_output, + text=text, + timeout=timeout, + check=False, + ) + return p + + def wait( + self, + file: str, + for_condition: str, + timeout: int = 600, + namespace: Optional[str] = None, + ) -> subprocess.CompletedProcess: + """Waits for a condition to be true""" + cmd = [ + "wait", + "-f", + file, + "--for", + for_condition, + "--timeout", + f"{timeout}s", + ] + if namespace: + cmd.extend(["-n", namespace]) + else: + cmd.extend(["--all-namespaces"]) + return self.kubectl(cmd, capture_output=True, text=True) + + def wait_for_all_pods( + self, timeout: int = 600, namespace: Optional[str] = None + ) -> subprocess.CompletedProcess: + """Waits for all pods to be ready""" + cmd = [ + "wait", + "--for=condition=Ready", + "--timeout", + f"{timeout}s", + "pods", + "--all", + ] + if namespace: + cmd.extend(["-n", namespace]) + else: + cmd.extend(["--all-namespaces"]) + return self.kubectl(cmd, capture_output=True, text=True) diff --git a/acto/kubernetes_engine/kind.py b/acto/kubernetes_engine/kind.py index 09dac36752..ee5c1d00cf 100644 --- a/acto/kubernetes_engine/kind.py +++ b/acto/kubernetes_engine/kind.py @@ -2,7 +2,7 @@ import os import subprocess import time -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional import kubernetes import yaml @@ -22,7 +22,7 @@ def __init__( posthooks: List[base.KubernetesEnginePostHookType] = None, feature_gates: Dict[str, bool] = None, num_nodes=1, - version="", + version: Optional[str] = None, ): self._config_path = os.path.join( CONST.CLUSTER_CONFIG_FOLDER, f"KIND-{acto_namespace}.yaml" @@ -39,7 +39,7 @@ def __init__( extra_mounts.append( {"hostPath": "profile/data", "containerPath": "/tmp/profile"} ) - for _ in range(num_nodes - 1): + for _ in range(num_nodes): config_dict["nodes"].append( { "role": "worker", @@ -108,7 +108,8 @@ def create_cluster(self, name: str, kubeconfig: str): cmd.extend(["--config", self._config_path]) - cmd.extend(["--image", f"kindest/node:{self._k8s_version}"]) + if self._k8s_version: + cmd.extend(["--image", f"kindest/node:{self._k8s_version}"]) p = subprocess.run(cmd, check=False) i = 0 diff --git a/acto/lib/operator_config.py b/acto/lib/operator_config.py index 9d50e37006..0543c3d061 100644 --- a/acto/lib/operator_config.py +++ b/acto/lib/operator_config.py @@ -8,7 +8,8 @@ class ApplyStep(pydantic.BaseModel, extra="forbid"): """Configuration for each step of kubectl apply""" - file: str = pydantic.Field(description="Path to the file for kubectl apply") + file: str = pydantic.Field( + description="Path to the file for kubectl apply") operator: bool = pydantic.Field( description="If the file contains the operator deployment", default=False, diff --git a/acto/runner/fault_injection_runner.py b/acto/runner/fault_injection_runner.py new file mode 100644 index 0000000000..65259c4f1f --- /dev/null +++ b/acto/runner/fault_injection_runner.py @@ -0,0 +1,58 @@ +"""Runner module for Acto""" + +import logging +from typing import Callable + +import kubernetes + +from acto.common import kubernetes_client +from acto.kubectl_client import KubectlClient +from acto.snapshot import Snapshot + +RunnerHookType = Callable[[kubernetes.client.ApiClient], None] +CustomSystemStateHookType = Callable[ + [kubernetes.client.ApiClient, str, int], dict +] + + +class FaultInjectionRunner: + """Runner class for Acto. + This class is used to run the cmd and collect system state, + delta, operator log, events and input files. + """ + + def __init__( + self, + trial_dir: str, + kubeconfig: str, + context_name: str, + ): + self.trial_dir = trial_dir + self.kubeconfig = kubeconfig + self.context_name = context_name + + self.kubectl_client = KubectlClient(kubeconfig, context_name) + + apiclient = kubernetes_client(kubeconfig, context_name) + + def run( + self, + file_path: str, + namespace: str, + ) -> tuple[Snapshot, bool]: + """Apply the input CR""" + + cmd = ["apply", "-f", file_path, "-n", namespace] + + # submit the CR + cli_result = self.kubectl_client.kubectl( + cmd, capture_output=True, text=True + ) + + if cli_result.returncode != 0: + logging.error( + "kubectl apply failed with return code %d", + cli_result.returncode, + ) + logging.error("STDOUT: %s", cli_result.stdout) + logging.error("STDERR: %s", cli_result.stderr) diff --git a/acto/system_state/replica_set.py b/acto/system_state/replica_set.py index 7cf40816ef..2a2ec33d9b 100644 --- a/acto/system_state/replica_set.py +++ b/acto/system_state/replica_set.py @@ -1,4 +1,5 @@ """ReplicaSet state model.""" + import kubernetes import kubernetes.client.models as kubernetes_models import pydantic @@ -38,7 +39,12 @@ def check_health(self) -> tuple[bool, str]: ): return False, f"ReplicaSet[{name}] generation mismatch" - if replica_set.spec.replicas != replica_set.status.ready_replicas: + if ( + replica_set.status.ready_replicas is None + and replica_set.spec.replicas == 0 + ): + pass + elif replica_set.spec.replicas != replica_set.status.ready_replicas: return False, f"ReplicaSet[{name}] replicas mismatch" if replica_set.status.conditions is not None: