diff --git a/avocado_vt/plugins/vt_bootstrap.py b/avocado_vt/plugins/vt_bootstrap.py index b985382bef..c349f81dca 100644 --- a/avocado_vt/plugins/vt_bootstrap.py +++ b/avocado_vt/plugins/vt_bootstrap.py @@ -142,6 +142,15 @@ def configure(self, parser): "generating the host configuration entry." ), ) + parser.add_argument( + "--vt-cluster-config", + action="store", + metavar="CLUSTER_CONFIG", + help=( + "The cluster config json file to be used when " + "generating the cluster hosts configuration entry." + ), + ) def run(self, config): try: diff --git a/avocado_vt/plugins/vt_cluster.py b/avocado_vt/plugins/vt_cluster.py new file mode 100644 index 0000000000..f859564082 --- /dev/null +++ b/avocado_vt/plugins/vt_cluster.py @@ -0,0 +1,53 @@ +import logging +import os +import sys + +from avocado.core import exit_codes +from avocado.core.plugin_interfaces import JobPostTests as Post +from avocado.core.plugin_interfaces import JobPreTests as Pre +from avocado.utils.stacktrace import log_exc_info + +from virttest.vt_cluster import cluster, node_metadata + + +class ClusterCreationError(Exception): + """ + Represents any error situation when attempting to create a cluster. + """ + + pass + + +class VTCluster(Pre, Post): + + name = "vt-cluster" + description = "Avocado-VT Cluster Pre/Post" + + def __init__(self, **kwargs): + self._log = logging.getLogger("avocado.app") + + def pre_tests(self, job): + if cluster.get_all_nodes(): + try: + for node in cluster.get_all_nodes(): + node.start_agent_server() + node_metadata.load_metadata() + + except Exception as detail: + msg = "Failure trying to set Avocado-VT job env: %s" % detail + self._log.error(msg) + log_exc_info(sys.exc_info(), self._log.name) + sys.exit(exit_codes.AVOCADO_JOB_FAIL | job.exitcode) + + def post_tests(self, job): + if cluster.get_all_nodes(): + cluster_dir = os.path.join(job.logdir, "cluster") + for node in cluster.get_all_nodes(): + try: + node_dir = os.path.join(cluster_dir, node.name) + os.makedirs(node_dir) + node.upload_agent_log(node_dir) + node.stop_agent_server() + except Exception: + pass + node_metadata.unload_metadata() diff --git a/avocado_vt/test.py b/avocado_vt/test.py index 71161f8f60..4d375c37a9 100644 --- a/avocado_vt/test.py +++ b/avocado_vt/test.py @@ -36,6 +36,7 @@ version, ) from virttest._wrappers import load_source +from virttest.vt_cluster import cluster, logger, selector # avocado-vt no longer needs autotest for the majority of its functionality, # except by: @@ -115,6 +116,10 @@ def __init__(self, **kwargs): utils_logfile.set_log_file_dir(self.logdir) self.__status = None self.__exc_info = None + self._cluster_partition = None + self._logger_server = logger.LoggerServer( + cluster.logger_server_host, cluster.logger_server_port, self.log + ) @property def params(self): @@ -262,6 +267,10 @@ def _runTest(self): try: try: + self._init_partition() + self._setup_partition() + self._logger_server.start() + self._start_logger_client() try: # Pre-process try: @@ -321,6 +330,14 @@ def _runTest(self): or params.get("env_cleanup", "no") == "yes" ): env.destroy() # Force-clean as it can't be stored + self._stop_logger_client() + self._logger_server.stop() + self._clear_partition() + if ( + self._safe_env_save(env) + or params.get("env_cleanup", "no") == "yes" + ): + env.destroy() # Force-clean as it can't be stored except Exception as e: if params.get("abort_on_error") != "yes": @@ -345,3 +362,55 @@ def _runTest(self): raise exceptions.JobError("Abort requested (%s)" % e) return test_passed + + def _init_partition(self): + self._cluster_partition = cluster.create_partition() + + def _setup_partition(self): + for node in self.params.objects("nodes"): + node_params = self.params.object_params(node) + node_selectors = node_params.get("node_selectors") + _node = selector.select_node(cluster.free_nodes, node_selectors) + if not _node: + raise selector.SelectorError( + f'No available host for node "{node}" with "{node_selectors}"' + ) + _node.tag = node + self._cluster_partition.add_node(_node) + + def _clear_partition(self): + cluster_dir = os.path.join(self.resultsdir, "cluster") + if self._cluster_partition.nodes: + for node in self._cluster_partition.nodes: + node_dir = os.path.join(cluster_dir, node.tag) + os.makedirs(node_dir) + node.upload_service_log(node_dir) + cluster.clear_partition(self._cluster_partition) + self._cluster_partition = None + + def _start_logger_client(self): + if self._cluster_partition.nodes: + for node in self._cluster_partition.nodes: + try: + node.proxy.api.start_logger_client( + cluster.logger_server_host, cluster.logger_server_port + ) + except ModuleNotFoundError: + pass + + def _stop_logger_client(self): + if self._cluster_partition.nodes: + for node in self._cluster_partition.nodes: + try: + node.proxy.api.stop_logger_client() + except ModuleNotFoundError: + pass + + @property + def nodes(self): + return self._cluster_partition.nodes + + def get_node(self, node_tag): + for node in self._cluster_partition.nodes: + if node_tag == node.tag: + return node diff --git a/examples/tests/vt_node_test.cfg b/examples/tests/vt_node_test.cfg new file mode 100644 index 0000000000..d71c642e0f --- /dev/null +++ b/examples/tests/vt_node_test.cfg @@ -0,0 +1,11 @@ +- vt_node_test: + type = vt_node_test + start_vm = no + not_preprocess = yes + nodes = node1 node2 node3 + node_selectors_node1 = [{"key": "cpu_vendor_id", "operator": "eq", "values": "AuthenticAMD"}, + node_selectors_node1 += {"key": "hostname", "operator": "contains", "values": "redhat.com"}] + node_selectors_node2 = [{"key": "cpu_vendor_id", "operator": "==", "values": "AuthenticAMD"}, + node_selectors_node2 += {"key": "hostname", "operator": "contains", "values": "redhat.com"}] + node_selectors_node3 = [{"key": "cpu_vendor_id", "operator": "==", "values": "AuthenticAMD"}, + node_selectors_node3 += {"key": "hostname", "operator": "contains", "values": "redhat.com"}] diff --git a/examples/tests/vt_node_test.py b/examples/tests/vt_node_test.py new file mode 100644 index 0000000000..93a0edbc18 --- /dev/null +++ b/examples/tests/vt_node_test.py @@ -0,0 +1,14 @@ +""" +Simple vt node handling test. + +Please put the configuration file vt_node_test.cfg into $tests/cfg/ directory. + +""" + + +def run(test, params, env): + for node in test.nodes: + test.log.info("========Start test on %s========", node.name) + node.proxy.unittest.hello.say() + node.proxy.unittest.testcase.vm.boot_up() + test.log.info("========End test========") diff --git a/setup.py b/setup.py index eb21f7869f..7737a6a0af 100644 --- a/setup.py +++ b/setup.py @@ -82,6 +82,7 @@ def run(self): ], "avocado.plugins.result_events": [ "vt-joblock = avocado_vt.plugins.vt_joblock:VTJobLock", + "vt-cluster = avocado_vt.plugins.vt_cluster:VTCluster", ], "avocado.plugins.init": [ "vt-init = avocado_vt.plugins.vt_init:VtInit", diff --git a/virttest/bootstrap.py b/virttest/bootstrap.py index 581434eca7..38a211103b 100644 --- a/virttest/bootstrap.py +++ b/virttest/bootstrap.py @@ -1,4 +1,5 @@ import glob +import json import logging import os import re @@ -10,6 +11,8 @@ from avocado.utils import path as utils_path from avocado.utils import process +from virttest.vt_cluster import cluster, node + from . import arch, asset, cartesian_config, data_dir, defaults, utils_selinux from .compat import get_opt @@ -875,6 +878,33 @@ def verify_selinux(datadir, imagesdir, isosdir, tmpdir, interactive, selinux=Fal LOG.info("Corrected contexts on %d files/dirs", len(changes)) +def _load_cluster_config(cluster_config): + """Load the cluster config""" + with open(cluster_config, "r") as config: + return json.load(config) + + +def _register_hosts(hosts_configs): + """Register the configs of the hosts into the cluster.""" + if hosts_configs: + for host, host_params in hosts_configs.items(): + _node = node.Node(host_params, host) + _node.setup_agent_env() + cluster.register_node(_node.name, _node) + LOG.debug("Host %s registered", host) + + +def _config_master_server(master_config): + """Configure the master server.""" + if master_config: + logger_server_host = master_config.get("logger_server_host") + if logger_server_host: + cluster.assign_logger_server_host(logger_server_host) + logger_server_port = master_config.get("logger_server_port") + if logger_server_port: + cluster.assign_logger_server_port(logger_server_port) + + def bootstrap(options, interactive=False): """ Common virt test assistant module. @@ -1042,6 +1072,18 @@ def bootstrap(options, interactive=False): else: LOG.debug("Module %s loaded", module) + # Setup the cluster environment. + vt_cluster_config = get_opt(options, "vt_cluster_config") + if vt_cluster_config: + LOG.info("") + step += 1 + LOG.info( + "%d - Setting up the cluster environment via %s", step, vt_cluster_config + ) + cluster_config = _load_cluster_config(vt_cluster_config) + _register_hosts(cluster_config.get("hosts")) + _config_master_server(cluster_config.get("master")) + LOG.info("") LOG.info("VT-BOOTSTRAP FINISHED") LOG.debug("You may take a look at the following online docs for more info:") diff --git a/virttest/vt_agent/__init__.py b/virttest/vt_agent/__init__.py new file mode 100644 index 0000000000..62d2175604 --- /dev/null +++ b/virttest/vt_agent/__init__.py @@ -0,0 +1,10 @@ +import os +import sys + +BASE_DIR = os.path.dirname(__file__) +LOG_DIR = os.path.join(BASE_DIR, "log") +AGENT_LOG_FILENAME = os.path.join(LOG_DIR, "agent.log") +SERVICE_LOG_FILENAME = os.path.join(LOG_DIR, "service.log") +LOG_FORMAT = "%(asctime)s %(name)s %(levelname)-5.5s| %(message)s" + +sys.path.append(BASE_DIR) diff --git a/virttest/vt_agent/__main__.py b/virttest/vt_agent/__main__.py new file mode 100644 index 0000000000..0df5c17666 --- /dev/null +++ b/virttest/vt_agent/__main__.py @@ -0,0 +1,40 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2022 +# Authors: Yongxue Hong + +""" +Main entry point when called by 'python -m'. +""" + +import os +import shutil + +from . import LOG_DIR +from .app.args import init_arguments +from .app.cmd import run +from .core.logger import init_logger + +args = init_arguments() + +try: + shutil.rmtree(LOG_DIR) + os.remove(args.pid_file) +except (FileNotFoundError, OSError): + pass + +os.makedirs(LOG_DIR) + +root_logger = init_logger() + +if __name__ == "__main__": + run(args.host, args.port, args.pid_file) diff --git a/virttest/vt_agent/api.py b/virttest/vt_agent/api.py new file mode 100644 index 0000000000..1b5c776112 --- /dev/null +++ b/virttest/vt_agent/api.py @@ -0,0 +1,71 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2022 +# Authors: Yongxue Hong + +import logging.handlers +import os +import signal + +from . import AGENT_LOG_FILENAME, LOG_FORMAT, SERVICE_LOG_FILENAME + +LOG = logging.getLogger("avocado.agent." + __name__) + + +def quit(): + """Quit the agent server.""" + pid = os.getpid() + LOG.info("Quit the server daemon(PID:%s).", pid) + os.kill(pid, signal.SIGKILL) + + +def is_alive(): + """Check whether the agent server is alive.""" + LOG.info("The server daemon is alive.") + return True + + +def start_logger_client(host, port): + """Start the agent logger client""" + try: + os.remove(SERVICE_LOG_FILENAME) + except FileNotFoundError: + pass + + logger = logging.getLogger("avocado.service") + logger.setLevel(logging.DEBUG) + file_handler = logging.FileHandler(filename=SERVICE_LOG_FILENAME) + file_handler.setFormatter(logging.Formatter(fmt=LOG_FORMAT)) + logger.addHandler(file_handler) + + LOG.info("Start the logger client.") + socket_handler = logging.handlers.SocketHandler(host, port) + socket_handler.setLevel(logging.DEBUG) + logger.addHandler(socket_handler) + + +def stop_logger_client(): + """Stop the agent logger client.""" + LOG.info("Stop the logger client.") + for handler in logging.getLogger("avocado.service").handlers: + handler.close() + logging.getLogger("avocado.service").handlers.clear() + + +def get_agent_log_filename(): + """Get the filename of the agent log.""" + return AGENT_LOG_FILENAME + + +def get_service_log_filename(): + """Get the filename of the service log.""" + return SERVICE_LOG_FILENAME diff --git a/virttest/vt_agent/app/__init__.py b/virttest/vt_agent/app/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/virttest/vt_agent/app/args.py b/virttest/vt_agent/app/args.py new file mode 100644 index 0000000000..2108d0eaea --- /dev/null +++ b/virttest/vt_agent/app/args.py @@ -0,0 +1,42 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2022 +# Authors: Yongxue Hong + +import argparse + + +def init_arguments(): + """ + Initialize the arguments from the command line. + + :return: The populated namespace of arguments. + :rtype: argparse.Namespace + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--host", + action="store", + default="0.0.0.0", + nargs="?", + help='Specify alternate host [default: "0.0.0.0"]', + ) + parser.add_argument( + "--port", + action="store", + default=8000, + type=int, + nargs="?", + help="Specify alternate port [default: 8000]", + ) + parser.add_argument("--pid-file", required=True, help="Specify the file of pid.") + return parser.parse_args() diff --git a/virttest/vt_agent/app/cmd.py b/virttest/vt_agent/app/cmd.py new file mode 100644 index 0000000000..3e42a26128 --- /dev/null +++ b/virttest/vt_agent/app/cmd.py @@ -0,0 +1,57 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2022 +# Authors: Yongxue Hong + +import logging.handlers +import os +import sys + +from .. import core + +LOG = logging.getLogger("avocado.agent." + __name__) + + +def run(host, port, pid_file): + """ + Run the agent server. + + :param host: The host of agent server. + :type host: str + :param port: The port of agent server to be listened. + :type port: int + :param pid_file: The PID file. + :type pid_file: str + """ + try: + LOG.info("Serving VT agent on %s:%s", host, port) + pid = str(os.getpid()) + LOG.info("Running the agent daemon with PID %s", pid) + services = core.service.load_services() + server = core.server.RPCServer((host, port)) + server.register_services(services) + LOG.info("Waiting for connecting.") + + with open(pid_file, "w+") as f: + f.write(pid + "\n") + server.serve_forever() + except KeyboardInterrupt: + LOG.warn("Keyboard interrupt received, exiting.") + sys.exit(0) + except Exception as e: + LOG.error(e, exc_info=True) + sys.exit(-1) + finally: + try: + os.remove(pid_file) + except OSError: + pass diff --git a/virttest/vt_agent/core/__init__.py b/virttest/vt_agent/core/__init__.py new file mode 100644 index 0000000000..f36e03009b --- /dev/null +++ b/virttest/vt_agent/core/__init__.py @@ -0,0 +1 @@ +from . import logger, server, service diff --git a/virttest/vt_agent/core/data_dir.py b/virttest/vt_agent/core/data_dir.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/virttest/vt_agent/core/logger.py b/virttest/vt_agent/core/logger.py new file mode 100644 index 0000000000..074d465c15 --- /dev/null +++ b/virttest/vt_agent/core/logger.py @@ -0,0 +1,40 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2022 +# Authors: Yongxue Hong + +import logging.handlers + +from .. import AGENT_LOG_FILENAME, LOG_FORMAT + + +def init_logger(): + """ + Initialize the agent logger client. + + :return: The logger client obj. + :rtype: logging.Logger + """ + logger = logging.getLogger("avocado.agent") + logger.setLevel(logging.DEBUG) + + stream_handler = logging.StreamHandler() + stream_handler.setLevel(logging.DEBUG) + stream_handler.setFormatter(logging.Formatter(fmt=LOG_FORMAT)) + logger.addHandler(stream_handler) + + file_handler = logging.FileHandler(filename=AGENT_LOG_FILENAME) + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter(logging.Formatter(fmt=LOG_FORMAT)) + logger.addHandler(file_handler) + + return logger diff --git a/virttest/vt_agent/core/server.py b/virttest/vt_agent/core/server.py new file mode 100644 index 0000000000..cdccd5c2d1 --- /dev/null +++ b/virttest/vt_agent/core/server.py @@ -0,0 +1,103 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2022 +# Authors: Yongxue Hong + +import inspect +import logging +import sys +import traceback +from xmlrpc.client import Fault, dumps, loads +from xmlrpc.server import SimpleXMLRPCServer + +from .. import api + +LOG = logging.getLogger("avocado.agent." + __name__) + + +class _CustomsSimpleXMLRPCServer(SimpleXMLRPCServer): + def _marshaled_dispatch(self, data, dispatch_method=None, path=None): + try: + params, method = loads(data, use_builtin_types=self.use_builtin_types) + + # generate response + if dispatch_method is not None: + response = dispatch_method(method, params) + else: + response = self._dispatch(method, params) + # wrap response in a singleton tuple + response = (response,) + response = dumps( + response, + methodresponse=1, + allow_none=self.allow_none, + encoding=self.encoding, + ) + except Fault as fault: + response = dumps(fault, allow_none=self.allow_none, encoding=self.encoding) + except: + # report exception back to server + exc_type, exc_value, exc_tb = sys.exc_info() + + tb_info = traceback.format_exception(exc_type, exc_value, exc_tb.tb_next) + tb_info = "".join([_ for _ in tb_info]) + try: + mod = exc_type.__dict__.get("__module__", "") + if mod: + _exc_type = ".".join((mod, exc_type.__name__)) + _exc_value = exc_value.__dict__ + else: + _exc_type = exc_type.__name__ + _exc_value = str(exc_value) + response = dumps( + Fault((_exc_type, _exc_value), tb_info), + encoding=self.encoding, + allow_none=self.allow_none, + ) + logging.error(tb_info) + finally: + pass + + return response.encode(self.encoding, "xmlcharrefreplace") + + +class RPCServer(object): + def __init__(self, addr=()): + self._server = _CustomsSimpleXMLRPCServer( + addr, allow_none=True, use_builtin_types=False + ) + self._load_server_api() + + def _load_server_api(self): + for m in inspect.getmembers(api): + if inspect.isfunction(m[1]): + name = ".".join( + (".".join(api.__dict__["__name__"].split(".")[1:]), m[0]) + ) + self._server.register_function(m[1], name) + + def register_services(self, services): + service_list = [] + for name, service in services: + service_list.append(name) + members = [_ for _ in inspect.getmembers(service)] + for member in members: + member_name = member[0] + member_obj = member[1] + if inspect.isfunction(member_obj): + function_name = ".".join((name, member_name)) + self._server.register_function(member_obj, function_name) + services = ", ".join([_ for _ in service_list]) + LOG.info("Services registered: %s" % services) + + def serve_forever(self): + self._server.serve_forever() diff --git a/virttest/vt_agent/core/service.py b/virttest/vt_agent/core/service.py new file mode 100644 index 0000000000..f97dd42092 --- /dev/null +++ b/virttest/vt_agent/core/service.py @@ -0,0 +1,72 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2022 +# Authors: Yongxue Hong + +import logging +import os + +try: + import imp +except ModuleNotFoundError: + import importlib as imp + + +class ServiceError(Exception): + pass + + +LOG = logging.getLogger("avocado.agent." + __name__) + + +class _Services(object): + """The representation of the services.""" + + def __init__(self): + self._services = {} + + def register_service(self, name, service): + self._services[name] = service + + def get_service(self, name): + try: + return self._services[name] + except KeyError: + raise ServiceError("No support service '%s'." % name) + + def __iter__(self): + for name, service in self._services.items(): + yield name, service + + +def load_services(): + """Load all the services.""" + services = _Services() + basedir = os.path.dirname(os.path.dirname(__file__)) + service_dir = os.path.join(basedir, "services") + service_mods = [] + for root, dirs, files in os.walk(service_dir): + for file in files: + if file.endswith(".py") and not file.startswith("__"): + service_mods.append(os.path.join(root, file[:-3])) + + modules = [] + for service in service_mods: + f, p, d = imp.find_module(service) + modules.append(imp.load_module(service, f, p, d)) + f.close() + + for service in modules: + name = service.__dict__["__name__"] + name = ".".join(name.split(basedir + "/")[-1].split("/")[1:]) + services.register_service(name, service) + return services diff --git a/virttest/vt_agent/services/README b/virttest/vt_agent/services/README new file mode 100644 index 0000000000..b90254b42e --- /dev/null +++ b/virttest/vt_agent/services/README @@ -0,0 +1 @@ +The guideline of structure services: diff --git a/virttest/vt_agent/services/__init__.py b/virttest/vt_agent/services/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/virttest/vt_agent/services/unittest/__init__.py b/virttest/vt_agent/services/unittest/__init__.py new file mode 100644 index 0000000000..d1eabd0bce --- /dev/null +++ b/virttest/vt_agent/services/unittest/__init__.py @@ -0,0 +1 @@ +from . import cpu, hello, testcase diff --git a/virttest/vt_agent/services/unittest/cpu.py b/virttest/vt_agent/services/unittest/cpu.py new file mode 100644 index 0000000000..a3fadd9ada --- /dev/null +++ b/virttest/vt_agent/services/unittest/cpu.py @@ -0,0 +1,21 @@ +import logging +import socket + +from avocado.utils import process + +LOG = logging.getLogger("avocado.service." + __name__) + + +def __get_cpu_info(): + cmd = "lscpu | tee" + output = process.run(cmd, shell=True, ignore_status=True).stdout_text.splitlines() + cpu_info = dict(map(lambda x: [i.strip() for i in x.split(":", 1)], output)) + return cpu_info + + +def get_vendor_id(): + hostname = socket.gethostname() + info = __get_cpu_info() + vendor_id = info.get("Vendor ID") + LOG.info(f"The vendor id is {vendor_id} on the {hostname}") + return vendor_id diff --git a/virttest/vt_agent/services/unittest/hello.py b/virttest/vt_agent/services/unittest/hello.py new file mode 100644 index 0000000000..95c6293ef1 --- /dev/null +++ b/virttest/vt_agent/services/unittest/hello.py @@ -0,0 +1,9 @@ +import logging +import socket + +LOG = logging.getLogger("avocado.service." + __name__) + + +def say(): + hostname = socket.gethostname() + LOG.info(f'Say "Hello", from the {hostname}') diff --git a/virttest/vt_agent/services/unittest/testcase/__init__.py b/virttest/vt_agent/services/unittest/testcase/__init__.py new file mode 100644 index 0000000000..52f1841610 --- /dev/null +++ b/virttest/vt_agent/services/unittest/testcase/__init__.py @@ -0,0 +1 @@ +from . import vm diff --git a/virttest/vt_agent/services/unittest/testcase/vm.py b/virttest/vt_agent/services/unittest/testcase/vm.py new file mode 100644 index 0000000000..198a5c1b84 --- /dev/null +++ b/virttest/vt_agent/services/unittest/testcase/vm.py @@ -0,0 +1,9 @@ +import logging +import socket + +LOG = logging.getLogger("avocado.service." + __name__) + + +def boot_up(): + hostname = socket.gethostname() + LOG.info(f"Boot up a guest on the {hostname}") diff --git a/virttest/vt_cluster/__init__.py b/virttest/vt_cluster/__init__.py new file mode 100644 index 0000000000..3bd707bd9a --- /dev/null +++ b/virttest/vt_cluster/__init__.py @@ -0,0 +1,189 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2022 +# Authors: Yongxue Hong + +""" +Module for providing the interface of cluster for virt test. +""" + +import os +import pickle + +from virttest import data_dir + + +class ClusterError(Exception): + """The generic cluster error.""" + + pass + + +class _Partition(object): + """The representation of the partition of the cluster.""" + + def __init__(self): + self._nodes = set() + + def add_node(self, node): + """ + Add the node into the partition. + + :param node: The node to be added. + :type node: vt_cluster.node.Node + """ + self._nodes.add(node) + + def del_node(self, node): + """ + delete the node from the partition. + + :param node: The node to be deleted. + :type node: vt_cluster.node.Node + """ + self._nodes.remove(node) + + @property + def nodes(self): + return self._nodes + + +class _Cluster(object): + """The representation of the cluster.""" + + def __init__(self): + self._filename = os.path.join(data_dir.get_base_backend_dir(), "cluster_env") + if os.path.isfile(self._filename): + data = self._data() + self._logger_server_host = data.get("_logger_server_host") + self._logger_server_port = data.get("_logger_server_port") + self._partitions = data.get("_partitions") + self._nodes = data.get("_nodes") + else: + self._logger_server_host = "localhost" + self._logger_server_port = 9999 + self._partitions = [] + self._nodes = {} + + def _save(self): + _data = {"data": self.__dict__} + with open(self._filename, "wb") as f: + pickle.dump(_data, f) + + def _data(self): + with open(self._filename, "rb") as f: + return pickle.load(f).get("data", {}) + + def register_node(self, name, node): + """ + Register the node into the cluster. + + :param name: the node name + :type name: str + :param node: the node object + :type node: vt_node.Node + """ + self._nodes[name] = node + self._save() + + def unregister_node(self, name): + """ + Unregister the node from the cluster. + + :param name: the node name + """ + del self._nodes[name] + self._save() + + def get_node(self, name): + """ + Get the node from the cluster. + + :param name: the node name + :type name: str + :return: the node object + :rtype: vt_node.Node + """ + return self._nodes.get(name) + + def get_all_nodes(self): + """ + Get the all nodes. + + :return: the list of all nodes + :rtype: list + """ + return [_ for _ in self._nodes.values()] + + def assign_logger_server_host(self, host="localhost"): + """ + Assign the host for the master logger server. + + :param host: The host of server. + :type host: str + """ + self._logger_server_host = host + self._save() + + @property + def logger_server_host(self): + return self._logger_server_host + + def assign_logger_server_port(self, port=9999): + """ + Assign the port for the master logger server. + + :param port: The port of server. + :type port: int + """ + self._logger_server_port = port + self._save() + + @property + def logger_server_port(self): + return self._logger_server_port + + @property + def metadata_file(self): + return os.path.join(data_dir.get_base_backend_dir(), "cluster_metadata.json") + + def create_partition(self): + """ + Create a partition for the cluster. + + :return: The partition obj + :rtype: _Partition + """ + partition = _Partition() + self._partitions.append(partition) + self._save() + return partition + + def clear_partition(self, partition): + """ + Clear a partition from the cluster. + + :param partition: The partition to be cleared + :type partition: _Partition + """ + self._partitions.remove(partition) + self._save() + + @property + def free_nodes(self): + nodes = set(self.get_all_nodes()[:]) + for partition in self._partitions: + nodes = nodes - partition.nodes + return list(nodes) + + +cluster = _Cluster() diff --git a/virttest/vt_cluster/logger.py b/virttest/vt_cluster/logger.py new file mode 100644 index 0000000000..03250da5ff --- /dev/null +++ b/virttest/vt_cluster/logger.py @@ -0,0 +1,143 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2022 +# Authors: Yongxue Hong + +import logging +import pickle +import select +import socketserver +import struct +import threading + +from . import ClusterError, cluster + +_logger = logging.getLogger("") + + +class LoggerServerError(ClusterError): + """Generic LoggerServerError.""" + + pass + + +class _LoggerStreamHandler(socketserver.StreamRequestHandler): + """ + Handler for a streaming logging request. + + This basically logs the record using whatever logging policy is + configured locally. + """ + + def __init__(self, request, client_address, server): + super(_LoggerStreamHandler, self).__init__(request, client_address, server) + self.server = server + + def handle(self): + """ + Handle multiple requests - each expected to be a 4-byte length, + followed by the LogRecord in pickle format. Logs the record + according to whatever policy is configured locally. + """ + while True: + chunk = self.connection.recv(4) + if len(chunk) < 4: + break + slen = struct.unpack(">L", chunk)[0] + chunk = self.connection.recv(slen) + while len(chunk) < slen: + chunk = chunk + self.connection.recv(slen - len(chunk)) + obj = self.unpickle(chunk) + record = logging.makeLogRecord(obj) + self.handle_logger(record) + + def unpickle(self, data): + _data = pickle.loads(data) + self.server.last_output_lines = _data.get("msg") + return _data + + def handle_logger(self, record): + client = "Unknown" + for node in cluster.get_all_nodes(): + if self.client_address[0] == node.address: + client = node.tag + break + format_str = "{client}({address}) {asctime} {module} {levelname} | {msg}" + _logger.info( + format_str.format( + asctime=record.asctime, + module=record.name, + levelname=record.levelname, + msg=record.msg, + client=client, + address=self.client_address[0], + ) + ) + + +class _Server(socketserver.ThreadingTCPServer): + allow_reuse_address = True + + def __init__(self, host, port, handler=_LoggerStreamHandler): + socketserver.ThreadingTCPServer.__init__(self, (host, port), handler) + self.abort = False + self.timeout = 1 + self.last_output_lines = "" + + def run_server_forever(self): + _logger.info("Run the logger server.") + abort = False + while not abort: + rd, wr, ex = select.select([self.socket.fileno()], [], [], self.timeout) + if rd: + self.handle_request() + abort = self.abort + + +class LoggerServer(object): + """ + Handler for receiving the log content from the agent node. + + """ + + def __init__(self, host, port, logger=None): + self._host = host + self._port = port + self._server = _Server(host, port) + self._thread = None + global _logger + _logger = logger + _logger.setLevel(logging.DEBUG) + + @property + def host(self): + return self._host + + @property + def port(self): + return self._port + + def start(self): + """Start the logger server""" + self._thread = threading.Thread( + target=self._server.run_server_forever, name="logger_server", args=() + ) + self._thread.daemon = True + self._thread.start() + + def stop(self): + """Stop the logger server""" + self._server.abort = True + + @property + def last_output_lines(self): + return self._server.last_output_lines diff --git a/virttest/vt_cluster/node.py b/virttest/vt_cluster/node.py new file mode 100644 index 0000000000..06d6c8b939 --- /dev/null +++ b/virttest/vt_cluster/node.py @@ -0,0 +1,360 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2022 +# Authors: Yongxue Hong + +""" +Module for providing the interface of node for virt test. +""" + +import inspect +import logging +import os + +import avocado +from aexpect import remote +from aexpect.client import RemoteSession + +import virttest +from virttest import utils_misc + +from . import ClusterError, proxy + +LOG = logging.getLogger("avocado." + __name__) +AGENT_MOD = "vt_agent" +AGENT_MOD_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), AGENT_MOD) + + +class NodeError(ClusterError): + """Generic Node Error.""" + + pass + + +def _remote_login(client, host, port, username, password, prompt, auto_close, timeout): + cmd = ( + "ssh -o UserKnownHostsFile=/dev/null " + "-o StrictHostKeyChecking=no -p %s" % port + ) + cmd += " -o PreferredAuthentications=password" + cmd += " %s@%s" % (username, host) + + session = RemoteSession( + cmd, + linesep="\n", + prompt=prompt, + status_test_command="echo $?", + client=client, + host=host, + port=port, + username=username, + password=password, + auto_close=auto_close, + ) + try: + remote.handle_prompts(session, username, password, prompt, timeout) + except Exception as e: + session.close() + raise NodeError(e) + return session + + +class Node(object): + """ + Node representation. + + """ + + def __init__(self, params, name): + self._params = params + self._name = name + self._host = self.address + _uri = "http://%s:%s/" % (self._host, self.proxy_port) + self._uri = None if self.master_node else _uri + self._agent_server_daemon_pid = None + self._is_remote_node = not self.master_node + self._server_daemon_pid_file = None + self._logger_server = None + self._session_daemon = None + + @property + def name(self): + return self._name + + @property + def hostname(self): + return self._params.get("hostname") + + @property + def address(self): + return self._params.get("address") + + @property + def password(self): + return self._params.get("password") + + @property + def username(self): + return self._params.get("username", "root") + + @property + def proxy_port(self): + return self._params.get("proxy_port", "8000") + + @property + def shell_port(self): + return self._params.get("shell_port", "22") + + @property + def shell_prompt(self): + return self._params.get("shell_prompt", "^\[.*\][\#\$]\s*$") + + @property + def proxy(self): + return proxy.get_server_proxy(self._uri) + + @property + def master_node(self): + return self._params.get("master_node", "no") == "yes" + + @property + def agent_server_name(self): + if self._is_remote_node: + return "server-%s" % self.name + + @property + def agent_server_dir(self): + if self._is_remote_node: + return "/var/run/agent-server/%s" % self.agent_server_name + + @property + def agent_server_daemon_pid(self): + if self._is_remote_node: + return self._agent_server_daemon_pid + + @property + def logger_server(self): + if self._is_remote_node: + return self._logger_server + + def __eq__(self, other): + if not isinstance(other, Node): + return False + return self.address == other.address + + def __ne__(self, other): + return not self.__eq__(other) + + def __hash__(self): + return hash(self._name) + + def _scp_from_remote(self, source, dest, timeout=600): + remote.scp_to_remote( + self._host, + self.shell_port, + self.username, + self.password, + source, + dest, + timeout=timeout, + ) + + def _setup_agent_server_components(self): + self._scp_from_remote(AGENT_MOD_PATH, self.agent_server_dir) + + def _setup_agent_server_pkgs(self): + dest_path = os.path.join(self.agent_server_dir, AGENT_MOD) + for pkg in (avocado, virttest): + pkg_path = os.path.dirname(inspect.getfile(pkg)) + self._scp_from_remote(pkg_path, dest_path) + + session = self.create_session() + # workaround to fix the issue + # "pkg_resources.DistributionNotFound" when import the module + # avocado. + target_file = os.path.join(dest_path, "avocado", "__init__.py") + session.cmd("sed -i 's/initialize_plugins()//g' %s" % target_file) + + # workaround to install aexpect module + target_file = os.path.join(dest_path, "aexpect") + session.cmd_output("rm -rf %s" % target_file, timeout=300) + cmd = ( + "git clone https://github.com/avocado-framework/aexpect.git %s" + % target_file + ) + session.cmd(cmd, timeout=300) + session.cmd("cd %s && python setup.py develop" % target_file) + session.close() + + def setup_agent_env(self): + """Setup the agent environment of node""" + if self._is_remote_node: + self.cleanup_agent_env() + session = self.create_session() + session.cmd("mkdir -p %s" % self.agent_server_dir) + session.close() + self._setup_agent_server_components() + self._setup_agent_server_pkgs() + + def cleanup_agent_env(self): + """Cleanup the agent environment of node""" + if self._is_remote_node: + agent_session = self.create_session() + if self.agent_server_dir: + agent_session.cmd_output("rm -rf %s" % self.agent_server_dir) + agent_session.close() + + def _start_server_daemon(self, name, host, port, pidfile): + LOG.info("Starting the server daemon on %s", self.name) + self._server_daemon_pid_file = pidfile + self._session_daemon = self.create_session(auto_close=False) + pythonpath = os.path.join(self.agent_server_dir, AGENT_MOD) + self._session_daemon.cmd("export PYTHONPATH=%s" % pythonpath) + self._daemon_cmd = "cd %s &&" % self.agent_server_dir + self._daemon_cmd += " python3 -m %s" % name + self._daemon_cmd += " --host=%s" % host + self._daemon_cmd += " --port=%s" % port + self._daemon_cmd += " --pid-file=%s" % pidfile + LOG.info("Sending command line: %s", self._daemon_cmd) + self._session_daemon.sendline(self._daemon_cmd) + + end_str = "Waiting for connecting." + timeout = 3 + if not utils_misc.wait_for( + lambda: ( + end_str in self._session_daemon.get_output() and self.is_server_alive() + ), + timeout, + ): + err_info = self._session_daemon.get_output() + LOG.error( + "Failed to start the server daemon on %s.\n" "The output:\n%s", + self.name, + err_info, + ) + return False + LOG.info("Start the server daemon successfully on %s.", self.name) + return True + + def start_agent_server(self): + """Start the agent server on the node""" + if self._is_remote_node: + pidfile = os.path.join( + self.agent_server_dir, "agent_server_%s.pid" % self.name + ) + if not self._start_server_daemon( + name=AGENT_MOD, host=self.address, port=self.proxy_port, pidfile=pidfile + ): + raise NodeError("Failed to start agent node daemon on %s" % self.name) + + def stop_agent_server(self): + """Stop the agent server on the node""" + if self._is_remote_node: + if self.is_server_alive(): + try: + self.proxy.api.quit() + except Exception: + pass + + if self._session_daemon: + try: + self._session_daemon.close() + except Exception: + pass + + def upload_agent_log(self, target_path): + """ + Upload the agent server log to the master node. + + :param target_path: The path of target. + :type target_path: str + """ + if self._is_remote_node: + remote_path = self.proxy.api.get_agent_log_filename() + remote.scp_from_remote( + self._host, + self.shell_port, + self.username, + self.password, + remote_path=remote_path, + local_path=target_path, + ) + + def upload_service_log(self, target_path): + """ + Upload the agent service log to the master node. + + :param target_path: The path of target. + :type target_path: str + """ + if self._is_remote_node: + remote_path = self.proxy.api.get_service_log_filename() + remote.scp_from_remote( + self._host, + self.shell_port, + self.username, + self.password, + remote_path=remote_path, + local_path=target_path, + ) + + def create_session(self, auto_close=True, timeout=300): + """Create a session of the node.""" + session = _remote_login( + "ssh", + self._host, + self.shell_port, + self.username, + self.password, + self.shell_prompt, + auto_close, + timeout, + ) + return session + + def get_server_pid(self): + """ + Get the PID of the server. + + :return: The PID of the server. + :type: str + """ + if self._server_daemon_pid_file: + _session = self.create_session() + cmd_open = "cat %s" % self._server_daemon_pid_file + try: + pid = _session.cmd_output(cmd_open).strip() + if pid: + self._agent_server_daemon_pid = pid + return pid + except Exception as e: + raise NodeError(e) + finally: + _session.close() + return None + + def is_server_alive(self): + """ + Check whether the server is alive. + + :return: True if the server is alive otherwise False. + :rtype: bool + """ + if not self.get_server_pid(): + return False + + try: + if not self.proxy.api.is_alive(): + return False + except Exception as e: + raise NodeError(e) + return True diff --git a/virttest/vt_cluster/node_metadata.py b/virttest/vt_cluster/node_metadata.py new file mode 100644 index 0000000000..f9f7d720ae --- /dev/null +++ b/virttest/vt_cluster/node_metadata.py @@ -0,0 +1,65 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2022 +# Authors: Yongxue Hong + +""" +Module for providing the interface of cluster for virt test. +""" + +import json +import logging +import os + +from . import cluster + +LOG = logging.getLogger("avocado." + __name__) + + +def dump_metadata_file(nodes_metadata): + """Dump the metadata into the file.""" + with open(cluster.metadata_file, "w") as metadata_file: + json.dump(nodes_metadata, metadata_file) + + +def load_metadata_file(): + """Load the metadata from the file.""" + try: + with open(cluster.metadata_file, "r") as metadata_file: + return json.load(metadata_file) + except Exception: + return {} + + +def load_metadata(): + """Load the metadata of the nodes.""" + if os.path.exists(cluster.metadata_file): + os.remove(cluster.metadata_file) + + _meta = {} + for node in cluster.get_all_nodes(): + _meta[node.name] = {} + _meta[node.name]["hostname"] = node.hostname + _meta[node.name]["address"] = node.address + + # just an example for getting the metadata + _meta[node.name]["cpu_vendor_id"] = node.proxy.unittest.cpu.get_vendor_id() + + dump_metadata_file(_meta) + + +def unload_metadata(): + """Unload the metadata of the nodes""" + try: + os.remove(cluster.metadata_file) + except OSError: + pass diff --git a/virttest/vt_cluster/proxy.py b/virttest/vt_cluster/proxy.py new file mode 100644 index 0000000000..932924c630 --- /dev/null +++ b/virttest/vt_cluster/proxy.py @@ -0,0 +1,94 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2022 +# Authors: Yongxue Hong + +""" +This module provides VT Proxy interfaces. +""" + +import importlib +from xmlrpc import client + +from ..vt_agent import core as vt_agent_core +from . import ClusterError + + +class ServerProxyError(ClusterError): + """Generic Server Proxy Error.""" + + pass + + +def _importer(name, root_package=False, relative_globals=None, level=0): + return __import__( + name, + locals={}, + globals=relative_globals, + fromlist=[] if root_package else [None], + level=level, + ) + + +class _ClientMethod: + def __init__(self, send, name): + self.__send = send + self.__name = name + + def __getattr__(self, name): + return _ClientMethod(self.__send, "%s.%s" % (self.__name, name)) + + def __call__(self, *args): + root_mod = None + exc_type = None + try: + return self.__send(self.__name, args) + except client.Fault as e: + if "." in e.faultCode[0]: + root_mod = ".".join(e.faultCode[0].split(".")[:-1]) + exc_type = e.faultCode[0].split(".")[-1] + kargs = e.faultCode[1] + if isinstance(kargs, dict): + raise getattr(_importer(root_mod), exc_type)(**kargs) + elif isinstance(kargs, str): + raise eval(e.faultCode[0])(kargs) + else: + raise ServerProxyError + + +class _ClientProxy(client.ServerProxy): + def __init__(self, uri): + super(_ClientProxy, self).__init__(uri, allow_none=True, use_builtin_types=True) + + def __getattr__(self, name): + return _ClientMethod(self._ServerProxy__request, name) + + +class _LocalProxy(object): + def __init__(self): + self._services = vt_agent_core.service.load_services() + + def __getattr__(self, name): + return importlib.import_module("virttest.vt_agent.services.%s" % name) + + +def get_server_proxy(uri=None): + """ + Get the server proxy. + + :param uri: The URI of the server proxy. + e.g: + :type uri: str + :return: The proxy obj. + :rtype: _ClientProxy or _LocalProxy + """ + return _ClientProxy(uri) if uri else _LocalProxy() diff --git a/virttest/vt_cluster/selector.py b/virttest/vt_cluster/selector.py new file mode 100644 index 0000000000..0f2d404d23 --- /dev/null +++ b/virttest/vt_cluster/selector.py @@ -0,0 +1,166 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2022 +# Authors: Yongxue Hong + +""" +Module for providing the interface of cluster for virt test. +""" + +import ast +import logging +import operator + +from . import ClusterError, cluster, node_metadata + +LOG = logging.getLogger("avocado." + __name__) + + +class SelectorError(ClusterError): + """Generic Selector Error.""" + + pass + + +class OperatorError(ClusterError): + """Generic Operator Error.""" + + pass + + +class _MatchExpression(object): + def __init__(self, key, op, values): + self._key = key + self._operator = op + self._values = values + + def __str__(self): + return " ".join((self.key, self.operator, self.values)) + + @property + def key(self): + return self._key + + @property + def operator(self): + return self._operator + + @property + def values(self): + return self._values + + +class _Operator(object): + @classmethod + def operate(cls, name, left, right=None): + operators_mapping = { + "<": cls._lt, + "lt": cls._lt, + ">": cls._gt, + "gt": cls._gt, + "==": cls._eq, + "eq": cls._eq, + "contains": cls._contains, + "not contains": cls._not_contains, + } + try: + if right: + return operators_mapping[name](left, right) + return operators_mapping[name](left) + except KeyError: + raise OperatorError("No support operator '%s'" % name) + + @staticmethod + def _lt(left, right): + return operator.lt(left, right) + + @staticmethod + def _gt(left, right): + return operator.gt(left, right) + + @staticmethod + def _eq(left, right): + return operator.eq(left, right) + + @staticmethod + def _contains(left, right): + return operator.contains(left, right) + + @staticmethod + def _not_contains(left, right): + return not operator.contains(left, right) + + +class _Selector(object): + """ + Handler for selecting the corresponding node from the cluster + according to the node selectors. + Node selector is the simplest recommended form of node selection constraint. + You can add the node selector field to your node specification you want the + target node to have. + + """ + + def __init__(self, node_selectors): + self._node_selectors = ast.literal_eval(node_selectors) + self._match_expressions = [] + for node_selector in self._node_selectors: + self._match_expressions.append( + _MatchExpression( + node_selector.get("key"), + node_selector.get("operator"), + node_selector.get("values"), + ) + ) + + self._metadata = node_metadata.load_metadata_file() + + def match_node(self, free_nodes): + """ + Match the corresponding node with the node metadata and node selectors. + + :return: The node obj + :rtype: vt_cluster.node.Node + """ + if free_nodes is None: + return None + for node_name, meta in self._metadata.items(): + node = cluster.get_node(node_name) + if node not in free_nodes: + continue + for match_expression in self._match_expressions: + key = match_expression.key + op = match_expression.operator + values = match_expression.values + if key not in meta: + raise SelectorError("No support metadata '%s'" % key) + if not _Operator.operate(op, meta[key], values): + break + else: + return node + return None + + +def select_node(candidates, selectors=None): + """ + Select the node according to the node selectors. + + :param candidates: The list of candidates for selecting. + :type candidates: list + :param selectors: The selectors of node. + :type selectors: str + :rtype: vt_cluster.node.Node + """ + if selectors: + selector = _Selector(selectors) + return selector.match_node(candidates) + return candidates.pop() if candidates else None