Feat: support per-run hook for Runner #284

Merged 1 commit on Nov 15, 2023

8 changes: 3 additions & 5 deletions acto/__main__.py
@@ -1,14 +1,13 @@
import argparse
from datetime import datetime
import importlib
import json
import logging
import os
import signal
import sys
import threading
import time
import importlib

from datetime import datetime

start_time = time.time()
workdir_path = 'testrun-%s' % datetime.now().strftime('%Y-%m-%d-%H-%M')
@@ -92,10 +91,9 @@
from acto import common
from acto.engine import Acto, apply_testcase
from acto.input.input import DeterministicInputModel, InputModel
from acto.post_process import PostDiffTest
from acto.lib.operator_config import OperatorConfig
from acto.post_process.post_diff_test import PostDiffTest
from acto.utils.error_handler import handle_excepthook, thread_excepthook

from acto.utils.thread_logger import get_thread_logger

logger = get_thread_logger(with_prefix=False)
2 changes: 1 addition & 1 deletion acto/kubernetes_engine/kind.py
@@ -93,7 +93,7 @@ def create_cluster(self, name: str, kubeconfig: str):

cmd.extend(['--image', f"kindest/node:{self._k8s_version}"])

p = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
p = subprocess.run(cmd)
while p.returncode != 0:
# TODO: retry for three times
logging.error('Failed to create kind cluster, retrying')
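
For context on the one-line change above: the only difference between the two subprocess.run calls in the hunk is whether the child's output is discarded. A minimal stand-alone sketch of that difference, using echo as a stand-in for the real kind command:

import subprocess

# Silenced variant: stdout/stderr are discarded, only the return code remains.
quiet = subprocess.run(["echo", "creating cluster"],
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

# Plain variant: output flows to the parent's stdout/stderr, so (assuming this
# is the new code) failures during kind cluster creation become visible in
# Acto's own console output instead of being swallowed.
loud = subprocess.run(["echo", "creating cluster"])

print(quiet.returncode, loud.returncode)  # both 0 here
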
1 change: 0 additions & 1 deletion acto/post_process/__init__.py
@@ -1 +0,0 @@
from .post_diff_test import PostDiffTest
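
With the package-level re-export removed, callers now import PostDiffTest from the module that defines it, which is exactly what the updated acto/__main__.py import above switches to:

# Before this PR (re-export through the package __init__, now removed):
#   from acto.post_process import PostDiffTest

# After this PR (import from the defining module):
from acto.post_process.post_diff_test import PostDiffTest
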
48 changes: 38 additions & 10 deletions acto/runner/runner.py
@@ -1,7 +1,8 @@
import base64
import queue
import time
from multiprocessing import Process, Queue, set_start_method, get_start_method
from multiprocessing import Process, Queue, get_start_method, set_start_method
from typing import Callable

import yaml

@@ -12,6 +13,9 @@
from acto.snapshot import Snapshot
from acto.utils import get_thread_logger

RunnerHookType = Callable[[kubernetes.client.ApiClient], None]
CustomSystemStateHookType = Callable[[kubernetes.client.ApiClient, str, int], dict]


class Runner(object):

@@ -20,6 +24,7 @@ def __init__(self,
trial_dir: str,
kubeconfig: str,
context_name: str,
custom_system_state_f: Callable[[], dict] = None,
wait_time: int = 45):
self.namespace = context["namespace"]
self.crd_metainfo: dict = context['crd']
@@ -32,6 +37,7 @@ def __init__(self,
self.kubectl_client = KubectlClient(kubeconfig, context_name)

apiclient = kubernetes_client(kubeconfig, context_name)
self.apiclient = apiclient
self.coreV1Api = kubernetes.client.CoreV1Api(apiclient)
self.appV1Api = kubernetes.client.AppsV1Api(apiclient)
self.batchV1Api = kubernetes.client.BatchV1Api(apiclient)
@@ -63,12 +69,21 @@ def __init__(self,
if get_start_method() != "fork":
set_start_method("fork")

def run(self, input: dict, generation: int) -> Tuple[Snapshot, bool]:
self._custom_system_state_f = custom_system_state_f

def _run_with_serialization(self, input: dict, generation: int) -> Tuple[Snapshot, bool]:
snapshot, err = self.run(input, generation)
snapshot.serialize(self.trial_dir)
return snapshot, err

def run(self, input: dict, generation: int, hooks: List[RunnerHookType] = None) -> Tuple[Snapshot, bool]:
'''Simply run the cmd and dumps system_state, delta, operator log, events and input files without checking.
The function blocks until system converges.
TODO: move the serialization part to a separate function

Args:
input: CR to be applied in the format of dict
generation: the generation number of the input file

Returns:
result, err
@@ -79,14 +94,21 @@ def run(self, input: dict, generation: int) -> Tuple[Snapshot, bool]:
self.system_state_path = "%s/system-state-%03d.json" % (self.trial_dir, generation)
self.events_log_path = "%s/events-%d.json" % (self.trial_dir, generation)
self.cli_output_path = "%s/cli-output-%d.log" % (self.trial_dir, generation)
self.not_ready_pod_log_path = "{}/not-ready-pod-{}-{{}}.log".format(self.trial_dir, generation)
self.not_ready_pod_log_path = "{}/not-ready-pod-{}-{{}}.log".format(
self.trial_dir, generation)

# call user-defined hooks
if hooks is not None:
for hook in hooks:
hook(self.apiclient)

mutated_filename = '%s/mutated-%d.yaml' % (self.trial_dir, generation)
with open(mutated_filename, 'w') as mutated_cr_file:
yaml.dump(input, mutated_cr_file)

cmd = ['apply', '-f', mutated_filename, '-n', self.namespace]

# submit the CR
cli_result = self.kubectl_client.kubectl(cmd, capture_output=True, text=True)
logger.debug('STDOUT: ' + cli_result.stdout)
logger.debug('STDERR: ' + cli_result.stderr)
@@ -97,7 +119,7 @@ def run(self, input: dict, generation: int) -> Tuple[Snapshot, bool]:
logger.error('STDERR: ' + cli_result.stderr)
return Snapshot(input, self.collect_cli_result(cli_result), {}, []), True
err = None

try:
err = self.wait_for_system_converge()
except (KeyError, ValueError) as e:
@@ -108,6 +130,8 @@ def run(self, input: dict, generation: int) -> Tuple[Snapshot, bool]:

# when client API raise an exception, catch it and write to log instead of crashing Acto
try:
if self._custom_system_state_f is not None:
_ = self._custom_system_state_f(self.apiclient, self.trial_dir, generation)
system_state = self.collect_system_state()
operator_log = self.collect_operator_log()
self.collect_events()
@@ -243,24 +267,28 @@ def collect_events(self):
def collect_not_ready_pods_logs(self):
pods = self.coreV1Api.list_namespaced_pod(self.namespace)
for pod in pods.items:
for container_statuses in [pod.status.container_statuses or [], pod.status.init_container_statuses or []]:
for container_statuses in [
pod.status.container_statuses or [],
pod.status.init_container_statuses or []]:
for container_status in container_statuses:
if not container_status.ready:
try:
log = self.coreV1Api.read_namespaced_pod_log(pod.metadata.name, self.namespace,
container=container_status.name)
log = self.coreV1Api.read_namespaced_pod_log(
pod.metadata.name, self.namespace, container=container_status.name)
if len(log) != 0:
with open(self.not_ready_pod_log_path.format(container_status.name), 'w') as f:
f.write(log)
if container_status.last_state.terminated is not None:
log = self.coreV1Api.read_namespaced_pod_log(pod.metadata.name, self.namespace,
container=container_status.name, previous=True)
log = self.coreV1Api.read_namespaced_pod_log(
pod.metadata.name, self.namespace,
container=container_status.name, previous=True)
if len(log) != 0:
with open(self.not_ready_pod_log_path.format('prev-'+container_status.name), 'w') as f:
f.write(log)
except kubernetes.client.rest.ApiException as e:
logger = get_thread_logger(with_prefix=True)
logger.error('Failed to get previous log of pod %s' % pod.metadata.name, exc_info=e)
logger.error('Failed to get previous log of pod %s' %
pod.metadata.name, exc_info=e)

def collect_cli_result(self, p: subprocess.CompletedProcess):
cli_output = {}
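
The new pieces above are the RunnerHookType and CustomSystemStateHookType aliases, the hooks argument to Runner.run(), and the custom_system_state_f constructor argument. A minimal sketch of how a caller might use the per-run hook; the hook body, the namespace name, and the runner and cr_dict variables are hypothetical placeholders, not part of this PR:

import kubernetes

def label_namespace_hook(apiclient: kubernetes.client.ApiClient) -> None:
    # Matches RunnerHookType: called once at the top of Runner.run(), before
    # the mutated CR is written out and applied with kubectl.
    core = kubernetes.client.CoreV1Api(apiclient)
    core.patch_namespace("acto-namespace",  # hypothetical namespace
                         {"metadata": {"labels": {"acto-run": "true"}}})

# Assuming `runner` is a Runner built as elsewhere in Acto and `cr_dict` is the CR to apply:
snapshot, err = runner.run(cr_dict, generation=0, hooks=[label_namespace_hook])

The optional custom_system_state_f passed to Runner.__init__ is a separate knob: per the hunk above, it is invoked with (apiclient, trial_dir, generation) right before the regular system-state collection, matching the CustomSystemStateHookType alias.
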
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,5 +1,5 @@
deepdiff~=6.3.0
kubernetes==22.6.0
kubernetes==26.1.0
exrex~=0.11.0
jsonschema~=4.17.3
jsonpatch~=1.33