Skip to content

Commit

Permalink
make it possible to generate context.json
Browse files Browse the repository at this point in the history
  • Loading branch information
KashunCheng committed Jul 11, 2023
1 parent c86554d commit 2aa1977
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 65 deletions.
2 changes: 1 addition & 1 deletion acto/checker/checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def check(self, snapshot: Snapshot, prev_snapshot: Snapshot) -> OracleResult:
result = self._check(snapshot, prev_snapshot)
except Exception as e:
result = OracleResult(message=str(e), exception=e)
result.set_emitter(self.name)
result.set_emitter(self)
return result

@abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion acto/checker/checker_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

class CheckerSet:
def __init__(self, context: dict, input_model: InputModel, checker_generators: list = None):
if checker_generators is None or checker_generators == []:
if checker_generators is None:
checker_generators = [CrashChecker, HealthChecker, KubectlCliChecker, OperatorLogChecker, StateChecker, RecoveryChecker]
self.context = context
self.input_model = input_model
Expand Down
4 changes: 3 additions & 1 deletion acto/checker/impl/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Dict, List

from acto.checker.checker import Checker, OracleControlFlow, OracleResult
from acto.lib.dict import visit_dict
from acto.snapshot import Snapshot
from acto.utils import get_thread_logger

Expand Down Expand Up @@ -89,7 +90,8 @@ def _check(self, snapshot: Snapshot, __: Snapshot) -> OracleResult:
container['restart_count']))

# check Health of CRs
if system_state['custom_resource_status'] is not None and 'conditions' in system_state['custom_resource_status']:
_, conditions = visit_dict(system_state, ['custom_resource_status', 'conditions'])
if conditions is not None:
for condition in system_state['custom_resource_status']['conditions']:
if condition['type'] == 'Ready' and condition['status'] != 'True' and 'is forbidden' in condition['message'].lower():
unhealthy_resources['cr'].append('%s condition [%s] status [%s] message [%s]' %
Expand Down
17 changes: 9 additions & 8 deletions acto/deploy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum, auto, unique
from functools import wraps
from enum import auto, unique
from functools import wraps, partial
from typing import TypeVar, Callable

import yaml
Expand Down Expand Up @@ -38,8 +38,8 @@ class Deploy:
"""

def __init__(self, file_path: str, init_yaml=None):
self.crd_yaml_files = list(yaml.load_all(open(file_path, 'r').read(), Loader=yaml.FullLoader))
self.init_yaml_files = list(yaml.load_all(open(init_yaml, 'r').read(), Loader=yaml.FullLoader)) if init_yaml else None
self.crd_yaml_files = list(yaml.safe_load_all(open(file_path, 'r').read()))
self.init_yaml_files = list(yaml.safe_load_all(open(init_yaml, 'r').read())) if init_yaml else None
self.wait = 20 # sec

def deploy(self, runner: Runner) -> str:
Expand All @@ -66,8 +66,8 @@ def check_status(self, runner: Runner):
crash later when running oracle if operator hasn't been ready
"""
collector_context = CollectorContext(namespace='kube-system', timeout=self.wait)
kube_snapshot_collector = with_context(collector_context, snapshot_collector)
trial = Trial(TrialInputIterator.__new__(TrialInputIterator), CheckerSet({}, InputModel.__new__(InputModel)), [])
kube_snapshot_collector = partial(with_context(collector_context, snapshot_collector), ignore_cli_error=True)
trial = Trial(TrialInputIterator.__new__(TrialInputIterator), CheckerSet({}, InputModel.__new__(InputModel), []))

for i in range(5):
kube_snapshot = kube_snapshot_collector(runner, trial, {
Expand Down Expand Up @@ -158,10 +158,11 @@ def deploy(self, runner: Runner) -> str:
ret = utils.create_namespace(kubectl_client.api_client, namespace)
if ret is None:
logger.critical('Failed to create namespace')
# use server side apply to avoid last-applied-configuration
if self.init_yaml_files:
kubectl_client.apply(self.init_yaml_files)
kubectl_client.apply(self.init_yaml_files, server_side=None)
self.check_status(runner)
kubectl_client.apply(self.crd_yaml_files, namespace=namespace)
kubectl_client.apply(self.crd_yaml_files, namespace=namespace, server_side=None)
self.check_status(runner)
print_event('Operator deployed')
return namespace
Expand Down
27 changes: 18 additions & 9 deletions acto/engine_new.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import importlib
import json
import os
Expand All @@ -21,7 +22,7 @@
from acto.kubernetes_engine.kind import Kind
from acto.lib.fp import drop_first_parameter
from acto.runner.ray_runner import Runner
from acto.runner.snapshot_collector import apply_system_input_and_wait, CollectorContext, with_context, snapshot_collector
from acto.runner.snapshot_collector import CollectorContext, with_context, snapshot_collector, wait_for_system_converge
from acto.runner.trial import Trial, TrialInputIterator
from acto.schema import ObjectSchema
from acto.serialization import ContextEncoder
Expand Down Expand Up @@ -75,9 +76,10 @@ def __init__(self,
self.__learn_context(context_file=context_file,
crd_name=operator_config.crd_name,
helper_crd=helper_crd,
analysis_config=operator_config.analysis)
analysis_config=operator_config.analysis,
seed_file_path=operator_config.seed_custom_resource)

if operator_config.analysis != None:
if operator_config.analysis is not None:
used_fields = self.context['analysis_result']['used_fields']
else:
used_fields = None
Expand Down Expand Up @@ -139,7 +141,7 @@ def __init__(self,
self.checkers = checkers
self.num_of_mutations = num_of_mutations

def __learn_context(self, context_file, crd_name, helper_crd, analysis_config):
def __learn_context(self, context_file, crd_name, helper_crd, analysis_config, seed_file_path):
logger = get_thread_logger(with_prefix=False)
context_file_up_to_date = os.path.exists(context_file)

Expand All @@ -157,25 +159,32 @@ def __learn_context(self, context_file, crd_name, helper_crd, analysis_config):
class LearnCompleteException(Exception):
context: dict

def __init__(self, ctx: dict):
self.context = ctx

def collect_learn_context(namespace_discovered: str, runner: Runner, trial: Trial, seed_input: dict):
logger.info('Starting learning run to collect information')
learn_task_context.namespace = namespace_discovered
apply_system_input_and_wait(learn_task_context, runner, seed_input)
context['namespace'] = namespace_discovered
runner.kubectl_client.apply(seed_input, namespace=namespace_discovered)

learn_task_context.set_collector_if_none(runner.kubectl_client)
asyncio.run(wait_for_system_converge(learn_task_context.kubectl_collector, learn_task_context.timeout))

update_preload_images(context, runner.cluster.get_node_list(runner.kubectl_client.context_name))
update_preload_images(context, runner.cluster.get_node_list(runner.cluster_name))
process_crd(context, runner.kubectl_client.api_client, runner.kubectl_client, crd_name, helper_crd)

raise LearnCompleteException(context)

task = self.deploy.chain_with(collect_learn_context)
# TODO, add protocols to Trial to suppress type error
runner_trial = Trial((), None)
runner_trial = Trial(TrialInputIterator(iter(()), None, yaml.safe_load(open(seed_file_path))), None)
self.runners.submit(lambda runner, trial: runner.run.remote(trial, task), runner_trial)
runner_trial = self.runners.get_next()
assert isinstance(runner_trial.error, LearnCompleteException)

self.context = runner_trial.error.context
self.context['analysis_result'] = do_static_analysis(analysis_config)
if analysis_config:
self.context['analysis_result'] = do_static_analysis(analysis_config)

with open(context_file, 'w') as context_fout:
json.dump(self.context, context_fout, cls=ContextEncoder, indent=4, sort_keys=True)
Expand Down
2 changes: 1 addition & 1 deletion acto/input/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def __init__(self,
if self.example_dir is not None:
for example_filepath in glob.glob(self.example_dir + '*.yaml'):
with open(example_filepath, 'r') as example_file:
docs = yaml.load_all(example_file, Loader=yaml.FullLoader)
docs = yaml.safe_load_all(example_file)
for doc in docs:
example_docs.append(doc)

Expand Down
12 changes: 9 additions & 3 deletions acto/kubectl_client/kubectl.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,22 @@ def kubectl(self,
def apply(self, file_content: dict, **kwargs) -> subprocess.CompletedProcess:
args = {
'namespace': '-n',
'server_side': '--server-side',
}
with tempfile.NamedTemporaryFile(mode='w+', suffix='.yaml') as f:
yaml.dump(file_content, f)
if isinstance(file_content, list):
yaml.safe_dump_all(file_content, f)
else:
yaml.safe_dump(file_content, f)
cmd = ['apply', '-f', f.name]
for k, v in kwargs.items():
if k in args:
cmd.extend([args[k], v])
cmd.extend([args[k]])
if v:
cmd.extend([v])
else:
raise ValueError(f'Invalid argument {k}')
return self.kubectl(cmd, **kwargs)
return self.kubectl(cmd)


def kubernetes_client(kubeconfig: str, context_name: str) -> kubernetes.client.ApiClient:
Expand Down
18 changes: 11 additions & 7 deletions acto/runner/ray_runner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import threading
import traceback
import uuid
from typing import Type, TypeVar, Callable

Expand Down Expand Up @@ -32,13 +33,14 @@ def run(self, trial: Trial, snapshot_collector: Callable[['Runner', Trial, dict]
snapshot = snapshot_collector(self, trial, system_input)
except Exception as e:
error = e
# TODO: do not use print
print(traceback.format_exc())
trial.send_snapshot(snapshot, error)
self.cluster_ok_event.clear()
threading.Thread(target=self.__reset_cluster_and_set_available).start()
return trial

def __reset_cluster_and_set_available(self):
ray.util.pdb.set_trace()
self.__teardown_cluster()
self.__setup_cluster()
self.cluster_ok_event.set()
Expand All @@ -48,17 +50,19 @@ def __setup_cluster_and_set_available(self):
self.cluster_ok_event.set()

def __setup_cluster(self):
context_name = str(uuid.uuid4())
os.makedirs('.kube', exist_ok=True)
kubeconfig = os.path.join(os.path.expanduser('~'), '.kube', context_name)
self.cluster_name = str(uuid.uuid4())
kube_dir = os.path.join(os.path.expanduser('~'), '.kube')
os.makedirs(kube_dir, exist_ok=True)

self.cluster = self.kubernetes_engine_class()
self.cluster.configure_cluster(self.num_nodes, self.engine_version)
self.cluster.create_cluster(context_name, kubeconfig, self.engine_version)
context_name = self.cluster.get_context_name(self.cluster_name)
kubeconfig = os.path.join(kube_dir, context_name)
self.cluster.create_cluster(self.cluster_name, kubeconfig, self.engine_version)

self.kubectl_client = KubectlClient(kubeconfig, context_name)


def __teardown_cluster(self):
self.cluster.delete_cluster(self.kubectl_client.context_name, self.kubectl_client.kubeconfig)
self.cluster.delete_cluster(self.cluster_name, self.kubectl_client.kubeconfig)
self.cluster_name = None
os.remove(self.kubectl_client.kubeconfig)
52 changes: 27 additions & 25 deletions acto/runner/snapshot_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import wraps
from subprocess import CompletedProcess
from typing import Callable, List, Optional
from typing import List, Optional

import kubernetes
from kubernetes.client import CoreV1EventList
from kubernetes.client import CoreV1EventList, CoreV1Event

from acto.checker.checker import OracleControlFlow
from acto.checker.impl.health import HealthChecker
Expand All @@ -30,32 +29,26 @@ def set_collector_if_none(self, kubectl_client: KubectlClient):
self.kubectl_collector = Collector(self.namespace, kubectl_client, self.crd_meta_info)


def with_context(ctx: CollectorContext, collector: Callable[[CollectorContext, Runner, Trial, dict], Snapshot]) -> Callable[[Runner, Trial, dict], Snapshot]:
def with_context(ctx: CollectorContext, collector):
# Todo: Fix type hints
@wraps(collector)
def wrapper(runner: Runner, trial: Trial, system_input: dict) -> Snapshot:
def wrapper(runner: Runner, *args, **kwargs) -> Snapshot:
ctx.set_collector_if_none(runner.kubectl_client)
return collector(ctx, runner, trial, system_input)
return collector(ctx, runner, *args, **kwargs)

return wrapper


def apply_system_input_and_wait(ctx: CollectorContext, runner: Runner, system_input: dict) -> CompletedProcess:
def snapshot_collector(ctx: CollectorContext, runner: Runner, trial: Trial, system_input: dict, ignore_cli_error=False) -> Snapshot:
cli_result = runner.kubectl_client.apply(system_input, namespace=ctx.namespace)
if cli_result.returncode != 0:
if cli_result.returncode != 0 and not ignore_cli_error:
logging.error(f'Failed to apply system input to namespace {ctx.namespace}.\n{system_input}')
raise RuntimeError(f'Failed to apply system input to namespace {ctx.namespace}.\n{system_input}')
asyncio.run(wait_for_system_converge(ctx.kubectl_collector, ctx.timeout))
return cli_result


def snapshot_collector(ctx: CollectorContext, runner: Runner, trial: Trial, system_input: dict) -> Snapshot:
cli_result = apply_system_input_and_wait(ctx, runner, system_input)

cli_result = {
"stdout": cli_result.stdout.strip(),
"stderr": cli_result.stderr.strip(),
"stdout": "" if cli_result.stdout is None else cli_result.stdout.strip(),
"stderr": "" if cli_result.stderr is None else cli_result.stderr.strip(),
}

asyncio.run(wait_for_system_converge(ctx.kubectl_collector, ctx.timeout))

system_state = ctx.kubectl_collector.collect_system_state()
Expand All @@ -69,30 +62,39 @@ def snapshot_collector(ctx: CollectorContext, runner: Runner, trial: Trial, syst
generation=trial.generation, trial_state=trial.state)


async def wait_until_no_future_events(core_api: kubernetes.client.CoreV1Api, namespace: str, timeout: int):
def extract_event_time(event: CoreV1Event) -> Optional[datetime]:
if event.event_time is not None:
return event.event_time
if event.last_timestamp is not None:
return event.last_timestamp
logging.warning(f'event {event} does not have a time')
return None


async def wait_until_no_future_events(core_api: kubernetes.client.CoreV1Api, timeout: int):
"""
Wait until no events are generated for the given namespace for the given timeout.
@param core_api: kubernetes api client, CoreV1Api
@param namespace: kubernetes namespace
@param timeout: timeout in seconds
@return:
"""
while True:
events: CoreV1EventList = core_api.list_namespaced_event(namespace)
events_last_time: List[datetime] = [event.last_timestamp for event in events.items]
events: CoreV1EventList = core_api.list_event_for_all_namespaces()
events_last_time: List[datetime] = [extract_event_time(event) for event in events.items]
events_last_time = list(filter(None, events_last_time))
if not events_last_time:
return True
max_time: datetime = max(events_last_time)
# check how much time has passed since the last event
time_since_last_event = datetime.now() - max_time
time_since_last_event = datetime.now(tz=max_time.tzinfo) - max_time
if time_since_last_event.total_seconds() > timeout:
return True
await asyncio.sleep((timedelta(seconds=timeout) - time_since_last_event).total_seconds())


async def wait_for_system_converge(collector: Collector, no_events_threshold: int, hard_timeout=480):
futures = [
asyncio.create_task(wait_until_no_future_events(collector.coreV1Api, collector.namespace, no_events_threshold)),
asyncio.create_task(wait_until_no_future_events(collector.coreV1Api, no_events_threshold)),
asyncio.create_task(asyncio.sleep(hard_timeout, result=False))
]

Expand All @@ -101,7 +103,7 @@ async def wait_for_system_converge(collector: Collector, no_events_threshold: in

async def wait_for_system_converge_expect_more_events(collector: Collector, no_events_threshold: int, hard_timeout=480) -> bool:
futures = [
asyncio.create_task(wait_until_no_future_events(collector.coreV1Api, collector.namespace, no_events_threshold)),
asyncio.create_task(wait_until_no_future_events(collector.coreV1Api, no_events_threshold)),
asyncio.create_task(asyncio.sleep(hard_timeout, result=False))
]

Expand All @@ -115,4 +117,4 @@ async def wait_for_system_converge_expect_more_events(collector: Collector, no_e
if health_result.means(OracleControlFlow.ok):
return True
# TODO: if the system is not healthy, but we have no events, do we need to wait more?
futures = pending_futures + [asyncio.create_task(wait_until_no_future_events(collector.coreV1Api, collector.namespace, no_events_threshold))]
futures = pending_futures + [asyncio.create_task(wait_until_no_future_events(collector.coreV1Api, no_events_threshold))]
1 change: 0 additions & 1 deletion acto/utils/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ def process_crd(context: dict,
'body': helper_crd_doc
}
context['crd'] = crd_data
logger.debug('CRD data: %s' % crd_data)


def add_acto_label(apiclient: kubernetes.client.ApiClient, context: dict):
Expand Down
Loading

0 comments on commit 2aa1977

Please sign in to comment.