diff --git a/acto/__main__.py b/acto/__main__.py index 20839c18f2..b7525ebb78 100644 --- a/acto/__main__.py +++ b/acto/__main__.py @@ -1,5 +1,5 @@ import argparse -from datetime import datetime +import importlib import json import logging import os @@ -7,8 +7,7 @@ import sys import threading import time -import importlib - +from datetime import datetime start_time = time.time() workdir_path = 'testrun-%s' % datetime.now().strftime('%Y-%m-%d-%H-%M') @@ -92,10 +91,9 @@ from acto import common from acto.engine import Acto, apply_testcase from acto.input.input import DeterministicInputModel, InputModel -from acto.post_process import PostDiffTest from acto.lib.operator_config import OperatorConfig +from acto.post_process.post_diff_test import PostDiffTest from acto.utils.error_handler import handle_excepthook, thread_excepthook - from acto.utils.thread_logger import get_thread_logger logger = get_thread_logger(with_prefix=False) diff --git a/acto/kubernetes_engine/kind.py b/acto/kubernetes_engine/kind.py index 05dd4922be..6386014028 100644 --- a/acto/kubernetes_engine/kind.py +++ b/acto/kubernetes_engine/kind.py @@ -93,7 +93,7 @@ def create_cluster(self, name: str, kubeconfig: str): cmd.extend(['--image', f"kindest/node:{self._k8s_version}"]) - p = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + p = subprocess.run(cmd) while p.returncode != 0: # TODO: retry for three times logging.error('Failed to create kind cluster, retrying') diff --git a/acto/post_process/__init__.py b/acto/post_process/__init__.py index 58f41b3134..e69de29bb2 100644 --- a/acto/post_process/__init__.py +++ b/acto/post_process/__init__.py @@ -1 +0,0 @@ -from .post_diff_test import PostDiffTest \ No newline at end of file diff --git a/acto/runner/runner.py b/acto/runner/runner.py index 641652e3d3..d35d3f257a 100644 --- a/acto/runner/runner.py +++ b/acto/runner/runner.py @@ -1,7 +1,8 @@ import base64 import queue import time -from multiprocessing import Process, Queue, set_start_method, get_start_method +from multiprocessing import Process, Queue, get_start_method, set_start_method +from typing import Callable import yaml @@ -12,6 +13,9 @@ from acto.snapshot import Snapshot from acto.utils import get_thread_logger +RunnerHookType = Callable[[kubernetes.client.ApiClient], None] +CustomSystemStateHookType = Callable[[kubernetes.client.ApiClient, str, int], dict] + class Runner(object): @@ -20,6 +24,7 @@ def __init__(self, trial_dir: str, kubeconfig: str, context_name: str, + custom_system_state_f: Callable[[], dict] = None, wait_time: int = 45): self.namespace = context["namespace"] self.crd_metainfo: dict = context['crd'] @@ -32,6 +37,7 @@ def __init__(self, self.kubectl_client = KubectlClient(kubeconfig, context_name) apiclient = kubernetes_client(kubeconfig, context_name) + self.apiclient = apiclient self.coreV1Api = kubernetes.client.CoreV1Api(apiclient) self.appV1Api = kubernetes.client.AppsV1Api(apiclient) self.batchV1Api = kubernetes.client.BatchV1Api(apiclient) @@ -63,12 +69,21 @@ def __init__(self, if get_start_method() != "fork": set_start_method("fork") - def run(self, input: dict, generation: int) -> Tuple[Snapshot, bool]: + self._custom_system_state_f = custom_system_state_f + + def _run_with_serialization(self, input: dict, generation: int) -> Tuple[Snapshot, bool]: + snapshot, err = self.run(input, generation) + snapshot.serialize(self.trial_dir) + return snapshot, err + + def run(self, input: dict, generation: int, hooks: List[RunnerHookType] = None) -> Tuple[Snapshot, bool]: '''Simply run the cmd and dumps system_state, delta, operator log, events and input files without checking. The function blocks until system converges. TODO: move the serialization part to a separate function Args: + input: CR to be applied in the format of dict + generation: the generation number of the input file Returns: result, err @@ -79,7 +94,13 @@ def run(self, input: dict, generation: int) -> Tuple[Snapshot, bool]: self.system_state_path = "%s/system-state-%03d.json" % (self.trial_dir, generation) self.events_log_path = "%s/events-%d.json" % (self.trial_dir, generation) self.cli_output_path = "%s/cli-output-%d.log" % (self.trial_dir, generation) - self.not_ready_pod_log_path = "{}/not-ready-pod-{}-{{}}.log".format(self.trial_dir, generation) + self.not_ready_pod_log_path = "{}/not-ready-pod-{}-{{}}.log".format( + self.trial_dir, generation) + + # call user-defined hooks + if hooks is not None: + for hook in hooks: + hook(self.apiclient) mutated_filename = '%s/mutated-%d.yaml' % (self.trial_dir, generation) with open(mutated_filename, 'w') as mutated_cr_file: @@ -87,6 +108,7 @@ def run(self, input: dict, generation: int) -> Tuple[Snapshot, bool]: cmd = ['apply', '-f', mutated_filename, '-n', self.namespace] + # submit the CR cli_result = self.kubectl_client.kubectl(cmd, capture_output=True, text=True) logger.debug('STDOUT: ' + cli_result.stdout) logger.debug('STDERR: ' + cli_result.stderr) @@ -97,7 +119,7 @@ def run(self, input: dict, generation: int) -> Tuple[Snapshot, bool]: logger.error('STDERR: ' + cli_result.stderr) return Snapshot(input, self.collect_cli_result(cli_result), {}, []), True err = None - + try: err = self.wait_for_system_converge() except (KeyError, ValueError) as e: @@ -108,6 +130,8 @@ def run(self, input: dict, generation: int) -> Tuple[Snapshot, bool]: # when client API raise an exception, catch it and write to log instead of crashing Acto try: + if self._custom_system_state_f is not None: + _ = self._custom_system_state_f(self.apiclient, self.trial_dir, generation) system_state = self.collect_system_state() operator_log = self.collect_operator_log() self.collect_events() @@ -243,24 +267,28 @@ def collect_events(self): def collect_not_ready_pods_logs(self): pods = self.coreV1Api.list_namespaced_pod(self.namespace) for pod in pods.items: - for container_statuses in [pod.status.container_statuses or [], pod.status.init_container_statuses or []]: + for container_statuses in [ + pod.status.container_statuses or [], + pod.status.init_container_statuses or []]: for container_status in container_statuses: if not container_status.ready: try: - log = self.coreV1Api.read_namespaced_pod_log(pod.metadata.name, self.namespace, - container=container_status.name) + log = self.coreV1Api.read_namespaced_pod_log( + pod.metadata.name, self.namespace, container=container_status.name) if len(log) != 0: with open(self.not_ready_pod_log_path.format(container_status.name), 'w') as f: f.write(log) if container_status.last_state.terminated is not None: - log = self.coreV1Api.read_namespaced_pod_log(pod.metadata.name, self.namespace, - container=container_status.name, previous=True) + log = self.coreV1Api.read_namespaced_pod_log( + pod.metadata.name, self.namespace, + container=container_status.name, previous=True) if len(log) != 0: with open(self.not_ready_pod_log_path.format('prev-'+container_status.name), 'w') as f: f.write(log) except kubernetes.client.rest.ApiException as e: logger = get_thread_logger(with_prefix=True) - logger.error('Failed to get previous log of pod %s' % pod.metadata.name, exc_info=e) + logger.error('Failed to get previous log of pod %s' % + pod.metadata.name, exc_info=e) def collect_cli_result(self, p: subprocess.CompletedProcess): cli_output = {} diff --git a/requirements.txt b/requirements.txt index 49577c7536..4b141b233f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ deepdiff~=6.3.0 -kubernetes==22.6.0 +kubernetes==26.1.0 exrex~=0.11.0 jsonschema~=4.17.3 jsonpatch~=1.33