From 3a188159a813a2235f6776c86044ed34afd401ba Mon Sep 17 00:00:00 2001
From: Tullio Sebastiani
Date: Mon, 10 Jul 2023 10:24:35 +0200
Subject: [PATCH] network chaos scenarios

---
 kraken/network_chaos/actions.py | 158 ++++++++++++++++++--------
 1 file changed, 87 insertions(+), 71 deletions(-)

diff --git a/kraken/network_chaos/actions.py b/kraken/network_chaos/actions.py
index b06ef7644..9909e1bb4 100644
--- a/kraken/network_chaos/actions.py
+++ b/kraken/network_chaos/actions.py
@@ -8,88 +8,103 @@ from jinja2 import Environment, FileSystemLoader
 import kraken.cerberus.setup as cerberus
 import kraken.node_actions.common_node_functions as common_node_functions
-
+from krkn_lib_kubernetes import ScenarioTelemetry, KrknTelemetry


 # krkn_lib_kubernetes
 # Reads the scenario config and introduces traffic variations in Node's host network interface.
-def run(scenarios_list, config, wait_duration, kubecli: krkn_lib_kubernetes.KrknLibKubernetes):
-    failed_post_scenarios = ""
+def run(scenarios_list, config, wait_duration, kubecli: krkn_lib_kubernetes.KrknLibKubernetes, telemetry: KrknTelemetry) -> (list[str], list[ScenarioTelemetry]):
     logging.info("Runing the Network Chaos tests")
+    failed_post_scenarios = ""
+    scenario_telemetries: list[ScenarioTelemetry] = []
+    failed_scenarios = []
     for net_config in scenarios_list:
-        with open(net_config, "r") as file:
-            param_lst = ["latency", "loss", "bandwidth"]
-            test_config = yaml.safe_load(file)
-            test_dict = test_config["network_chaos"]
-            test_duration = int(test_dict.get("duration", 300))
-            test_interface = test_dict.get("interfaces", [])
-            test_node = test_dict.get("node_name", "")
-            test_node_label = test_dict.get("label_selector", "node-role.kubernetes.io/master")
-            test_execution = test_dict.get("execution", "serial")
-            test_instance_count = test_dict.get("instance_count", 1)
-            test_egress = test_dict.get("egress", {"bandwidth": "100mbit"})
-            if test_node:
-                node_name_list = test_node.split(",")
-            else:
-                node_name_list = [test_node]
-            nodelst = []
-            for single_node_name in node_name_list:
-                nodelst.extend(common_node_functions.get_node(single_node_name, test_node_label, test_instance_count, kubecli))
-            file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
-            env = Environment(loader=file_loader, autoescape=True)
-            pod_template = env.get_template("pod.j2")
-            test_interface = verify_interface(test_interface, nodelst, pod_template, kubecli)
-            joblst = []
-            egress_lst = [i for i in param_lst if i in test_egress]
-            chaos_config = {
-                "network_chaos": {
-                    "duration": test_duration,
-                    "interfaces": test_interface,
-                    "node_name": ",".join(nodelst),
-                    "execution": test_execution,
-                    "instance_count": test_instance_count,
-                    "egress": test_egress,
+        scenario_telemetry = ScenarioTelemetry()
+        scenario_telemetry.scenario = net_config
+        scenario_telemetry.startTimeStamp = time.time()
+        telemetry.set_parameters_base64(scenario_telemetry, net_config)
+        try:
+            with open(net_config, "r") as file:
+                param_lst = ["latency", "loss", "bandwidth"]
+                test_config = yaml.safe_load(file)
+                test_dict = test_config["network_chaos"]
+                test_duration = int(test_dict.get("duration", 300))
+                test_interface = test_dict.get("interfaces", [])
+                test_node = test_dict.get("node_name", "")
+                test_node_label = test_dict.get("label_selector", "node-role.kubernetes.io/master")
+                test_execution = test_dict.get("execution", "serial")
+                test_instance_count = test_dict.get("instance_count", 1)
+                test_egress = test_dict.get("egress", {"bandwidth": "100mbit"})
+                if test_node:
+                    node_name_list = test_node.split(",")
+                else:
+                    node_name_list = [test_node]
+                nodelst = []
+                for single_node_name in node_name_list:
+                    nodelst.extend(common_node_functions.get_node(single_node_name, test_node_label, test_instance_count, kubecli))
+                file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
+                env = Environment(loader=file_loader, autoescape=True)
+                pod_template = env.get_template("pod.j2")
+                test_interface = verify_interface(test_interface, nodelst, pod_template, kubecli)
+                joblst = []
+                egress_lst = [i for i in param_lst if i in test_egress]
+                chaos_config = {
+                    "network_chaos": {
+                        "duration": test_duration,
+                        "interfaces": test_interface,
+                        "node_name": ",".join(nodelst),
+                        "execution": test_execution,
+                        "instance_count": test_instance_count,
+                        "egress": test_egress,
+                    }
                 }
-            }
-            logging.info("Executing network chaos with config \n %s" % yaml.dump(chaos_config))
-            job_template = env.get_template("job.j2")
-            try:
-                for i in egress_lst:
-                    for node in nodelst:
-                        exec_cmd = get_egress_cmd(
-                            test_execution, test_interface, i, test_dict["egress"], duration=test_duration
-                        )
-                        logging.info("Executing %s on node %s" % (exec_cmd, node))
-                        job_body = yaml.safe_load(
-                            job_template.render(jobname=i + str(hash(node))[:5], nodename=node, cmd=exec_cmd)
-                        )
-                        joblst.append(job_body["metadata"]["name"])
-                        api_response = kubecli.create_job(job_body)
-                        if api_response is None:
-                            raise Exception("Error creating job")
-                    if test_execution == "serial":
-                        logging.info("Waiting for serial job to finish")
+                logging.info("Executing network chaos with config \n %s" % yaml.dump(chaos_config))
+                job_template = env.get_template("job.j2")
+                try:
+                    for i in egress_lst:
+                        for node in nodelst:
+                            exec_cmd = get_egress_cmd(
+                                test_execution, test_interface, i, test_dict["egress"], duration=test_duration
+                            )
+                            logging.info("Executing %s on node %s" % (exec_cmd, node))
+                            job_body = yaml.safe_load(
+                                job_template.render(jobname=i + str(hash(node))[:5], nodename=node, cmd=exec_cmd)
+                            )
+                            joblst.append(job_body["metadata"]["name"])
+                            api_response = kubecli.create_job(job_body)
+                            if api_response is None:
+                                raise Exception("Error creating job")
+                        if test_execution == "serial":
+                            logging.info("Waiting for serial job to finish")
+                            start_time = int(time.time())
+                            wait_for_job(joblst[:], kubecli, test_duration + 300)
+                            logging.info("Waiting for wait_duration %s" % wait_duration)
+                            time.sleep(wait_duration)
+                            end_time = int(time.time())
+                            cerberus.publish_kraken_status(config, failed_post_scenarios, start_time, end_time)
+                        if test_execution == "parallel":
+                            break
+                    if test_execution == "parallel":
+                        logging.info("Waiting for parallel job to finish")
                         start_time = int(time.time())
                         wait_for_job(joblst[:], kubecli, test_duration + 300)
                         logging.info("Waiting for wait_duration %s" % wait_duration)
                         time.sleep(wait_duration)
                         end_time = int(time.time())
                         cerberus.publish_kraken_status(config, failed_post_scenarios, start_time, end_time)
-                    if test_execution == "parallel":
-                        break
-                if test_execution == "parallel":
-                    logging.info("Waiting for parallel job to finish")
-                    start_time = int(time.time())
-                    wait_for_job(joblst[:], kubecli, test_duration + 300)
-                    logging.info("Waiting for wait_duration %s" % wait_duration)
-                    time.sleep(wait_duration)
-                    end_time = int(time.time())
-                    cerberus.publish_kraken_status(config, failed_post_scenarios, start_time, end_time)
-            except Exception as e:
-                logging.error("Network Chaos exiting due to Exception %s" % e)
-                sys.exit(1)
-            finally:
-                logging.info("Deleting jobs")
-                delete_job(joblst[:], kubecli)
+                except Exception as e:
+                    logging.error("Network Chaos exiting due to Exception %s" % e)
+                    raise RuntimeError()
+                finally:
+                    logging.info("Deleting jobs")
+                    delete_job(joblst[:], kubecli)
+        except (RuntimeError, Exception):
+            scenario_telemetry.exitStatus = 1
+            failed_scenarios.append(net_config)
+            telemetry.log_exception(net_config)
+        else:
+            scenario_telemetry.exitStatus = 0
+        scenario_telemetry.endTimeStamp = time.time()
+        scenario_telemetries.append(scenario_telemetry)
+    return failed_scenarios, scenario_telemetries


 # krkn_lib_kubernetes
@@ -110,7 +125,8 @@ def verify_interface(test_interface, nodelst, template, kubecli: krkn_lib_kubern
         for interface in test_interface:
             if interface not in interface_lst:
                 logging.error("Interface %s not found in node %s interface list %s" % (interface, nodelst[pod_index], interface_lst))
-                sys.exit(1)
+                #sys.exit(1)
+                raise RuntimeError()
         return test_interface
     finally:
         logging.info("Deleteing pod to query interface on node")
@@ -158,7 +174,7 @@ def delete_job(joblst, kubecli: krkn_lib_kubernetes.KrknLibKubernetes):
                 logging.error(pod_log)
         except Exception:
             logging.warning("Exception in getting job status")
-        api_response = kubecli.delete_job(name=jobname, namespace="default")
+        kubecli.delete_job(name=jobname, namespace="default")


 def get_egress_cmd(execution, test_interface, mod, vallst, duration=30):
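
Note: the control-flow change in this patch (replacing sys.exit(1) with raise
RuntimeError() and catching it per scenario) is what lets run() keep iterating
over the remaining scenarios and report failures through telemetry instead of
killing the whole process. A minimal standalone sketch of that per-scenario
pattern follows; the ScenarioTelemetry stub here is a stand-in written for this
example, and its fields are assumptions for illustration, not the real
krkn_lib_kubernetes API:

    # sketch_per_scenario_telemetry.py -- illustrative only
    import logging
    import time


    class ScenarioTelemetry:
        """Stand-in for krkn_lib_kubernetes.ScenarioTelemetry (fields assumed)."""
        def __init__(self):
            self.scenario = ""
            self.startTimeStamp = 0.0
            self.endTimeStamp = 0.0
            self.exitStatus = 0


    def run_scenarios(scenarios_list):
        failed_scenarios = []
        scenario_telemetries = []
        for net_config in scenarios_list:
            st = ScenarioTelemetry()
            st.scenario = net_config
            st.startTimeStamp = time.time()
            try:
                # inject chaos here; raise instead of calling sys.exit(1) so
                # one failing scenario does not abort the remaining ones
                if net_config.endswith("bad.yaml"):
                    raise RuntimeError("scenario failed")
            except Exception:
                st.exitStatus = 1
                failed_scenarios.append(net_config)
                logging.exception("scenario %s failed", net_config)
            else:
                st.exitStatus = 0
            # timing and telemetry are recorded on success and failure alike
            st.endTimeStamp = time.time()
            scenario_telemetries.append(st)
        return failed_scenarios, scenario_telemetries


    if __name__ == "__main__":
        failed, telemetries = run_scenarios(["ok.yaml", "bad.yaml"])
        print(failed)            # ['bad.yaml']
        print(len(telemetries))  # 2

The same trade-off shows up in the verify_interface hunk: raising instead of
exiting moves the decision about whether a failure is fatal up to the caller,
which is what the per-scenario except/else bookkeeping in run() relies on.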