From 99f674baf08416b75338c7a1778df9502426c4f0 Mon Sep 17 00:00:00 2001 From: Zhenchao Liu Date: Fri, 8 Mar 2024 15:53:07 +0800 Subject: [PATCH] Add storage and image management support Added pool_selectors and image_pool_name Signed-off-by: Zhenchao Liu --- avocado_vt/plugins/vt_cluster.py | 14 +- avocado_vt/test.py | 15 + virttest/bootstrap.py | 6 + virttest/env_process.py | 103 +++- virttest/vt_agent/managers/__init__.py | 4 + virttest/vt_agent/managers/image.py | 26 + virttest/vt_agent/managers/images/__init__.py | 16 + .../vt_agent/managers/images/qemu/__init__.py | 1 + .../images/qemu/qemu_image_handlers.py | 495 ++++++++++++++++++ .../vt_agent/managers/resource_backing.py | 120 +++++ .../managers/resource_backings/__init__.py | 29 + .../managers/resource_backings/backing.py | 63 +++ .../resource_backings/cvm/__init__.py | 27 + .../resource_backings/cvm/_sev_resmgr.py | 98 ++++ .../resource_backings/pool_connection.py | 32 ++ .../resource_backings/storage/__init__.py | 4 + .../resource_backings/storage/dir/__init__.py | 2 + .../storage/dir/dir_backing.py | 76 +++ .../storage/dir/dir_pool_connection.py | 32 ++ .../resource_backings/storage/nfs/__init__.py | 2 + .../storage/nfs/nfs_backing.py | 75 +++ .../storage/nfs/nfs_pool_connection.py | 58 ++ virttest/vt_agent/services/image.py | 15 + virttest/vt_agent/services/resource.py | 115 ++++ virttest/vt_cluster/__init__.py | 23 + virttest/vt_cluster/selector.py | 72 +++ virttest/vt_imgr/__init__.py | 1 + virttest/vt_imgr/images/__init__.py | 12 + virttest/vt_imgr/images/image.py | 95 ++++ virttest/vt_imgr/images/qemu/__init__.py | 1 + virttest/vt_imgr/images/qemu/qemu_image.py | 367 +++++++++++++ .../images/qemu/qemu_virt_image/__init__.py | 16 + .../qemu_virt_image/luks_qemu_virt_image.py | 31 ++ .../qemu_virt_image/qcow2_qemu_virt_image.py | 40 ++ .../qemu/qemu_virt_image/qemu_virt_image.py | 58 ++ .../qemu_virt_image/raw_qemu_virt_image.py | 17 + virttest/vt_imgr/images/virt_image.py | 96 ++++ virttest/vt_imgr/vt_imgr.py | 240 +++++++++ virttest/vt_resmgr/__init__.py | 1 + virttest/vt_resmgr/resources/__init__.py | 19 + virttest/vt_resmgr/resources/cvm/__init__.py | 1 + virttest/vt_resmgr/resources/pool.py | 178 +++++++ virttest/vt_resmgr/resources/resource.py | 135 +++++ .../vt_resmgr/resources/storage/__init__.py | 14 + .../resources/storage/ceph/__init__.py | 1 + .../resources/storage/dir/__init__.py | 1 + .../resources/storage/dir/dir_pool.py | 75 +++ .../resources/storage/dir/dir_resources.py | 109 ++++ .../storage/iscsi_direct/__init__.py | 1 + .../storage/iscsi_direct/iscsi_direct_pool.py | 20 + .../resources/storage/nbd/__init__.py | 1 + .../resources/storage/nfs/__init__.py | 1 + .../resources/storage/nfs/nfs_pool.py | 67 +++ .../resources/storage/nfs/nfs_resources.py | 113 ++++ .../vt_resmgr/resources/storage/volume.py | 81 +++ virttest/vt_resmgr/vt_resmgr.py | 438 ++++++++++++++++ virttest/vt_utils/image/qemu.py | 177 +++++++ 57 files changed, 3896 insertions(+), 34 deletions(-) create mode 100644 virttest/vt_agent/managers/image.py create mode 100644 virttest/vt_agent/managers/images/__init__.py create mode 100644 virttest/vt_agent/managers/images/qemu/__init__.py create mode 100644 virttest/vt_agent/managers/images/qemu/qemu_image_handlers.py create mode 100644 virttest/vt_agent/managers/resource_backing.py create mode 100644 virttest/vt_agent/managers/resource_backings/__init__.py create mode 100644 virttest/vt_agent/managers/resource_backings/backing.py create mode 100644 virttest/vt_agent/managers/resource_backings/cvm/__init__.py create mode 100644 virttest/vt_agent/managers/resource_backings/cvm/_sev_resmgr.py create mode 100644 virttest/vt_agent/managers/resource_backings/pool_connection.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/__init__.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/dir/__init__.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/dir/dir_backing.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/dir/dir_pool_connection.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/nfs/__init__.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_backing.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_pool_connection.py create mode 100644 virttest/vt_agent/services/image.py create mode 100644 virttest/vt_agent/services/resource.py create mode 100644 virttest/vt_imgr/__init__.py create mode 100644 virttest/vt_imgr/images/__init__.py create mode 100644 virttest/vt_imgr/images/image.py create mode 100644 virttest/vt_imgr/images/qemu/__init__.py create mode 100644 virttest/vt_imgr/images/qemu/qemu_image.py create mode 100644 virttest/vt_imgr/images/qemu/qemu_virt_image/__init__.py create mode 100644 virttest/vt_imgr/images/qemu/qemu_virt_image/luks_qemu_virt_image.py create mode 100644 virttest/vt_imgr/images/qemu/qemu_virt_image/qcow2_qemu_virt_image.py create mode 100644 virttest/vt_imgr/images/qemu/qemu_virt_image/qemu_virt_image.py create mode 100644 virttest/vt_imgr/images/qemu/qemu_virt_image/raw_qemu_virt_image.py create mode 100644 virttest/vt_imgr/images/virt_image.py create mode 100644 virttest/vt_imgr/vt_imgr.py create mode 100644 virttest/vt_resmgr/__init__.py create mode 100644 virttest/vt_resmgr/resources/__init__.py create mode 100644 virttest/vt_resmgr/resources/cvm/__init__.py create mode 100644 virttest/vt_resmgr/resources/pool.py create mode 100644 virttest/vt_resmgr/resources/resource.py create mode 100644 virttest/vt_resmgr/resources/storage/__init__.py create mode 100644 virttest/vt_resmgr/resources/storage/ceph/__init__.py create mode 100644 virttest/vt_resmgr/resources/storage/dir/__init__.py create mode 100644 virttest/vt_resmgr/resources/storage/dir/dir_pool.py create mode 100644 virttest/vt_resmgr/resources/storage/dir/dir_resources.py create mode 100644 virttest/vt_resmgr/resources/storage/iscsi_direct/__init__.py create mode 100644 virttest/vt_resmgr/resources/storage/iscsi_direct/iscsi_direct_pool.py create mode 100644 virttest/vt_resmgr/resources/storage/nbd/__init__.py create mode 100644 virttest/vt_resmgr/resources/storage/nfs/__init__.py create mode 100644 virttest/vt_resmgr/resources/storage/nfs/nfs_pool.py create mode 100644 virttest/vt_resmgr/resources/storage/nfs/nfs_resources.py create mode 100644 virttest/vt_resmgr/resources/storage/volume.py create mode 100644 virttest/vt_resmgr/vt_resmgr.py create mode 100644 virttest/vt_utils/image/qemu.py diff --git a/avocado_vt/plugins/vt_cluster.py b/avocado_vt/plugins/vt_cluster.py index 399e82eb4b..1c77daa80d 100644 --- a/avocado_vt/plugins/vt_cluster.py +++ b/avocado_vt/plugins/vt_cluster.py @@ -7,6 +7,8 @@ from avocado.core.plugin_interfaces import JobPreTests as Pre from avocado.utils.stacktrace import log_exc_info from virttest.vt_cluster import cluster, node_metadata +from virttest.vt_imgr import vt_imgr +from virttest.vt_resmgr import resmgr class ClusterSetupError(Exception): @@ -50,10 +52,8 @@ def _pre_node_setup(): def _pre_mgr_setup(): try: # Pre-setup the cluster manager - # e.g: - # startup_resmgr() - # vt_imgr.startup() - pass + resmgr.startup() + vt_imgr.startup() except Exception as err: raise ClusterManagerSetupError(err) @@ -61,10 +61,8 @@ def _pre_mgr_setup(): def _post_mgr_cleanup(): try: # Post-cleanup the cluster manager - # e.g: - # teardown_resmgr() - # vt_imgr.teardown() - pass + vt_imgr.teardown() + resmgr.teardown() except Exception as err: raise ClusterManagerCleanupError(err) diff --git a/avocado_vt/test.py b/avocado_vt/test.py index 921140a468..4f0024cb45 100644 --- a/avocado_vt/test.py +++ b/avocado_vt/test.py @@ -37,6 +37,7 @@ ) from virttest._wrappers import load_source from virttest.vt_cluster import cluster, logger, selector +from virttest.vt_resmgr import resmgr # avocado-vt no longer needs autotest for the majority of its functionality, # except by: @@ -378,7 +379,21 @@ def _setup_partition(self): _node.tag = node self._cluster_partition.add_node(_node) + # Select the pools when the user specifies the pools param + for pool_tag in self.params.objects("pools"): + pool_params = self.params.object_params(pool_tag) + pool_selectors = pool_params.get("pool_selectors") + + pools = set(resmgr.get_all_pools()) - set(cluster.partition.pools.values()) + pool_id = selector.select_resource_pool(list(pools), pool_selectors) + if not pool_id: + raise selector.SelectorError( + f"No pool selected for {pool_tag} with {pool_selectors}" + ) + self._cluster_partition.pools[pool_tag] = pool_id + def _clear_partition(self): + self._cluster_partition.pools.clear() cluster_dir = os.path.join(self.resultsdir, "cluster") if self._cluster_partition.nodes: for node in self._cluster_partition.nodes: diff --git a/virttest/bootstrap.py b/virttest/bootstrap.py index b192f0d233..c13b4a8610 100644 --- a/virttest/bootstrap.py +++ b/virttest/bootstrap.py @@ -12,6 +12,7 @@ from avocado.utils import process from virttest.vt_cluster import cluster, node +from virttest.vt_resmgr import resmgr from . import arch, asset, cartesian_config, data_dir, defaults, utils_selinux from .compat import get_opt @@ -895,6 +896,10 @@ def _register_hosts(hosts_configs): LOG.debug("Host %s registered", host) +def _initialize_managers(pools_params): + resmgr.initialize(pools_params) + + def _config_master_server(master_config): """Configure the master server.""" if master_config: @@ -1084,6 +1089,7 @@ def bootstrap(options, interactive=False): cluster_config = _load_cluster_config(vt_cluster_config) _register_hosts(cluster_config.get("hosts")) _config_master_server(cluster_config.get("master")) + _initialize_managers(cluster_config.get("pools")) LOG.info("") LOG.info("VT-BOOTSTRAP FINISHED") diff --git a/virttest/env_process.py b/virttest/env_process.py index 748fb6e0c3..0534ec91b3 100644 --- a/virttest/env_process.py +++ b/virttest/env_process.py @@ -62,6 +62,8 @@ from virttest.test_setup.storage import StorageConfig from virttest.test_setup.vms import UnrequestedVMHandler from virttest.utils_version import VersionInterval +from virttest.vt_imgr import vt_imgr + utils_libvirtd = lazy_import("virttest.utils_libvirtd") virsh = lazy_import("virttest.virsh") @@ -124,7 +126,7 @@ def _get_qemu_version(qemu_cmd): return "Unknown" -def preprocess_image(test, params, image_name, vm_process_status=None): +def preprocess_image(test, params, image_name, vm_process_status=None, vm_name=None): """ Preprocess a single QEMU image according to the instructions in params. @@ -132,34 +134,62 @@ def preprocess_image(test, params, image_name, vm_process_status=None): :param params: A dict containing image preprocessing parameters. :param vm_process_status: This is needed in postprocess_image. Add it here only for keep it work with process_images() + :param vm_name: vm tag defined in 'vms' :note: Currently this function just creates an image if requested. """ + # FIXME: + image_id = None + if params.get_boolean("multihost"): + params = params.copy() + params[f"image_owner_{image_name}"] = vm_name + image_config = vt_imgr.define_image_config(image_name, params) + image_id = vt_imgr.create_image_object(image_config) + + params = params.object_params(image_name) base_dir = params.get("images_base_dir", data_dir.get_data_dir()) if not storage.preprocess_image_backend(base_dir, params, image_name): LOG.error("Backend can't be prepared correctly.") - image_filename = storage.get_image_filename(params, base_dir) + image_filename = None + if not params.get_boolean("multihost"): + image_filename = storage.get_image_filename(params, base_dir) create_image = False if params.get("force_create_image") == "yes": create_image = True - elif params.get("create_image") == "yes" and not storage.file_exists( - params, image_filename - ): - create_image = True + elif params.get("create_image") == "yes": + # FIXME: check all volumes allocated + if params.get_boolean("multihost"): + volume = vt_imgr.get_image_info( + image_id, request=f"spec.virt-images.{image_name}.spec.volume.meta" + ) + create_image = True if not volume["meta"]["allocated"] else False + else: + create_image = ( + True if not storage.file_exists(params, image_filename) else False + ) + else: + # FIXME: sync all volumes configurations + if params.get_boolean("multihost"): + vt_imgr.get_image_info(image_id) + # TODO: check if file allocated if params.get("backup_image_before_testing", "no") == "yes": + # FIXME: add backup_image image = qemu_storage.QemuImg(params, base_dir, image_name) image.backup_image(params, base_dir, "backup", True, True) if create_image: - if storage.file_exists(params, image_filename): - # As rbd image can not be covered, so need remove it if we need - # force create a new image. - storage.file_remove(params, image_filename) - image = qemu_storage.QemuImg(params, base_dir, image_name) - LOG.info("Create image on %s." % image.storage_type) - image.create(params) + if params.get_boolean("multihost"): + vt_imgr.handle_image(image_id, {"create": {}}) + else: + if storage.file_exists(params, image_filename): + # As rbd image can not be covered, so need remove it if we need + # force create a new image. + storage.file_remove(params, image_filename) + image = qemu_storage.QemuImg(params, base_dir, image_name) + LOG.info("Create image on %s." % image.storage_type) + image.create(params) def preprocess_fs_source(test, params, fs_name, vm_process_status=None): @@ -522,7 +552,7 @@ def check_image(test, params, image_name, vm_process_status=None): raise e -def postprocess_image(test, params, image_name, vm_process_status=None): +def postprocess_image(test, params, image_name, vm_process_status=None, vm_name=None): """ Postprocess a single QEMU image according to the instructions in params. @@ -539,6 +569,16 @@ def postprocess_image(test, params, image_name, vm_process_status=None): ) return + # FIXME: multihost + image_id = None + if params.get_boolean("multihost"): + image_id = vt_imgr.query_image(image_name, vm_name) + if image_id is None: + LOG.warning(f"Cannot find the image {image_name}") + image_config = vt_imgr.define_image_config(image_name, params) + image_id = vt_imgr.create_image_object(image_config) + params = params.object_params(image_name) + restored, removed = (False, False) clone_master = params.get("clone_master", None) base_dir = params.get("images_base_dir", data_dir.get_data_dir()) @@ -596,10 +636,18 @@ def postprocess_image(test, params, image_name, vm_process_status=None): ) LOG.info("Remove image on %s." % image.storage_type) if clone_master is None: - image.remove() + if params.get_boolean("multihost"): + vt_imgr.handle_image(image_id, {"destroy": {}}) + vt_imgr.destroy_image_object(image_id) + else: + image.remove() elif clone_master == "yes": if image_name in params.get("master_images_clone").split(): - image.remove() + if params.get_boolean("multihost"): + vt_imgr.handle_image(image_id, {"destroy": {}}) + vt_imgr.destroy_image_object(image_id) + else: + image.remove() def postprocess_fs_source(test, params, fs_name, vm_process_status=None): @@ -754,7 +802,7 @@ class _CreateImages(threading.Thread): in self.exc_info """ - def __init__(self, image_func, test, images, params, exit_event, vm_process_status): + def __init__(self, image_func, test, images, params, exit_event, vm_process_status, vm_name=None): threading.Thread.__init__(self) self.image_func = image_func self.test = test @@ -763,6 +811,7 @@ def __init__(self, image_func, test, images, params, exit_event, vm_process_stat self.exit_event = exit_event self.exc_info = None self.vm_process_status = vm_process_status + self.vm_name = vm_name def run(self): try: @@ -773,13 +822,14 @@ def run(self): self.params, self.exit_event, self.vm_process_status, + self.vm_name, ) except Exception: self.exc_info = sys.exc_info() self.exit_event.set() -def process_images(image_func, test, params, vm_process_status=None): +def process_images(image_func, test, params, vm_process_status=None, vm_name=None): """ Wrapper which chooses the best way to process images. @@ -792,11 +842,11 @@ def process_images(image_func, test, params, vm_process_status=None): images = params.objects("images") if len(images) > 20: # Lets do it in parallel _process_images_parallel( - image_func, test, params, vm_process_status=vm_process_status + image_func, test, params, vm_process_status=vm_process_status,vm_name=vm_name ) else: _process_images_serial( - image_func, test, images, params, vm_process_status=vm_process_status + image_func, test, images, params, vm_process_status=vm_process_status, vm_name=vm_name ) @@ -816,7 +866,7 @@ def process_fs_sources(fs_source_func, test, params, vm_process_status=None): def _process_images_serial( - image_func, test, images, params, exit_event=None, vm_process_status=None + image_func, test, images, params, exit_event=None, vm_process_status=None, vm_name=None ): """ Original process_image function, which allows custom set of images @@ -829,14 +879,15 @@ def _process_images_serial( or None for no vm exist. """ for image_name in images: - image_params = params.object_params(image_name) - image_func(test, image_params, image_name, vm_process_status) + # image_params = params.object_params(image_name) + # image_func(test, image_params, image_name, vm_process_status) + image_func(test, params, image_name, vm_process_status, vm_name) if exit_event and exit_event.is_set(): LOG.error("Received exit_event, stop processing of images.") break -def _process_images_parallel(image_func, test, params, vm_process_status=None): +def _process_images_parallel(image_func, test, params, vm_process_status=None, vm_name=None): """ The same as _process_images but in parallel. :param image_func: Process function @@ -852,7 +903,7 @@ def _process_images_parallel(image_func, test, params, vm_process_status=None): for i in xrange(no_threads): imgs = images[i::no_threads] threads.append( - _CreateImages(image_func, test, imgs, params, exit_event, vm_process_status) + _CreateImages(image_func, test, imgs, params, exit_event, vm_process_status, vm_name) ) threads[-1].start() @@ -907,7 +958,7 @@ def _call_image_func(): unpause_vm = True vm_params["skip_cluster_leak_warn"] = "yes" try: - process_images(image_func, test, vm_params, vm_process_status) + process_images(image_func, test, vm_params, vm_process_status, vm_name) finally: if unpause_vm: vm.resume() diff --git a/virttest/vt_agent/managers/__init__.py b/virttest/vt_agent/managers/__init__.py index 4e973d4c42..8fa6cd208b 100644 --- a/virttest/vt_agent/managers/__init__.py +++ b/virttest/vt_agent/managers/__init__.py @@ -1,5 +1,9 @@ from .connect import ConnectManager from .console import ConsoleManager +from .image import ImageHandlerManager +from .resource_backing import ResourceBackingManager connect_mgr = ConnectManager() console_mgr = ConsoleManager() +resbacking_mgr = ResourceBackingManager() +image_handler_mgr = ImageHandlerManager() diff --git a/virttest/vt_agent/managers/image.py b/virttest/vt_agent/managers/image.py new file mode 100644 index 0000000000..b13db2cabb --- /dev/null +++ b/virttest/vt_agent/managers/image.py @@ -0,0 +1,26 @@ +import logging + +from .images import get_image_handler + + +LOG = logging.getLogger("avocado.agents." + __name__) + + +class ImageHandlerManager(object): + + def __init__(self): + pass + + def handle_image(self, image_config, config): + r, o = 0, dict() + try: + cmd, arguments = config.popitem() + image_type = image_config["meta"]["type"] + handler = get_image_handler(image_type, cmd) + ret = handler(image_config, arguments) + if ret: + o["out"] = ret + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug("Failed to handle image(%s): %s", str(e)) + return r, o diff --git a/virttest/vt_agent/managers/images/__init__.py b/virttest/vt_agent/managers/images/__init__.py new file mode 100644 index 0000000000..708b13576d --- /dev/null +++ b/virttest/vt_agent/managers/images/__init__.py @@ -0,0 +1,16 @@ +from .qemu import get_qemu_image_handler +# from .xen import get_xen_image_handler + + +_image_handler_getters = { + "qemu": get_qemu_image_handler, + # "xen": get_xen_image_handler, +} + + +def get_image_handler(image_type, cmd): + getter = _image_handler_getters.get(image_type) + return getter(cmd) + + +__all__ = ["get_image_handler"] diff --git a/virttest/vt_agent/managers/images/qemu/__init__.py b/virttest/vt_agent/managers/images/qemu/__init__.py new file mode 100644 index 0000000000..3e42b78b16 --- /dev/null +++ b/virttest/vt_agent/managers/images/qemu/__init__.py @@ -0,0 +1 @@ +from .qemu_image_handlers import get_qemu_image_handler diff --git a/virttest/vt_agent/managers/images/qemu/qemu_image_handlers.py b/virttest/vt_agent/managers/images/qemu/qemu_image_handlers.py new file mode 100644 index 0000000000..2feeb6289d --- /dev/null +++ b/virttest/vt_agent/managers/images/qemu/qemu_image_handlers.py @@ -0,0 +1,495 @@ +import collections +import json +import logging +import os +import re +import string + +from avocado.utils import process +from avocado.utils import path as utils_path + +from virttest.utils_numeric import normalize_data_size +from virttest.vt_utils.image.qemu import get_image_opts + + +LOG = logging.getLogger("avocado.service." + __name__) + + +class _ParameterAssembler(string.Formatter): + """ + Command line parameter assembler. + + This will automatically prepend parameter if corresponding value is passed + to the format string. + """ + + sentinal = object() + + def __init__(self, cmd_params=None): + string.Formatter.__init__(self) + self.cmd_params = cmd_params or {} + + def format(self, format_string, *args, **kwargs): + """Remove redundant whitespaces and return format string.""" + ret = string.Formatter.format(self, format_string, *args, **kwargs) + return re.sub(" +", " ", ret) + + def get_value(self, key, args, kwargs): + try: + val = string.Formatter.get_value(self, key, args, kwargs) + except KeyError: + if key in self.cmd_params: + val = None + else: + raise + return (self.cmd_params.get(key, self.sentinal), val) + + def convert_field(self, value, conversion): + """ + Do conversion on the resulting object. + + supported conversions: + 'b': keep the parameter only if bool(value) is True. + 'v': keep both the parameter and its corresponding value, + the default mode. + """ + if value[0] is self.sentinal: + return string.Formatter.convert_field(self, value[1], conversion) + if conversion is None: + conversion = "v" + if conversion == "v": + return "" if value[1] is None else " ".join(value) + if conversion == "b": + return value[0] if bool(value[1]) else "" + raise ValueError("Unknown conversion specifier {}".format(conversion)) + + +QEMU_IMG_BINARY = utils_path.find_command("qemu-img") +qemu_img_parameters = { + "image_format": "-f", + "backing_file": "-b", + "backing_format": "-F", + "unsafe": "-u", + "quiet": "-q", + "options": "-o", + "secret_object": "", + "tls_creds_object": "", + "image_opts": "--image-opts", + "check_repair": "-r", + "output_format": "--output", + "force_share": "-U", + "resize_preallocation": "--preallocation", + "resize_shrink": "--shrink", + "convert_compressed": "-c", + "cache_mode": "-t", + "source_cache_mode": "-T", + "target_image_format": "-O", + "convert_sparse_size": "-S", + "rate_limit": "-r", + "convert_target_is_zero": "--target-is-zero", + "convert_backing_file": "-B", + "commit_drop": "-d", + "compare_strict_mode": "-s", + "compare_second_image_format": "-F", + "backing_chain": "--backing-chain", +} +cmd_formatter = _ParameterAssembler(qemu_img_parameters) + + +def get_qemu_img_object_repr(sec_opts, obj_type="secret"): + mapping = { + "secret": "--object secret,id={name}", + "cookie": "--object secret,id={name}", + "tls-creds-x509": "--object tls-creds-x509,id={name},endpoint=client,dir={dir}", + } + + obj_str = mapping.get(obj_type) + if obj_str is None: + raise ValueError(f"Unknown object type {obj_type}") + + if "format" in sec_opts: + obj_str += ",format={format}" + if "file" in sec_opts: + obj_str += ",file={file}" + elif obj_type != "tls-creds-x509": + obj_str += ",data={data}" + + return obj_str.format(**sec_opts) + + +def get_qemu_virt_image_json_repr(virt_image_opts): + """Generate image json representation.""" + return "'json:%s'" % json.dumps(virt_image_opts) + + +def get_qemu_virt_image_opts_repr(virt_image_opts): + """Generate image-opts.""" + + def _dict_to_dot(dct): + """Convert dictionary to dot representation.""" + flat = [] + prefix = [] + stack = [dct.items()] + while stack: + it = stack[-1] + try: + key, value = next(it) + except StopIteration: + if prefix: + prefix.pop() + stack.pop() + continue + if isinstance(value, collections.Mapping): + prefix.append(key) + stack.append(value.items()) + else: + flat.append((".".join(prefix + [key]), value)) + return flat + + return ",".join( + ["%s=%s" % (attr, value) for attr, value in _dict_to_dot(virt_image_opts)] + ) + + +def parse_qemu_img_options(virt_image_spec): + options = [ + "preallocation", + "cluster_size", + "lazy_refcounts", + "compat", + "extent_size_hint", + "compression_type", + ] + opts = {k: v for k in options if k in virt_image_spec and virt_image_spec[k]} + + # TODO: data_file, backing_file + return opts + + +def get_qemu_virt_image_repr(virt_image_config, output=None): + virt_image_spec = virt_image_config["spec"] + + mapping = { + "uri": lambda i: virt_image_spec["volume"]["spec"]["uri"], + "json": get_qemu_virt_image_json_repr, + "opts": get_qemu_virt_image_opts_repr, + } + + auth_opts, sec_opts, img_opts = get_image_opts(virt_image_config) + func = mapping.get(output) + if func is None: + func = mapping["json"] if auth_opts or sec_opts else mapping["uri"] + virt_image_repr = func(img_opts) + + objs = [] + if auth_opts: + objs.append(get_qemu_img_object_repr(auth_opts)) + if sec_opts: + objs.append(get_qemu_img_object_repr(sec_opts)) + objs_repr = " ".join(objs) + + opts_repr = "" + options = parse_qemu_img_options(virt_image_spec) + if auth_opts: + # FIXME: cookie-secret + if "file" in auth_opts: + options["password-secret"] = auth_opts["name"] + elif "dir" in auth_opts: + options["tls-creds"] = auth_opts["name"] + else: + options["key-secret"] = auth_opts["name"] + + if sec_opts: + virt_image_format = virt_image_spec["format"] + if virt_image_format == "luks": + key = "password-secret" if "file" in sec_opts else "key-secret" + elif virt_image_format == "qcow2": + key = "encrypt.key-secret" + options.update({f"encrypt.{k}": v for k, v in sec_opts.items()}) + else: + raise ValueError( + f"Encryption of a {virt_image_format} image is not supported" + ) + options[key] = sec_opts["name"] + opts_repr = ",".join([f"{k}={v}" for k, v in options.items()]) + + return objs_repr, opts_repr, virt_image_repr + + +def _create(image_config, arguments): + def _dd(image_tag): + qemu_img_cmd = "" + virt_image_config = image_spec["virt-images"][image_tag] + volume_config = virt_image_config["spec"]["volume"] + + if virt_image_config["spec"]["format"] == "raw": + count = utils_numeric.normalize_data_size( + int(volume_config["spec"]["size"]), order_magnitude="M" + ) + qemu_img_cmd = "dd if=/dev/zero of=%s count=%s bs=1M" % ( + volume_config["spec"]["path"], + count, + block_size, + ) + else: + raise + + def _qemu_img_create(virt_image_tag): + qemu_img_cmd = "" + virt_image_config = image_spec["virt-images"][virt_image_tag] + virt_image_spec = virt_image_config["spec"] + volume_config = virt_image_config["spec"]["volume"] + + # Prepare the secret data storage file + encryption = virt_image_spec.get("encryption", {}) + if encryption.get("storage") == "file": + # FIXME: + os.makedirs(os.path.dirname(encryption["file"]), exist_ok=True) + with open(encryption["file"], "w") as fd: + fd.write(encryption["data"]) + + cmd_dict = { + "image_format": virt_image_spec["format"], + "image_size": int(volume_config["spec"]["size"]), + } + + secret_objects = list() + base_tag = virt_image_spec.get("backing") + if base_tag is not None: + base_virt_image_config = image_spec["virt-images"][base_tag] + objs_repr, _, cmd_dict["backing_file"] = get_qemu_virt_image_repr( + base_virt_image_config, image_repr_format + ) + if objs_repr: + secret_objects.append(objs_repr) + cmd_dict["backing_format"] = base_virt_image_config["spec"]["format"] + + # Add all backings' secret and access auth objects + for tag in list(image_meta["topology"].values())[0]: + if tag == base_tag: + break + config = image_spec["virt-images"][tag] + objs_repr, _, _ = get_qemu_virt_image_repr(config) + if objs_repr: + secret_objects.append(objs_repr) + + objs_repr, options_repr, cmd_dict["image_filename"] = get_qemu_virt_image_repr( + virt_image_config, "uri" + ) + if objs_repr: + secret_objects.append(objs_repr) + if options_repr: + cmd_dict["options"] = options_repr + + cmd_dict["secret_object"] = " ".join(secret_objects) + + qemu_img_cmd = ( + qemu_image_binary + " " + cmd_formatter.format(create_cmd, **cmd_dict) + ) + + LOG.info(f"Create image with command: {qemu_img_cmd}") + process.run(qemu_img_cmd, shell=True, verbose=False, ignore_status=False) + + create_cmd = ( + "create {secret_object} {image_format} " + "{backing_file} {backing_format} {unsafe!b} {options} " + "{image_filename} {image_size}" + ) + + qemu_image_binary = arguments.pop("qemu_img_binary", QEMU_IMG_BINARY) + image_repr_format = arguments.pop("repr", None) + image_meta = image_config["meta"] + image_spec = image_config["spec"] + + tag = arguments.pop("target", None) + virt_images = [tag] if tag else list(image_meta["topology"].values())[0] + for tag in virt_images: + _qemu_img_create(tag) + + +def _snapshot(image_config, arguments): + pass + + +def _rebase(image_config, arguments): + qemu_image_binary = arguments.pop("qemu_img_binary", QEMU_IMG_BINARY) + image_repr_format = arguments.pop("repr", None) + backing = arguments.pop("source") + backing_config = image_config["spec"]["virt-images"][backing] + target = arguments.pop("target") + target_config = image_config["spec"]["virt-images"][target] + + rebase_cmd = ( + "rebase {secret_object} {image_format} {cache_mode} " + "{source_cache_mode} {unsafe!b} {options} " + "{backing_file} {backing_format} {image_filename}" + ) + + cmd_dict = { + "image_format": target_config["spec"]["format"], + "cache_mode": arguments.pop("cache_mode", None), + "source_cache_mode": arguments.pop("source_cache_mode", None), + "unsafe": arguments.pop("unsafe", False), + "backing_format": backing_config["spec"]["format"], + } + + secret_objects = list() + obj_repr, options_repr, cmd_dict["image_filename"] = get_qemu_virt_image_repr(target_config, image_repr_format) + if obj_repr or image_repr_format in ["opts", "json"]: + secret_objects.append(obj_repr) + cmd_dict.pop("image_format") + if options_repr: + cmd_dict["options"] = options_repr + + obj_repr, _, cmd_dict["backing_file"] = get_qemu_virt_image_repr(backing_config, None) + if obj_repr: + secret_objects.append(obj_repr) + + # Add all backings' secret and access auth objects + for tag in list(image_config["meta"]["topology"].values())[0]: + if tag == backing: + break + config = image_config["spec"]["virt-images"][tag] + objs_repr, _, _ = get_qemu_virt_image_repr(config) + if objs_repr: + secret_objects.append(objs_repr) + + cmd_dict["secret_object"] = " ".join(secret_objects) + + qemu_img_cmd = ( + qemu_image_binary + + " " + + cmd_formatter.format(rebase_cmd, **cmd_dict) + ) + + LOG.info(f"Rebase {target} onto {backing} by command: {qemu_img_cmd}") + process.run( + qemu_img_cmd, shell=True, verbose=False, ignore_status=False + ) + + +def _commit(image_config, arguments): + pass + + +def _check(image_config, arguments): + check_cmd = ( + "check {secret_object} {quiet!b} {image_format} " + "{check_repair} {force_share!b} {output_format} " + "{source_cache_mode} {image_opts} {image_filename}" + ) + + qemu_image_binary = arguments.pop("qemu_img_binary", QEMU_IMG_BINARY) + image_repr_format = arguments.pop("repr", None) + target = arguments.pop("target", image_config["meta"]["name"]) + target_config = image_config["spec"]["virt-images"][target] + + cmd_dict = { + "quiet": arguments.pop("quiet", False), + "image_format": target_config["spec"]["format"], + "check_repair": arguments.pop("repair", None), + "force_share": arguments.pop("force", False), + "output_format": arguments.pop("output", "human"), + "source_cache_mode": arguments.pop("source_cache_mode", None), + } + + secret_objects = list() + obj_repr, _, cmd_dict["image_filename"] = get_qemu_virt_image_repr(target_config, image_repr_format) + if obj_repr: + secret_objects.append(obj_repr) + + image_list = list(image_config["meta"]["topology"].values())[0] + if target in image_list: + # Add all backings' secret and access auth objects + for tag in image_list: + if tag == target: + break + config = image_config["spec"]["virt-images"][tag] + objs_repr, _, _ = get_qemu_virt_image_repr(config) + if objs_repr: + secret_objects.append(objs_repr) + cmd_dict["secret_object"] = " ".join(secret_objects) + + if obj_repr or image_repr_format in ["opts", "json"]: + cmd_dict.pop("image_format") + if image_repr_format == "opts": + cmd_dict["image_opts"] = cmd_dict.pop("image_filename") + + qemu_img_cmd = ( + qemu_image_binary + + " " + + cmd_formatter.format(check_cmd, **cmd_dict) + ) + + LOG.info(f"Check {target} with command: {qemu_img_cmd}") + cmd_result = process.run( + qemu_img_cmd, shell=True, verbose=True, ignore_status=False + ) + return cmd_result.stdout_text + + +def _info(image_config, arguments): + info_cmd = ( + "info {secret_object} {image_format} {backing_chain!b} " + "{force_share!b} {output_format} {image_opts} {image_filename}" + ) + + qemu_image_binary = arguments.pop("qemu_img_binary", QEMU_IMG_BINARY) + image_repr_format = arguments.pop("repr", None) + target = arguments.pop("target", image_config["meta"]["name"]) + target_config = image_config["spec"]["virt-images"][target] + + cmd_dict = { + "image_format": target_config["spec"]["format"], + "backing_chain": arguments.pop("backing_chain", False), + "force_share": arguments.pop("force", False), + "output_format": arguments.pop("output", "human"), + } + + secret_objects = list() + obj_repr, _, cmd_dict["image_filename"] = get_qemu_virt_image_repr(target_config, image_repr_format) + if obj_repr: + secret_objects.append(obj_repr) + + image_list = list(image_config["meta"]["topology"].values())[0] + if target in image_list: + # Add all backings' secret and access auth objects + for tag in image_list: + if tag == target: + break + config = image_config["spec"]["virt-images"][tag] + objs_repr, _, _ = get_qemu_virt_image_repr(config) + if objs_repr: + secret_objects.append(objs_repr) + cmd_dict["secret_object"] = " ".join(secret_objects) + + if obj_repr or image_repr_format in ["opts", "json"]: + cmd_dict.pop("image_format") + + if image_repr_format == "opts": + cmd_dict["image_opts"] = cmd_dict.pop("image_filename") + + qemu_img_cmd = ( + qemu_image_binary + + " " + + cmd_formatter.format(info_cmd, **cmd_dict) + ) + + LOG.info(f"Query info for {target} with command: {qemu_img_cmd}") + cmd_result = process.run( + qemu_img_cmd, shell=True, verbose=True, ignore_status=False + ) + return cmd_result.stdout_text + + +_qemu_image_handlers = { + "create": _create, + "rebase": _rebase, + "snapshot": _snapshot, + "commit": _commit, + "check": _check, + "info": _info, +} + +def get_qemu_image_handler(cmd): + return _qemu_image_handlers.get(cmd) diff --git a/virttest/vt_agent/managers/resource_backing.py b/virttest/vt_agent/managers/resource_backing.py new file mode 100644 index 0000000000..825aa383ea --- /dev/null +++ b/virttest/vt_agent/managers/resource_backing.py @@ -0,0 +1,120 @@ +import logging +import os +import pickle + +from .resource_backings import ( + get_resource_backing_class, + get_pool_connection_class, +) + +from vt_agent.core import data_dir + + +LOG = logging.getLogger("avocado.agents." + __name__) + + +class ResourceBackingManager(object): + + def __init__(self): + if os.path.isfile(data_dir.BACKING_MGR_ENV_FILENAME): + self._pool_connections = self._load() + else: + self._pool_connections = dict() + self._backings = dict() + + def _load(self): + with open(data_dir.BACKING_MGR_ENV_FILENAME, "rb") as f: + return pickle.load(f) + + def _dump(self): + with open(data_dir.BACKING_MGR_ENV_FILENAME, "wb") as f: + pickle.dump(self._pool_connections, f) + + def create_pool_connection(self, pool_id, pool_config): + r, o = 0, dict() + try: + pool_type = pool_config["meta"]["type"] + pool_conn_class = get_pool_connection_class(pool_type) + pool_conn = pool_conn_class(pool_config) + pool_conn.startup() + self._pool_connections[pool_id] = pool_conn + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug(f"Failed to connect to pool({pool_id}): {str(e)}") + + if r == 0: + self._dump() + + return r, o + + def destroy_pool_connection(self, pool_id): + r, o = 0, dict() + try: + pool_conn = self._pool_connections.pop(pool_id) + pool_conn.shutdown() + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug(f"Failed to disconnect pool({pool_id}): {str(e)}") + + if r == 0: + self._dump() + + return r, o + + def create_backing_object(self, backing_config): + r, o = 0, dict() + try: + pool_id = backing_config["meta"]["pool"]["meta"]["uuid"] + pool_conn = self._pool_connections[pool_id] + pool_type = pool_conn.get_pool_type() + res_type = backing_config["meta"]["type"] + backing_class = get_resource_backing_class(pool_type, res_type) + backing = backing_class(backing_config) + backing.create(pool_conn) + self._backings[backing.backing_id] = backing + o["out"] = backing.backing_id + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug( + "Failed to create backing object for resource %s: %s", + backing_config["meta"]["uuid"], + str(e), + ) + return r, o + + def destroy_backing_object(self, backing_id): + r, o = 0, dict() + try: + backing = self._backings.pop(backing_id) + pool_conn = self._pool_connections[backing.source_pool_id] + backing.destroy(pool_conn) + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug(f"Failed to destroy backing object({backing_id}): {str(e)}") + return r, o + + def update_backing(self, backing_id, new_config): + r, o = 0, dict() + try: + backing = self._backings[backing_id] + pool_conn = self._pool_connections[backing.source_pool_id] + cmd, arguments = new_config.popitem() + handler = backing.get_update_handler(cmd) + ret = handler(pool_conn, arguments) + if ret: + o["out"] = ret + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug(f"Failed to update resource by backing ({backing_id}): {str(e)}") + return r, o + + def get_backing_info(self, backing_id): + r, o = 0, dict() + try: + backing = self._backings[backing_id] + pool_conn = self._pool_connections[backing.source_pool_id] + o["out"] = backing.get_resource_info(pool_conn) + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug(f"Failed to info resource by backing ({backing_id}): {str(e)}") + return r, o diff --git a/virttest/vt_agent/managers/resource_backings/__init__.py b/virttest/vt_agent/managers/resource_backings/__init__.py new file mode 100644 index 0000000000..cc60fe6c34 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/__init__.py @@ -0,0 +1,29 @@ +from .storage import ( + _DirPoolConnection, + _DirVolumeBacking, + _NfsPoolConnection, + _NfsVolumeBacking, +) + + +_pool_conn_classes = dict() +_pool_conn_classes[_DirPoolConnection.get_pool_type()] = _DirPoolConnection +_pool_conn_classes[_NfsPoolConnection.get_pool_type()] = _NfsPoolConnection + +_backing_classes = dict() +_backing_classes[_DirVolumeBacking.get_pool_type()] = { + _DirVolumeBacking.get_resource_type(): _DirVolumeBacking, + _NfsVolumeBacking.get_resource_type(): _NfsVolumeBacking, +} + + +def get_resource_backing_class(pool_type, resource_type): + backing_classes = _backing_classes.get(pool_type, {}) + return backing_classes.get(resource_type) + + +def get_pool_connection_class(pool_type): + return _pool_conn_classes.get(pool_type) + + +__all__ = ["get_pool_connection_class", "get_resource_backing_class"] diff --git a/virttest/vt_agent/managers/resource_backings/backing.py b/virttest/vt_agent/managers/resource_backings/backing.py new file mode 100644 index 0000000000..ce2e62fe3c --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/backing.py @@ -0,0 +1,63 @@ +import uuid +from abc import ABC, abstractmethod + + +class _ResourceBacking(ABC): + _BINDING_RESOURCE_TYPE = None + _SOURCE_POOL_TYPE = None + + def __init__(self, backing_config): + self._uuid = uuid.uuid4().hex + self._source_pool_id = backing_config["meta"]["pool"]["meta"]["uuid"] + self._resource_id = backing_config["meta"]["uuid"] + self._handlers = { + "allocate": self.allocate_resource, + "release": self.release_resource, + "sync": self.sync_resource, + } + + def create(self, pool_conn): + pass + + def destroy(self, pool_conn): + self._uuid = None + self._resource_id = None + + @classmethod + def get_pool_type(cls): + return cls._SOURCE_POOL_TYPE + + @classmethod + def get_resource_type(cls): + return cls._BINDING_RESOURCE_TYPE + + @property + def binding_resource_id(self): + return self._resource_id + + @property + def source_pool_id(self): + return self._source_pool_id + + @property + def backing_id(self): + return self._uuid + + def get_update_handler(self, cmd): + return self._handlers[cmd] + + @abstractmethod + def allocate_resource(self, pool_connection, arguments): + raise NotImplemented + + @abstractmethod + def release_resource(self, pool_connection, arguments): + raise NotImplemented + + @abstractmethod + def get_resource_info(self, pool_connection): + raise NotImplemented + + @abstractmethod + def sync_resource(self, pool_connection, arguments): + raise NotImplemented diff --git a/virttest/vt_agent/managers/resource_backings/cvm/__init__.py b/virttest/vt_agent/managers/resource_backings/cvm/__init__.py new file mode 100644 index 0000000000..d31c959cc0 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/cvm/__init__.py @@ -0,0 +1,27 @@ +from .. import _CVMResBackingMgr +from .. import _all_subclasses + + +class LaunchSecurity(object): + _instance = None + + @classmethod + def dispatch(cls): + return cls._instance + + @classmethod + def startup(cls, config): + if cls._instance is not None: + return cls._instance + + for mgr_cls in _all_subclasses(_CVMResBackingMgr): + if mgr_cls.get_platform_flags() is not None: + cls._instance = mgr_cls(config) + cls._instance.startup() + return cls._instance + + raise + + @classmethod + def teardown(cls): + cls._instance.teardown() diff --git a/virttest/vt_agent/managers/resource_backings/cvm/_sev_resmgr.py b/virttest/vt_agent/managers/resource_backings/cvm/_sev_resmgr.py new file mode 100644 index 0000000000..fcb2982cd8 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/cvm/_sev_resmgr.py @@ -0,0 +1,98 @@ +from .. import _ResBacking +from .. import _ResBackingCaps +from .. import _CVMResBackingMgr + + +class _SEVCommResBacking(_ResBacking): + + def __init__(self, requests): + super().__init__(requests) + self._cbitpos = None + self._reduced_phys_bits = None + # self._sev_device = '/dev/sev' + # self._kernel_hashes = None + + def to_specs(self): + return {"cbitpos": self._cbitpos, "reduced-phys-bits": self._reduced_phys_bits} + + +class _SEVResBacking(_SEVCommResBacking): + RESOURCE_TYPE = "sev" + + def __init__(self): + super().__init__() + self._dh_cert = None + self._session = None + + def allocate(self, requests): + pass + + def free(self): + pass + + def to_specs(self): + pass + + +class _SNPResBacking(_SEVCommResBacking): + RESOURCE_TYPE = "snp" + + def __init__(self): + super().__init__() + + def allocate(self, requests): + pass + + def free(self): + pass + + def to_specs(self): + pass + + +class _SEVResBackingCaps(_ResBackingCaps): + + def __init__(self, params): + self._cbitpos = None + self._reduced_phys_bits = None + self._sev_device = None + self._max_sev_guests = None + self._max_snp_guests = None + self._pdh = None + self._cert_chain = None + self._cpu0_id = None + + def load(self): + pass + + def is_capable(self, requests): + pass + + def increase(self, backing): + pass + + def decrease(self, backing): + pass + + @property + def max_sev_guests(self): + return self._max_sev_guests + + @property + def max_snp_guests(self): + return self._max_snp_guests + + +class _SEVResBackingMgr(_CVMResBackingMgr): + + def __init__(self, config): + super().__init__(config) + self._caps = _SEVResBackingCaps(config) + _SEVResBackingMgr._platform_flags = config + + def startup(self): + reset_sev_platform() + super().startup() + + def teardown(self): + reset_sev_platform() diff --git a/virttest/vt_agent/managers/resource_backings/pool_connection.py b/virttest/vt_agent/managers/resource_backings/pool_connection.py new file mode 100644 index 0000000000..cc0df8d0d3 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/pool_connection.py @@ -0,0 +1,32 @@ +from abc import ABC, abstractmethod + + +class _ResourcePoolAccess(ABC): + + @abstractmethod + def __init__(self, pool_access_config): + pass + + +class _ResourcePoolConnection(ABC): + _CONNECT_POOL_TYPE = None + + def __init__(self, pool_config): + self._pool_id = pool_config["meta"]["uuid"] + + @classmethod + def get_pool_type(cls): + return cls._CONNECT_POOL_TYPE + + @abstractmethod + def startup(self): + pass + + @abstractmethod + def shutdown(self): + pass + + @property + @abstractmethod + def connected(self): + return False diff --git a/virttest/vt_agent/managers/resource_backings/storage/__init__.py b/virttest/vt_agent/managers/resource_backings/storage/__init__.py new file mode 100644 index 0000000000..934cb0885b --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/__init__.py @@ -0,0 +1,4 @@ +from .dir import _DirPoolConnection +from .dir import _DirVolumeBacking +from .nfs import _NfsPoolConnection +from .nfs import _NfsVolumeBacking diff --git a/virttest/vt_agent/managers/resource_backings/storage/dir/__init__.py b/virttest/vt_agent/managers/resource_backings/storage/dir/__init__.py new file mode 100644 index 0000000000..dd0d5847c5 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/dir/__init__.py @@ -0,0 +1,2 @@ +from .dir_backing import _DirVolumeBacking +from .dir_pool_connection import _DirPoolConnection diff --git a/virttest/vt_agent/managers/resource_backings/storage/dir/dir_backing.py b/virttest/vt_agent/managers/resource_backings/storage/dir/dir_backing.py new file mode 100644 index 0000000000..4f1fd61c02 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/dir/dir_backing.py @@ -0,0 +1,76 @@ +import logging +import os + +from avocado.utils import process + +from ...backing import _ResourceBacking + + +LOG = logging.getLogger("avocado.agents.resource_backings.storage.dir." + __name__) + + +class _DirVolumeBacking(_ResourceBacking): + _SOURCE_POOL_TYPE = "filesystem" + _BINDING_RESOURCE_TYPE = "volume" + + def __init__(self, backing_config): + super().__init__(backing_config) + self._size = backing_config["spec"]["size"] + self._filename = backing_config["spec"]["filename"] + self._uri = backing_config["spec"].get("uri") + self._handlers.update( + { + "resize": self.resize_volume, + } + ) + + def create(self, pool_connection): + if not self._uri: + self._uri = os.path.join(pool_connection.root_dir, self._filename) + + def destroy(self, pool_connection): + super().destroy(pool_connection) + self._uri = None + + def allocate_resource(self, pool_connection, arguments): + dirname = os.path.dirname(self._uri) + if not os.path.exists(dirname): + os.makedirs(dirname) + + process.run( + f"fallocate -l {self._size} {self._uri}", + shell=True, + verbose=False, + ignore_status=False, + ) + + return self.get_resource_info(pool_connection) + + def release_resource(self, pool_connection, arguments): + if os.path.exists(self._uri): + os.unlink(self._uri) + + def resize_volume(self, pool_connection, arguments): + pass + + def get_resource_info(self, pool_connection): + allocated, allocation = True, 0 + + try: + s = os.stat(self._uri) + allocation = str(s.st_size) + except FileNotFoundError: + allocated = False + + return { + "meta": { + "allocated": allocated, + }, + "spec": { + "uri": self._uri, + "allocation": allocation, + }, + } + + def sync_resource(self, pool_connection, arguments): + return self.get_resource_info(pool_connection) diff --git a/virttest/vt_agent/managers/resource_backings/storage/dir/dir_pool_connection.py b/virttest/vt_agent/managers/resource_backings/storage/dir/dir_pool_connection.py new file mode 100644 index 0000000000..6e26655f40 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/dir/dir_pool_connection.py @@ -0,0 +1,32 @@ +import os + +from avocado.utils.path import init_dir + +from ...pool_connection import _ResourcePoolConnection + + +LOG = logging.getLogger("avocado.agents.resource_backings.storage.dir." + __name__) + + +class _DirPoolConnection(_ResourcePoolConnection): + + _CONNECT_POOL_TYPE = "filesystem" + + def __init__(self, pool_config): + super().__init__(pool_config) + self._root_dir = pool_config["spec"]["path"] + + def startup(self): + init_dir(self.root_dir) + + def shutdown(self): + if not os.listdir(self.root_dir): + os.removedirs(self.root_dir) + + @property + def connected(self): + return os.path.exists(self.root_dir) + + @property + def root_dir(self): + return self._root_dir diff --git a/virttest/vt_agent/managers/resource_backings/storage/nfs/__init__.py b/virttest/vt_agent/managers/resource_backings/storage/nfs/__init__.py new file mode 100644 index 0000000000..d2b50e200b --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/nfs/__init__.py @@ -0,0 +1,2 @@ +from .nfs_backing import _NfsVolumeBacking +from .nfs_pool_connection import _NfsPoolConnection diff --git a/virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_backing.py b/virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_backing.py new file mode 100644 index 0000000000..efd1c7f42c --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_backing.py @@ -0,0 +1,75 @@ +import logging +import os + +from avocado.utils import process + +from ...backing import _ResourceBacking + + +LOG = logging.getLogger("avocado.agents.resource_backings.storage.nfs" + __name__) + + +class _NfsVolumeBacking(_ResourceBacking): + _SOURCE_POOL_TYPE = "nfs" + _BINDING_RESOURCE_TYPE = "volume" + + def __init__(self, backing_config): + super().__init__(backing_config) + self._size = backing_config["spec"]["size"] + self._filename = backing_config["spec"]["filename"] + self._uri = None + self._handlers.update( + { + "resize": self.resize_volume, + } + ) + + def create(self, pool_connection): + self._uri = os.path.join(pool_connection.mnt, self._filename) + + def destroy(self, pool_connection): + super().destroy(pool_connection) + self._uri = None + + def allocate_resource(self, pool_connection, arguments): + dirname = os.path.dirname(self._uri) + if not os.path.exists(dirname): + os.makedirs(dirname) + + process.run( + f"fallocate -l {self._size} {self._uri}", + shell=True, + verbose=False, + ignore_status=False, + ) + + return self.get_resource_info(pool_connection) + + def release_resource(self, pool_connection, arguments): + if os.path.exists(self._uri): + os.unlink(self._uri) + + def resize_volume(self, pool_connection, arguments): + pass + + def get_resource_info(self, pool_connection): + allocated, allocation = True, 0 + + try: + s = os.stat(self._uri) + allocation = str(s.st_size) + except FileNotFoundError: + allocated = False + + return { + "meta": { + "allocated": allocated, + }, + "spec": { + "uri": self._uri, + "allocation": allocation, + }, + } + + def sync_resource(self, pool_connection, arguments): + return self.get_resource_info(pool_connection) diff --git a/virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_pool_connection.py b/virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_pool_connection.py new file mode 100644 index 0000000000..902524523e --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_pool_connection.py @@ -0,0 +1,58 @@ +import os + +from avocado.utils.path import init_dir + +from virttest import utils_misc + +from ...pool_connection import _ResourcePoolAccess +from ...pool_connection import _ResourcePoolConnection + + +LOG = logging.getLogger("avocado.agents.resource_backings.storage.nfs." + __name__) + + +class _NfsPoolAccess(_ResourcePoolAccess): + """ + Mount options + """ + + def __init__(self, pool_access_config): + self._options = pool_access_config.get("mount-options", "") + + def __str__(self): + return self._options + + +class _NfsPoolConnection(_ResourcePoolConnection): + _CONNECT_POOL_TYPE = "nfs" + + def __init__(self, pool_config): + super().__init__(pool_config) + spec = pool_config["spec"] + self._nfs_server = spec["server"] + self._export_dir = spec["export"] + self._nfs_access = _NfsPoolAccess(spec) + self._mnt = spec["mount"] + + def startup(self): + src = f"{self._nfs_server}:{self._export_dir}" + dst = self.mnt + init_dir(dst) + options = str(self._nfs_access) + utils_misc.mount(src, dst, fstype="nfs", options=options) + + def shutdown(self): + src = f"{self._nfs_server}:{self._export_dir}" + dst = self._mnt + utils_misc.umount(src, dst, fstype="nfs") + if self._create_mnt: + os.removedirs(self.mnt) + + def connected(self): + src = f"{self._nfs_server}:{self._export_dir}" + dst = self.mnt + return utils_misc.is_mount(src, dst, fstype="nfs") + + @property + def mnt(self): + return self._mnt diff --git a/virttest/vt_agent/services/image.py b/virttest/vt_agent/services/image.py new file mode 100644 index 0000000000..eaf83f3561 --- /dev/null +++ b/virttest/vt_agent/services/image.py @@ -0,0 +1,15 @@ +import logging + +from managers import image_handler_mgr + + +LOG = logging.getLogger("avocado.service." + __name__) + + +def handle_image(image_config, config): + """ + Handle the upper-level image. + """ + + LOG.info(f"Handle image with command: {config}") + return image_handler_mgr.handle_image(image_config, config) diff --git a/virttest/vt_agent/services/resource.py b/virttest/vt_agent/services/resource.py new file mode 100644 index 0000000000..fd82348bd7 --- /dev/null +++ b/virttest/vt_agent/services/resource.py @@ -0,0 +1,115 @@ +import logging + +from managers import resbacking_mgr + + +LOG = logging.getLogger("avocado.service." + __name__) + + +def connect_pool(pool_id, pool_config): + """ + Connect to a specified resource pool. + + :param pool_id: The resource pool id + :type pool_id: string + :param pool_config: The resource pool configuration + :type pool_config: dict + :return: Succeeded: 0, {} + Failed: 1, {"out": error message} + :rtype: tuple + """ + LOG.info(f"Connect to pool {pool_id}") + return resbacking_mgr.create_pool_connection(pool_id, pool_config) + + +def disconnect_pool(pool_id): + """ + Disconnect from a specified resource pool. + + :param pool_id: The resource pool id + :type pool_id: string + :param pool_config: The resource pool configuration + :type pool_config: dict + :return: Succeeded: 0, {} + Failed: 1, {"out": error message} + :rtype: tuple + """ + LOG.info(f"Disconnect from pool {pool_id}") + return resbacking_mgr.destroy_pool_connection(pool_id) + + +def create_backing_object(backing_config): + """ + Create a resource backing object on the worker node, which is bound + to one resource only + + :param backing_config: The resource backing configuration, usually, + it's a snippet of the resource configuration, + required for allocating the resource + :type backing_config: dict + :return: Succeeded: 0, {"out": backing_id} + Failed: 1, {"out": error message} + :rtype: tuple + """ + LOG.info( + "Create the backing object for the resource %s", backing_config["meta"]["uuid"] + ) + return resbacking_mgr.create_backing_object(backing_config) + + +def destroy_backing_object(backing_id): + """ + Destroy the backing + + :param backing_id: The cluster resource id + :type backing_id: string + :return: Succeeded: 0, {} + Failed: 1, {"out": error message} + :rtype: tuple + """ + LOG.info(f"Destroy the backing object {backing_id}") + return resbacking_mgr.destroy_backing_object(backing_id) + + +def get_backing_info(backing_id): + """ + Get the information of a resource with a specified backing + + We need not get all the information of the resource, because we can + get the static information by the resource object from the master + node, e.g. size, here we only get the information that only can be + fetched from the worker nodes. + + :param backing_id: The backing id + :type backing_id: string + :return: Succeeded: 0, {"out": config} + Failed: 1, {"out": "error message"} + e.g. a dir resource's config + { + "meta": { + "allocated": True, + }, + "spec":{ + "allocation": "1234567890", + 'uri': '/p1/f1', + } + } + :rtype: tuple + """ + LOG.info(f"Info the resource by backing {backing_id}") + return resbacking_mgr.get_backing_info(backing_id) + + +def update_backing(backing_id, config): + """ + :param backing_id: The resource backing id + :type backing_id: string + :param config: The specified action and the snippet of + the resource's spec and meta info used for update + :type config: dict + :return: Succeeded: 0, {"out" depends on the command} + Failed: 1, {"out": error message} + :rtype: tuple + """ + LOG.info(f"Update the resource by backing {backing_id}") + return resbacking_mgr.update_backing(backing_id, config) diff --git a/virttest/vt_cluster/__init__.py b/virttest/vt_cluster/__init__.py index 2e67c2870a..d06ea40897 100644 --- a/virttest/vt_cluster/__init__.py +++ b/virttest/vt_cluster/__init__.py @@ -18,6 +18,7 @@ import os import pickle +import uuid from virttest import data_dir @@ -32,8 +33,18 @@ class _Partition(object): """The representation of the partition of the cluster.""" def __init__(self): + self._uuid = uuid.uuid4().hex + self._pools = dict() self._nodes = set() + @property + def pools(self): + return self._pools + + @property + def uuid(self): + return self._uuid + def add_node(self, node): """ Add the node into the partition. @@ -201,5 +212,17 @@ def free_nodes(self): nodes = nodes - partition.nodes return list(nodes) + @property + def partition(self): + """ + When the job starts a new process to run a case, the cluster object + will be re-constructed as a new one, it reads the dumped file to get + back all the information. Note the cluster here is a 'slice' because + this object only serves the current test case, when the process(test + case) is finished, the slice cluster is gone. So there is only one + partition object added in self._data["partition"] + """ + return self._data["partitions"][0] + cluster = _Cluster() diff --git a/virttest/vt_cluster/selector.py b/virttest/vt_cluster/selector.py index 0f2d404d23..6def9dad6c 100644 --- a/virttest/vt_cluster/selector.py +++ b/virttest/vt_cluster/selector.py @@ -21,6 +21,8 @@ import operator from . import ClusterError, cluster, node_metadata +from virttest.vt_resmgr import resmgr + LOG = logging.getLogger("avocado." + __name__) @@ -150,6 +152,71 @@ def match_node(self, free_nodes): return None +class _PoolSelector(object): + """ + nodes = node1 node2 + pools = p1 p2 + pool_selectors_p1 = [{"key": "type", "operator": "==", "values": "filesystem"}, + pool_selectors_p1 += {"key": "access.nodes", "operator": "contains", values": "node1"}, + pool_selectors_p2 = [{"key": "type", "operator": "==", "values": "filesystem"}, + pool_selectors_p2 += {"key": "access.nodes", "operator": "contains", values": "node2"}, + """ + + def __init__(self, pool_selectors): + self._pool_selectors = ast.literal_eval(pool_selectors) + self._match_expressions = [] + + for pool_selector in self._pool_selectors: + key, operator, values = self._convert(pool_selector) + self._match_expressions.append( + _MatchExpression(key, operator, values) + ) + + def _convert(self, pool_selector): + key = pool_selector.get("key") + operator = pool_selector.get("operator") + values = pool_selector.get("values") + if "access.nodes" in key: + if isinstance(values, str): + values = cluster.get_node_by_tag(values).name + elif isinstance(values, list): + values = [cluster.get_node_by_tag(tag).name for tag in values] + else: + raise ValueError(f"Unsupported values {values}") + return key, operator, values + + def _get_values(self, keys, config): + for key in keys: + if key in config: + config = config[key] + else: + raise ValueError + return config + + def match_pool(self, pools): + for pool_id in pools: + config = resmgr.query_pool(pool_id) + for match_expression in self._match_expressions: + keys = match_expression.key.split(".") + op = match_expression.operator + values = match_expression.values + config_values = None + + try: + config_values = self._get_values(keys, config["meta"]) + except ValueError: + try: + config_values = self._get_values(keys, config["spec"]) + except ValueError: + raise SelectorError(f"Cannot find {match_expression.key}") + + if not _Operator.operate(op, config_values, values): + break + else: + return pool_id + return None + + def select_node(candidates, selectors=None): """ Select the node according to the node selectors. @@ -164,3 +231,8 @@ def select_node(candidates, selectors=None): selector = _Selector(selectors) return selector.match_node(candidates) return candidates.pop() if candidates else None + + +def select_resource_pool(pools, pool_selectors): + selector = _PoolSelector(pool_selectors) + return selector.match_pool(pools) diff --git a/virttest/vt_imgr/__init__.py b/virttest/vt_imgr/__init__.py new file mode 100644 index 0000000000..abfd5e0b3e --- /dev/null +++ b/virttest/vt_imgr/__init__.py @@ -0,0 +1 @@ +from .vt_imgr import vt_imgr diff --git a/virttest/vt_imgr/images/__init__.py b/virttest/vt_imgr/images/__init__.py new file mode 100644 index 0000000000..17c41ba6ce --- /dev/null +++ b/virttest/vt_imgr/images/__init__.py @@ -0,0 +1,12 @@ +from .qemu import _QemuImage + + +_image_classes = dict() +_image_classes[_QemuImage.get_image_type()] = _QemuImage + + +def get_image_class(image_type): + return _image_classes.get(image_type) + + +__all__ = ["get_image_class"] diff --git a/virttest/vt_imgr/images/image.py b/virttest/vt_imgr/images/image.py new file mode 100644 index 0000000000..9427584932 --- /dev/null +++ b/virttest/vt_imgr/images/image.py @@ -0,0 +1,95 @@ +import collections +import uuid +from abc import ABC, abstractmethod + + +class _Image(ABC): + """ + This is the upper-level image, in the context of a VM, it's mapping + to the VM's disk. It can have one or more lower-level images, + e.g. A qemu image can have a lower-level image chain: + base ---> sn + in which "sn" is the top lower-level image name while "base" is the + backing lower-level image name of "sn" + """ + + # Supported image types: qemu + _IMAGE_TYPE = None + + def __init__(self, image_config): + self._config = image_config + self.image_meta["uuid"] = uuid.uuid4().hex + self._virt_images = collections.OrderedDict() + self._handlers = dict() + + @classmethod + def get_image_type(cls): + return cls._IMAGE_TYPE + + @property + def image_id(self): + return self.image_meta["uuid"] + + @property + def image_config(self): + return self._config + + @property + def image_meta(self): + return self.image_config["meta"] + + @property + def image_spec(self): + return self.image_config["spec"] + + @property + def image_name(self): + return self.image_meta["name"] + + @image_name.setter + def image_name(self, name): + self.image_meta["name"] = name + + def is_owner(self, vm_name): + return vm_name == self.image_meta["owner"] + + @classmethod + def _define_config_legacy(cls, image_name, params): + return { + "meta": { + "uuid": None, + "name": image_name, + "type": cls.get_image_type(), + "owner": None, + "topology": None, + }, + "spec": { + "virt-images": {}, + }, + } + + @classmethod + def define_config(cls, image_name, params): + """ + Define the image configuration by its cartesian params + """ + return cls._define_config_legacy(image_name, params) + + @abstractmethod + def create_object(self): + raise NotImplemented + + @abstractmethod + def destroy_object(self): + raise NotImplemented + + @abstractmethod + def get_info(self, request): + raise NotImplemented + + @abstractmethod + def backup(self): + raise NotImplemented + + def get_image_handler(self, cmd): + return self._handlers.get(cmd) diff --git a/virttest/vt_imgr/images/qemu/__init__.py b/virttest/vt_imgr/images/qemu/__init__.py new file mode 100644 index 0000000000..a9808db67e --- /dev/null +++ b/virttest/vt_imgr/images/qemu/__init__.py @@ -0,0 +1 @@ +from .qemu_image import _QemuImage diff --git a/virttest/vt_imgr/images/qemu/qemu_image.py b/virttest/vt_imgr/images/qemu/qemu_image.py new file mode 100644 index 0000000000..f16294b2ab --- /dev/null +++ b/virttest/vt_imgr/images/qemu/qemu_image.py @@ -0,0 +1,367 @@ +import logging +import copy + +from virttest.vt_cluster import cluster + +from ..image import _Image +from .qemu_virt_image import get_virt_image_class + + +LOG = logging.getLogger("avocado." + __name__) + + +class _QemuImage(_Image): + + # The upper-level image type + _IMAGE_TYPE = "qemu" + + def __init__(self, image_config): + super().__init__(image_config) + # Store images with the same order as tags defined in image_chain + self._handlers.update( + { + "create": self.qemu_img_create, + "destroy": self.qemu_img_destroy, + "rebase": self.qemu_img_rebase, + "commit": self.qemu_img_commit, + "snapshot": self.qemu_img_snapshot, + "add": self.add_virt_image_object, + "remove": self.remove_virt_image_object, + "info": self.qemu_img_info, + "check": self.qemu_img_check, + "backup": self.backup, + "config": self.config, + } + ) + + @classmethod + def define_virt_image_config(cls, image_name, image_params): + image_format = image_params.get("image_format", "qcow2") + virt_image_class = get_virt_image_class(image_format) + return virt_image_class.define_config(image_name, image_params) + + @classmethod + def _define_config_legacy(cls, image_name, params): + def _define_topo_chain_config(): + backing = None + for image_tag in image_chain: + image_params = params.object_params(image_tag) + virt_images[image_tag] = cls.define_virt_image_config( + image_tag, image_params + ) + if backing is not None: + virt_images[image_tag]["spec"]["backing"] = backing + backing = image_tag + + def _define_topo_none_config(): + image_params = params.object_params(image_name) + virt_images[image_name] = cls.define_virt_image_config( + image_name, image_params + ) + + config = super()._define_config_legacy(image_name, params) + virt_images = config["spec"]["virt-images"] + + owner = params.object_params(image_name).get("image_owner") + if owner: + config["meta"]["owner"] = owner + + # image_chain should be the upper-level image param + image_chain = params.object_params(image_name).objects("image_chain") + if image_chain: + # config["meta"]["topology"] = {"type": "chain", "value": image_chain} + config["meta"]["topology"] = {"chain": image_chain} + _define_topo_chain_config() + else: + # config["meta"]["topology"] = {"type": "flat", "value": [image_name]} + config["meta"]["topology"] = {"none": [image_name]} + _define_topo_none_config() + + return config + + @property + def image_access_nodes(self): + node_set = set() + for virt_image in self.virt_images.values(): + node_set.update(virt_image.virt_image_access_nodes) + return list(node_set) + + @property + def virt_images(self): + return self._virt_images + + @property + def virt_image_names(self): + if "none" in self.image_meta["topology"]: + names = self.image_meta["topology"]["none"] + elif "chain" in self.image_meta["topology"]: + names = self.image_meta["topology"]["chain"] + else: + raise ValueError("Unknown topology %s" % self.image_meta["topology"]) + return names + + def create_virt_image_object(self, virt_image_name): + config = self.image_spec["virt-images"][virt_image_name] + image_format = config["spec"]["format"] + virt_image_class = get_virt_image_class(image_format) + virt_image = virt_image_class(config) + virt_image.create_object() + return virt_image + + def create_object(self): + """ + Create the qemu image object. + All its lower-level virt image objects and their volume + objects will be created + """ + LOG.debug("Created the image object for qemu image %s", + self.image_meta["name"]) + for virt_image_name in self.virt_image_names: + self.virt_images[virt_image_name] = self.create_virt_image_object(virt_image_name) + + def destroy_virt_image_object(self, virt_image_name): + virt_image = self.virt_images.pop(virt_image_name) + virt_image.destroy_object() + + def destroy_object(self): + """ + Destroy the image object, all its lower-level image objects + will be destroyed. + """ + for virt_image_name in self.virt_image_names[::-1]: + self.destroy_virt_image_object(virt_image_name) + for virt_image_name in self.virt_images: + self.destroy_virt_image_object(virt_image_name) + + def get_info(self, request): + for virt_image in self.virt_images.values(): + virt_image.sync_volume(dict()) + + config = self.image_config + if request is not None: + for item in request.split("."): + if item in config: + config = config[item] + else: + raise ValueError(request) + else: + config = {item: config} + + return config + + + def add_virt_image_object(self, arguments): + """ + Add a lower-level virt image into the qemu image + + Create the virt image object + Update the qemu image's topology + + Note: If the virt image has a backing, then its backing must be + the topest virt image, e.g. base <-- top, add top1, top1's backing + must be top, setting top1's backing to base will lead to error. + """ + target = arguments["target"] + target_image_params = arguments['target_params'] + backing_chain = arguments.get("backing_chain", False) + node_names = arguments.get("nodes") or self.image_access_nodes + + if target in self.virt_images: + raise ValueError(f"{target} already existed") + + if not set(node_names).issubset(set(self.image_access_nodes)): + raise ValueError(f"{node_names} should be a subset of {self.image_access_nodes}") + + config = self.define_virt_image_config( + target, target_image_params + ) + + if backing_chain: + config["spec"]["backing"] = self.virt_image_names[-1] + self.virt_image_names.append(target) + self.image_meta["name"] = target + if "none" in self.image_meta["topology"]: + self.image_meta["topology"]["chain"] = self.image_meta["topology"].pop("none") + + LOG.info("Qemu image changed: name=%s, topology=%s", + self.image_meta["name"], self.image_meta["topology"]) + + self.image_spec["virt-images"][target] = config + self.virt_images[target] = self.create_virt_image_object(target) + + def remove_virt_image_object(self, arguments): + """ + Remove the lower-level virt image + + Destroy the virt image object + Update the qemu image's topology + """ + target = arguments.pop('target') + + if target not in self.virt_images: + raise ValueError(f"{target} does not exist") + + if len(self.virt_images) == 1: + raise ValueError(f"Cannot remove {target} for a qemu image " + "must have at least one lower-level image") + + if target in self.virt_image_names: + if ("chain" in self.image_meta["topology"] and + target != self.virt_image_names[-1]): + raise ValueError("Only the top virt image in topology(%s) " + "can be removed" % self.virt_image_names) + elif "none" in self.image_meta["topology"]: + raise ValueError("Removing %s in topology(%s) can cause an " + "unknown state of the image" % (target, self.virt_image_names)) + + virt_image = self.virt_images.pop(target) + if virt_image.volume_allocated: + raise RuntimeError(f"The resource of {target} isn't released yet") + + virt_image.destroy_object() + + if target in self.virt_image_names: + self.virt_image_names.remove(target) + self.image_meta["name"] = self.virt_image_names[-1] + + if len(self.virt_image_names) < 2: + self.image_meta["topology"]["none"] = self.image_meta["topology"].pop("chain") + + LOG.info("Qemu image changed: name=%s, topology=%s", + self.image_meta["name"], self.image_meta["topology"]) + + def config(self, arguments): + pass + + def backup(self, arguments): + """ + Backup the image data + + Backup all lower-level images, or backup a specified one + """ + target = arguments.pop("target", None) + image_tags = [target] if target else self.virt_image_names + + for image_tag in image_tags: + virt_image = self.virt_images[image_tag] + + def qemu_img_create(self, arguments): + """ + qemu-img create + + Allocate storage + Create lower-level virt images with qemu-img + """ + node_name = self.image_access_nodes[0] + node = cluster.get_node(node_name) + + target = arguments.get("target") + if target in self.virt_image_names: + if target != self.virt_image_names[-1]: + raise ValueError("Only the top virt image in topology(%s) " + "can be created" % self.virt_image_names) + + image_tags = [target] if target else self.virt_image_names + LOG.info("Create the qemu image %s, targets: %s", + self.image_meta["name"], image_tags) + + for image_tag in image_tags: + virt_image = self.virt_images[image_tag] + virt_image.allocate_volume(arguments) + + r, o = node.proxy.image.handle_image(self.image_config, + {"create": arguments}) + if r != 0: + raise Exception(o["out"]) + + def qemu_img_destroy(self, arguments): + """ + Release the storage + + Note all the lower-level image objects and their volume objects + will not be destroyed. + """ + target = arguments.pop("target", None) + if target in self.virt_image_names: + if target != self.virt_image_names[-1]: + raise ValueError("Only the top virt image in topology(%s) " + "can be destroyed" % self.virt_image_names) + + image_tags = [target] if target else list(self.virt_images.keys()) + LOG.info("Destroy the qemu image %s, targets: %s", + self.image_meta["name"], image_tags) + + for image_tag in image_tags: + self.virt_images[image_tag].release_volume(arguments) + + def qemu_img_rebase(self, arguments): + """ + Rebase target to the top of the qemu image + """ + target = arguments.get("target") + backing = self.virt_image_names[-1] + arguments["source"] = backing + + LOG.info(f"Rebase lower-level image {target} onto {backing}") + add_args = { + "target": arguments["target"], + "target_params": arguments.pop("target_params"), + "nodes": arguments.pop("nodes", None), + } + self.add_virt_image_object(add_args) + + create_args = { + "target": target, + } + self.qemu_img_create(create_args) + + node_name = self.image_access_nodes[0] + node = cluster.get_node(node_name) + r, o = node.proxy.image.handle_image(self.image_config, + {"rebase": arguments}) + if r != 0: + raise Exception(o["out"]) + + self.image_meta["name"] = target + self.virt_image_names.append(target) + if "none" in self.image_meta["topology"]: + self.image_meta["topology"]["chain"] = self.image_meta["topology"].pop("none") + config = self.image_spec["virt-images"][target] + config["spec"]["backing"] = backing + + LOG.info("Qemu image changed: name=%s, topology=%s", + self.image_meta["name"], self.image_meta["topology"]) + + def qemu_img_commit(self, arguments): + node_name = self.image_access_nodes[0] + node = cluster.get_node(node_name) + r, o = node.proxy.image.handle_image(self.image_config, + {"commit": arguments}) + if r != 0: + raise Exception(o["out"]) + + def qemu_img_snapshot(self, arguments): + node_name = self.image_access_nodes[0] + node = cluster.get_node(node_name) + r, o = node.proxy.image.handle_image(self.image_config, + {"snapshot": arguments}) + if r != 0: + raise Exception(o["out"]) + + def qemu_img_info(self, arguments): + node_name = self.image_access_nodes[0] + node = cluster.get_node(node_name) + r, o = node.proxy.image.handle_image(self.image_config, + {"info": arguments}) + if r != 0: + raise Exception(o["out"]) + return o["out"] + + def qemu_img_check(self, arguments): + node_name = self.image_access_nodes[0] + node = cluster.get_node(node_name) + r, o = node.proxy.image.handle_image(self.image_config, + {"check": arguments}) + if r != 0: + raise Exception(o["out"]) + return o["out"] diff --git a/virttest/vt_imgr/images/qemu/qemu_virt_image/__init__.py b/virttest/vt_imgr/images/qemu/qemu_virt_image/__init__.py new file mode 100644 index 0000000000..085c06048a --- /dev/null +++ b/virttest/vt_imgr/images/qemu/qemu_virt_image/__init__.py @@ -0,0 +1,16 @@ +from .raw_qemu_virt_image import _RawQemuVirtImage +from .qcow2_qemu_virt_image import _Qcow2QemuVirtImage +from .luks_qemu_virt_image import _LuksQemuVirtImage + + +_image_classes= dict() +_image_classes[_RawQemuVirtImage.get_virt_image_format()] = _RawQemuVirtImage +_image_classes[_Qcow2QemuVirtImage.get_virt_image_format()] = _Qcow2QemuVirtImage +_image_classes[_LuksQemuVirtImage.get_virt_image_format()] = _LuksQemuVirtImage + + +def get_virt_image_class(virt_image_format): + return _image_classes.get(virt_image_format) + + +__all__ = ['get_virt_image_class'] diff --git a/virttest/vt_imgr/images/qemu/qemu_virt_image/luks_qemu_virt_image.py b/virttest/vt_imgr/images/qemu/qemu_virt_image/luks_qemu_virt_image.py new file mode 100644 index 0000000000..79f468152a --- /dev/null +++ b/virttest/vt_imgr/images/qemu/qemu_virt_image/luks_qemu_virt_image.py @@ -0,0 +1,31 @@ +import os + +from virttest.utils_misc import generate_random_string +from virttest.data_dir import get_tmp_dir + +from .qemu_virt_image import _QemuVirtImage + + +class _LuksQemuVirtImage(_QemuVirtImage): + _VIRT_IMAGE_FORMAT = "luks" + + @classmethod + def _define_config_legacy(cls, image_name, image_params): + config = super()._define_config_legacy(image_name, image_params) + spec = config["spec"] + spec.update({ + "preallocation": image_params.get("preallocated"), + "extent_size_hint": image_params.get("image_extent_size_hint"), + }) + + name = "secret_{s}".format(s=generate_random_string(6)) + spec["encryption"] = { + "name": name, + "data": image_params.get("image_secret", "redhat"), + "format": image_params.get("image_secret_format", "raw"), + } + + if image_params.get("image_secret_storage", "data") == "file": + spec["encryption"]["file"] = os.path.join(get_tmp_dir(), name) + + return config diff --git a/virttest/vt_imgr/images/qemu/qemu_virt_image/qcow2_qemu_virt_image.py b/virttest/vt_imgr/images/qemu/qemu_virt_image/qcow2_qemu_virt_image.py new file mode 100644 index 0000000000..d4bd19e831 --- /dev/null +++ b/virttest/vt_imgr/images/qemu/qemu_virt_image/qcow2_qemu_virt_image.py @@ -0,0 +1,40 @@ +import os + +from virttest.utils_misc import generate_random_string +from virttest.data_dir import get_tmp_dir + +from .qemu_virt_image import _QemuVirtImage + + +class _Qcow2QemuVirtImage(_QemuVirtImage): + + _VIRT_IMAGE_FORMAT = "qcow2" + + @classmethod + def _define_config_legacy(cls, image_name, image_params): + config = super()._define_config_legacy(image_name, image_params) + spec = config["spec"] + spec.update({ + "cluster-size": image_params.get("image_cluster_size"), + "lazy-refcounts": image_params.get("lazy_refcounts"), + "compat": image_params.get("qcow2_compatible"), + "preallocation": image_params.get("preallocated"), + "extent_size_hint": image_params.get("image_extent_size_hint"), + "compression_type": image_params.get("image_compression_type"), + }) + + name = "secret_{s}".format(s=generate_random_string(6)) + if image_params.get("image_encryption"): + spec["encryption"] = { + "name": name, + "data": image_params.get("image_secret", "redhat"), + "format": image_params.get("image_secret_format", "raw"), + "encrypt": { + "format": image_params.get("image_encryption", "luks"), + }, + } + + if image_params.get("image_secret_storage", "data") == "file": + spec["encryption"]["file"] = os.path.join(get_tmp_dir(), name) + + return config diff --git a/virttest/vt_imgr/images/qemu/qemu_virt_image/qemu_virt_image.py b/virttest/vt_imgr/images/qemu/qemu_virt_image/qemu_virt_image.py new file mode 100644 index 0000000000..3c398d6f1c --- /dev/null +++ b/virttest/vt_imgr/images/qemu/qemu_virt_image/qemu_virt_image.py @@ -0,0 +1,58 @@ +import logging +from abc import abstractmethod + +from ...virt_image import _VirtImage +from virttest.vt_resmgr import resmgr + + +LOG = logging.getLogger("avocado." + __name__) + + +class _QemuVirtImage(_VirtImage): + """ + A virt image has one storage resource(volume), take qemu virt image + as an example, the cartesian params beginning with 'image_', e.g. + 'image_size' describe this object + """ + + @classmethod + def _define_config_legacy(cls, image_name, image_params): + config = super()._define_config_legacy(image_name, + image_params) + config["spec"].update({ + "backing": None, + "volume": resmgr.define_resource_config(image_name, + "volume", + image_params), + }) + + return config + + def info(self): + pass + + def keep(self): + pass + + def create_object(self): + LOG.debug(f"Create the virt image object for '{self.virt_image_name}'") + volume_config = self.virt_image_spec["volume"] + volume_id = resmgr.create_resource_object(volume_config) + resmgr.update_resource(volume_id, {"bind": dict()}) + + def destroy_object(self): + LOG.debug(f"Destroy the virt image object for '{self.virt_image_name}'") + resmgr.update_resource(self.volume_id, {"unbind": dict()}) + resmgr.destroy_resource_object(self.volume_id) + + def sync_volume(self, arguments): + LOG.debug(f"Sync up the volume conf for '{self.virt_image_name}'") + resmgr.update_resource(self.volume_id, {"sync": arguments}) + + def allocate_volume(self, arguments): + LOG.debug(f"Allocate the volume for '{self.virt_image_name}'") + resmgr.update_resource(self.volume_id, {"allocate": arguments}) + + def release_volume(self, arguments): + LOG.debug(f"Release the volume for '{self.virt_image_name}'") + resmgr.update_resource(self.volume_id, {"release": arguments}) diff --git a/virttest/vt_imgr/images/qemu/qemu_virt_image/raw_qemu_virt_image.py b/virttest/vt_imgr/images/qemu/qemu_virt_image/raw_qemu_virt_image.py new file mode 100644 index 0000000000..b6fb55deae --- /dev/null +++ b/virttest/vt_imgr/images/qemu/qemu_virt_image/raw_qemu_virt_image.py @@ -0,0 +1,17 @@ +from .qemu_virt_image import _QemuVirtImage + + +class _RawQemuVirtImage(_QemuVirtImage): + + _VIRT_IMAGE_FORMAT = "raw" + + @classmethod + def _define_config_legacy(cls, image_name, image_params): + config = super()._define_config_legacy(image_name, image_params) + spec = config["spec"] + spec.update({ + "preallocation": image_params.get("preallocated"), + "extent_size_hint": image_params.get("image_extent_size_hint"), + }) + + return config diff --git a/virttest/vt_imgr/images/virt_image.py b/virttest/vt_imgr/images/virt_image.py new file mode 100644 index 0000000000..f1e745533b --- /dev/null +++ b/virttest/vt_imgr/images/virt_image.py @@ -0,0 +1,96 @@ +from abc import ABC, abstractmethod + + +class _VirtImage(ABC): + """ + The lower-level image, which has a storage resource(volume), is + defined by the cartesian params beginning with 'image_'. One or + more lower-level images can represent a upper-level image. + """ + + _VIRT_IMAGE_FORMAT = None + + def __init__(self, config): + self._config = config + self._backup_volumes = dict() + + @classmethod + def get_virt_image_format(cls): + return cls._VIRT_IMAGE_FORMAT + + @classmethod + def _define_config_legacy(cls, image_name, image_params): + return { + "meta": { + "name": image_name, + }, + "spec": { + "format": cls.get_virt_image_format(), + "volume": {}, + }, + } + + @classmethod + def define_config(cls, image_name, image_params): + """ + Define the virt image configuration by its cartesian params. + Currently use the existing image params, in future, we'll + design a new set of params to describe a lower-level image. + """ + return cls._define_config_legacy(image_name, image_params) + + @property + def volume_id(self): + return self.virt_image_spec["volume"]["meta"]["uuid"] + + @property + def virt_image_access_nodes(self): + volume_config = self.virt_image_spec["volume"] + bindings = volume_config["meta"]["bindings"] + return list(bindings.keys()) + + @property + def virt_image_name(self): + return self.virt_image_meta["name"] + + @property + def virt_image_config(self): + return self._config + + @property + def virt_image_spec(self): + return self.virt_image_config["spec"] + + @property + def virt_image_meta(self): + return self.virt_image_config["meta"] + + @property + @abstractmethod + def keep(self): + raise NotImplemented + + @abstractmethod + def create_object(self): + raise NotImplemented + + @abstractmethod + def destroy_object(self): + raise NotImplemented + + @abstractmethod + def info(self): + raise NotImplemented + + @abstractmethod + def allocate_volume(self, arguments): + raise NotImplemented + + @abstractmethod + def release_volume(self, arguments): + raise NotImplemented + + @property + def volume_allocated(self): + volume_config = self.virt_image_spec["volume"] + return volume_config["meta"]["allocated"] diff --git a/virttest/vt_imgr/vt_imgr.py b/virttest/vt_imgr/vt_imgr.py new file mode 100644 index 0000000000..745be0fc46 --- /dev/null +++ b/virttest/vt_imgr/vt_imgr.py @@ -0,0 +1,240 @@ +""" +The upper-level image manager. + +from virttest.vt_imgr import vt_imgr + +# Define the image configuration +image_config = vt_imgr.define_image_config(image_name, params) + +# Create the upper-level image object +image_id = vt_imgr.create_image_object(image_config) + +# Create the upper-level image +vt_imgr.handle_image(image_id, {"create":{}}) + +# Create only one lower-level image +vt_imgr.handle_image(image_id, {"create":{"target": "top"}}) + +# Destroy one lower-level image +vt_imgr.handle_image(image_id, {"destroy":{"target": "top"}}) + +# Get the configuration of the upper-level image +out = vt_imgr.get_image_info(image_id, request=None) +out: +{ + "meta": { + "uuid": "uuid-sn" + "name": "sn", + "type": "qemu", + "topology": {"chain": ["base", "sn"]} + }, + "spec": { + "virt-images": { + "base": { + "meta": {}, + "spec": { + "format": "raw", + "volume": {"meta": {}, "spec": {}}} + }, + "sn": { + "meta": {}, + "spec": { + "format": "qcow2", + "volume": {"meta": {}, "spec": {}}} + } + } + } +} + +# Destroy the upper-level image +vt_imgr.handle_image(image_id, {"destroy":{}}) + +# Destroy the upper-level image object +vt_imgr.destroy_image_object(image_id) +""" +import logging + +from .images import get_image_class +from virttest.vt_cluster import cluster + +LOG = logging.getLogger("avocado." + __name__) + + +# TODO: +# Add drivers for diff handlers +# Add access permission for images +# serialize +class _VTImageManager(object): + + def __init__(self): + self._images = dict() + + def startup(self): + LOG.info(f"Start the image manager") + + def teardown(self): + LOG.info(f"Stop the image manager") + + def define_image_config(self, image_name, params): + """ + Define the upper-level image(e.g. in the context of a VM, it's + mapping to a VM's disk) configuration by its cartesian params. + E.g. An upper-level qemu image has an lower-level image chain + base ---> sn + | | + resource resource + :param image_name: The image tag defined in cartesian params, + e.g. for a qemu image, the tag should be the + top image("sn" in the example above) if the + "image_chain" is defined, usually it is + defined in the "images" param, e.g. "image1" + :type image_name: string + :param params: The params for all the lower-level images + Note it's *NOT* an image-specific params like + params.object_params("sn") + *BUT* the params for both "sn" and "base" + Examples: + 1. images_vm1 = "image1 sn" + image_chain_sn = "base sn" + image_name = "sn" + params = the_case_params.object_params('vm1') + 2. images = "image1 stg" + image_name = "image1" + params = the_case_params + :type params: Params + :return: The image configuration + :rtype: dict + """ + image_params = params.object_params(image_name) + image_type = image_params.get("image_type", "qemu") + image_class = get_image_class(image_type) + + LOG.debug(f"Define the {image_type} image configuration for {image_name}") + return image_class.define_config(image_name, params) + + def create_image_object(self, image_config): + """ + Create an upper-level image(e.g. in the context of a VM, it's + mapping to a VM's disk) object by its configuration without + any storage allocation. All its lower-level images and their + mapping storage resource objects will be created. + :param image_config: The image configuration. + Call define_image_config to get it. + :type image_config: dict + :return: The image object id + :rtype: string + """ + image_type = image_config["meta"]["type"] + image_class = get_image_class(image_type) + image = image_class(image_config) + image.create_object() + self._images[image.image_id] = image + + LOG.debug("Created the image object %s for %s", + image.image_id, image.image_name) + return image.image_id + + def destroy_image_object(self, image_id): + """ + Destroy a specified image. All its storage allocation should + be released. + + :param image_id: The image id + :type image_id: string + """ + LOG.debug(f"Destroy the image object {image_id}") + image = self._images.pop(image_id) + image.destroy_object() + + + def handle_image(self, image_id, config): + """ + Update a specified upper-level image + + config format: + {command: arguments} + + Supported commands for a qemu image: + create: qemu-img create + destroy: Destroy the specified lower-level images + resize: qemu-img resize + map: qemu-img map + convert: qemu-img convert + commit: qemu-img commit + snapshot: qemu-img snapshot + rebase: qemu-img rebase + info: qemu-img info + check: qemu-img check + add: Add a lower-level image object + delete: Delete a lower-level image object + backup: Backup a qemu image + compare: Comare two qemu images + config: Update the static configurations + + Note: Not all images support the above operations + The arguments is a dict object which contains all related settings + for a specific command + :param image_id: The image id + :type image_id: string + """ + cmd, arguments = config.popitem() + image = self._images.get(image_id) + image_handler = image.get_image_handler(cmd) + + node_tags = arguments.pop("nodes", list()) + node_names = [cluster.get_node_by_tag(tag).name for tag in node_tags] + if node_names: + arguments["nodes"] = node_names + + LOG.debug(f"Handle the image object {image_id} with cmd {cmd}") + return image_handler(arguments) + + def get_image_info(self, image_id, request=None): + """ + Get the configuration of a specified upper-level image + + :param request: The query content, format: + None + meta[.] + spec[.virt-images.[.meta[.]]] + spec[.virt-images.[.spec[.]]] + Examples: + 1. Get the image's configuration + request=None + 2. Get the lower-level images' configurations + request=spec.virt-images + 3. Get sn's volume configuration + request=spec.virt-images.sn.spec.volume + :type request: string + :return: The configuration + :rtype: dict + """ + LOG.debug(f"Get the image object {image_id} with request {request}") + image = self._images.get(image_id) + return image.get_info(request) + + def query_image(self, image_name, vm_name=None): + """ + Get the image object id + + Note: The partition id is not required because only one + partition is created when running a test case + + :param image_name: The image tag defined in 'images' + :type image_name: string + :param vm_name: The vm tag defined in 'vms' + :type vm_name: string + :return: The image object id + :rtype: string + """ + for image in self._images.values(): + if image_name == image.image_name: + if vm_name is not None: + if image.is_owner(vm_name): + return image.image_id + else: + return image.image_id + return None + + +vt_imgr = _VTImageManager() diff --git a/virttest/vt_resmgr/__init__.py b/virttest/vt_resmgr/__init__.py new file mode 100644 index 0000000000..bcfbc1478b --- /dev/null +++ b/virttest/vt_resmgr/__init__.py @@ -0,0 +1 @@ +from .vt_resmgr import resmgr diff --git a/virttest/vt_resmgr/resources/__init__.py b/virttest/vt_resmgr/resources/__init__.py new file mode 100644 index 0000000000..7455113694 --- /dev/null +++ b/virttest/vt_resmgr/resources/__init__.py @@ -0,0 +1,19 @@ +#from .cvm import _SnpPool +#from .cvm import _TdxPool +#from .storage import _CephPool +from .storage import _DirPool +from .storage import _NfsPool + +_pool_classes = dict() +#_pool_classes[_SnpPool.get_pool_type()] = _SnpPool +#_pool_classes[_TdxPool.get_pool_type()] = _TdxPool +#_pool_classes[_CephPool.get_pool_type()] = _CephPool +_pool_classes[_DirPool.get_pool_type()] = _DirPool +_pool_classes[_NfsPool.get_pool_type()] = _NfsPool + + +def get_resource_pool_class(pool_type): + return _pool_classes.get(pool_type) + + +__all__ = ["get_resource_pool_class"] diff --git a/virttest/vt_resmgr/resources/cvm/__init__.py b/virttest/vt_resmgr/resources/cvm/__init__.py new file mode 100644 index 0000000000..0a0e47b0b0 --- /dev/null +++ b/virttest/vt_resmgr/resources/cvm/__init__.py @@ -0,0 +1 @@ +from .api import * diff --git a/virttest/vt_resmgr/resources/pool.py b/virttest/vt_resmgr/resources/pool.py new file mode 100644 index 0000000000..c39346075a --- /dev/null +++ b/virttest/vt_resmgr/resources/pool.py @@ -0,0 +1,178 @@ +import uuid +from copy import deepcopy +from abc import ABC, abstractmethod + +from virttest.vt_cluster import cluster + + +class _ResourcePool(ABC): + """ + A resource pool is used to manage resources. A resource must be + allocated from a specific pool, and a pool can hold many resources + """ + + _POOL_TYPE = None + + def __init__(self, pool_config): + self._config = pool_config + self.pool_meta["uuid"] = uuid.uuid4().hex + self._resources = dict() # {resource id: resource object} + self._caps = dict() + + if not set(self.attaching_nodes).difference(set(["*"])): + self.attaching_nodes = [n.name for n in cluster.get_all_nodes()] + + @property + def pool_name(self): + return self.pool_meta["name"] + + @property + def pool_id(self): + return self.pool_meta["uuid"] + + @property + def pool_config(self): + return self._config + + @property + def pool_meta(self): + return self._config["meta"] + + @property + def pool_spec(self): + return self._config["spec"] + + @property + def resources(self): + return self._resources + + @classmethod + def define_config(cls, pool_name, pool_params): + access = pool_params.get("access", {}) + return { + "meta": { + "name": pool_name, + "uuid": None, + "type": pool_params["type"], + "access": access, + }, + "spec": {}, + } + + def update_config(self, pool_config): + pass + + def get_info(self, request): + config = self.pool_config + if request is not None: + for item in request.split("."): + if item in config: + config = config[item] + else: + raise ValueError(request) + else: + config = {item: config} + + return deepcopy(config) + + @abstractmethod + def meet_resource_request(self, resource_type, resource_params): + """ + Check if the pool can support a resource's allocation + """ + raise NotImplementedError + + def define_resource_config(self, resource_name, resource_type, resource_params): + """ + Define the resource configuration, format: + {"meta": {...}, "spec": {...}} + It depends on the specific resource. + """ + res_cls = self.get_resource_class(resource_type) + config = res_cls.define_config(resource_name, resource_params) + + node_tags = resource_params.objects("vm_node") or resource_params.objects("nodes") + node_names = [cluster.get_node_by_tag(tag).name for tag in node_tags] + config["meta"].update( + { + "pool": self.pool_config, + "bindings": {node: None for node in node_names}, + } + ) + + return config + + @classmethod + @abstractmethod + def get_resource_class(cls, resource_type): + raise NotImplementedError + + def create(self): + pass + + def destroy(self): + pass + + def create_resource_object(self, resource_config): + """ + Create a resource object, no real resource allocated + """ + meta = resource_config["meta"] + res_cls = self.get_resource_class(meta["type"]) + res = res_cls(resource_config) + res.create() + self.resources[res.resource_id] = res + return res.resource_id + + def destroy_resource_object(self, resource_id): + """ + Destroy the resource object, all its backings should be released + """ + res = self.resources[resource_id] + res.destroy() + del(self.resources[resource_id]) + + def update_resource(self, resource_id, config): + resource = self.resources.get(resource_id) + cmd, arguments = config.popitem() + + # For the user specified nodes, we need to check if the pool + # can be accessed from these nodes + node_tags = arguments.pop("nodes") + if node_tags: + node_names = [cluster.get_node_by_tag(tag).name for tag in node_tags] + if not set(node_names).issubset(set(self.attaching_nodes)): + raise ValueError(f"Not all nodes({node_names}) can access the pool {self.pool_id}") + arguments["nodes"] = node_names + + handler = resource.get_update_handler(cmd) + handler(arguments) + + def get_resource_info(self, resource_id, request): + """ + Get the reference of a specified resource + """ + res = self.resources.get(resource_id) + return res.get_info(request) + + @property + def attaching_nodes(self): + return self.pool_meta["access"].get("nodes") + + @attaching_nodes.setter + def attaching_nodes(self, nodes): + self.pool_meta["access"]["nodes"] = nodes + + """ + @property + def pool_capability(self): + node_name = self.attaching_nodes[0] + node = cluster.get_node(node_name) + r, o = node.proxy.resource.get_pool_capability() + if r != 0: + raise Exception(o["out"]) + """ + + @classmethod + def get_pool_type(cls): + return cls._POOL_TYPE diff --git a/virttest/vt_resmgr/resources/resource.py b/virttest/vt_resmgr/resources/resource.py new file mode 100644 index 0000000000..7d58cc69e2 --- /dev/null +++ b/virttest/vt_resmgr/resources/resource.py @@ -0,0 +1,135 @@ +import uuid +from copy import deepcopy + +from abc import ABC, abstractmethod + + +class _Resource(ABC): + """ + A resource defines what users request, it's independent of a VM, + users can request kinds of resources for any purpose. The resource + can be bound to several backings on different worker nodes. + + Note: A resource can bind to only one backing on a worker node. + """ + + _RESOURCE_TYPE = None + + def __init__(self, resource_config): + self._config = resource_config + self.resource_meta["uuid"] = uuid.uuid4().hex + self._handlers = { + "bind": self.bind, + "unbind": self.unbind, + "allocate": self.allocate, + "release": self.release, + "sync": self.sync, + } + + @classmethod + def resource_type(cls): + raise cls._RESOURCE_TYPE + + @property + def resource_config(self): + return self._config + + @property + def resource_spec(self): + return self.resource_config["spec"] + + @property + def resource_meta(self): + return self.resource_config["meta"] + + @property + def resource_id(self): + return self.resource_meta["uuid"] + + @property + def resource_pool(self): + return self.resource_meta["pool"]["meta"]["uuid"] + + @property + def resource_bindings(self): + return self.resource_meta["bindings"] + + @classmethod + def _define_config_legacy(cls, resource_name, resource_params): + return { + "meta": { + "name": resource_name, + "uuid": None, + "type": None, + "pool": None, + "allocated": False, + "bindings": dict(), + }, + "spec": {}, + } + + @classmethod + def define_config(cls, resource_name, resource_params): + return cls._define_config_legacy(resource_name, resource_params) + + @property + def backing_config(self): + """ + Define the required information of the resource, used + for allocating the resource on the worker nodes + """ + return self.resource_config + #config = dict() + #config["uuid"] = self.resource_id + #config["pool"] = self.resource_pool + #config["type"] = self.resource_type + #return config + + def get_update_handler(self, command): + return self._handlers.get(command) + + @abstractmethod + def bind(self, arguments): + """ + Bind the resource to one or more worker nodes + """ + raise NotImplemented + + @abstractmethod + def unbind(self, arguments): + raise NotImplemented + + @abstractmethod + def allocate(self, arguments): + raise NotImplemented + + @abstractmethod + def release(self, arguments): + raise NotImplemented + + def get_info(self, request): + r, o = self.sync(dict()) + if r != 0: + raise Exception(o["out"]) + + config = self.resource_config + if request is not None: + for item in request.split("."): + if item in config: + config = config[item] + else: + raise ValueError(request) + else: + config = {item: config} + + return deepcopy(config) + + @abstractmethod + def sync(self, arguments): + raise NotImplemented + + def create(self): + pass + + def destroy(self): + pass diff --git a/virttest/vt_resmgr/resources/storage/__init__.py b/virttest/vt_resmgr/resources/storage/__init__.py new file mode 100644 index 0000000000..3f20ff6f5d --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/__init__.py @@ -0,0 +1,14 @@ +from .dir import _DirPool +from .nfs import _NfsPool +#from .ceph import _CephPool +#from .nbd import _NbdPool +#from .iscsi_direct import _IscsiDirectPool + + +__all__ = ( +# _CephPool, + _DirPool, + _NfsPool, +# _NbdPool, +# _IscsiDirectPool, +) diff --git a/virttest/vt_resmgr/resources/storage/ceph/__init__.py b/virttest/vt_resmgr/resources/storage/ceph/__init__.py new file mode 100644 index 0000000000..8ec3b25a7a --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/ceph/__init__.py @@ -0,0 +1 @@ +from .ceph_pool import _CephPool diff --git a/virttest/vt_resmgr/resources/storage/dir/__init__.py b/virttest/vt_resmgr/resources/storage/dir/__init__.py new file mode 100644 index 0000000000..c09faaf942 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/dir/__init__.py @@ -0,0 +1 @@ +from .dir_pool import _DirPool diff --git a/virttest/vt_resmgr/resources/storage/dir/dir_pool.py b/virttest/vt_resmgr/resources/storage/dir/dir_pool.py new file mode 100644 index 0000000000..e7b654319a --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/dir/dir_pool.py @@ -0,0 +1,75 @@ +import logging +import os + +from ...pool import _ResourcePool +from .dir_resources import get_dir_resource_class + +from virttest.data_dir import get_data_dir +from virttest.vt_cluster import cluster + + +LOG = logging.getLogger("avocado." + __name__) + + +class _DirPool(_ResourcePool): + _POOL_TYPE = "filesystem" + _POOL_DEFAULT_DIR = "/home/kvm_autotest_root" + + @classmethod + def define_default_config(cls): + """ + We'll define a default filesystem pool if it is not defined by user + """ + pool_name = "dir_pool_default" + pool_params = { + "type": cls._POOL_TYPE, + "path": cls._POOL_DEFAULT_DIR, + "access": { + "nodes": list(), + } + } + return cls.define_config(pool_name, pool_params) + + @classmethod + def define_config(cls, pool_name, pool_params): + config = super().define_config(pool_name, pool_params) + path = pool_params.get("path") or os.path.join(get_data_dir(), "images") + config["spec"]["path"] = path + return config + + @classmethod + def get_resource_class(cls, resource_type): + return get_dir_resource_class(resource_type) + + def meet_resource_request(self, resource_type, resource_params): + # Specify a pool + pool_tag = resource_params.get("image_pool_name") + if pool_tag: + pool_id = cluster.partition.pools.get(pool_tag) + return True if pool_id == self.pool_id else False + + # Check if this is the pool with the specified type + if resource_params.get("storage_type", "filesystem") != self.get_pool_type(): + return False + + # Note if you want the image is created from a specific pool or + # the image is handled on a specific worker node, you should + # specify its image_pool_name + vm_node_tag = resource_params.get("vm_node") + if vm_node_tag: + # Check if the pool can be accessed by the vm nodes + vm_node_name = cluster.get_node_by_tag(vm_node_tag) + if vm_node_name not in self.attaching_nodes: + return False + else: + # Check if the pool can be accessed by one of the partition nodes + node_names = [node.name for node in cluster.partition.nodes] + if not set(self.attaching_nodes).intersection(set(node_names)): + return False + + # Check if the pool can support a specific resource + if not self.get_resource_class(resource_type): + return False + + # TODO: Check if the pool has capacity to allocate the resource + return True diff --git a/virttest/vt_resmgr/resources/storage/dir/dir_resources.py b/virttest/vt_resmgr/resources/storage/dir/dir_resources.py new file mode 100644 index 0000000000..8513de2b14 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/dir/dir_resources.py @@ -0,0 +1,109 @@ +import logging + +from virttest.utils_numeric import normalize_data_size +from virttest.vt_cluster import cluster + +from ..volume import _FileVolume + + +LOG = logging.getLogger("avocado." + __name__) + + +class _DirFileVolume(_FileVolume): + """ + The directory file-based volume + """ + + def bind(self, arguments): + """ + Bind the resource to a backing on a worker node. + Note: A local dir resource has one and only one binding in the cluster + """ + node_name, backing_id = list(self.resource_bindings.items())[0] + if backing_id: + LOG.warning(f"The dir volume {self.resource_id} has already bound to {node_name}") + else: + nodes = arguments.pop("nodes", [node_name]) + LOG.info(f"Bind the dir volume {self.resource_id} to {nodes[0]}") + node = cluster.get_node(nodes[0]) + r, o = node.proxy.resource.create_backing_object(self.resource_config) + if r != 0: + raise Exception(o["out"]) + self.resource_bindings[nodes[0]] = o["out"] + + def unbind(self, arguments): + """ + Unbind the resource from a worker node. + Note: A dir resource must be released before unbinding + because it has only one binding + """ + node_name, backing_id = list(self.resource_bindings.items())[0] + LOG.info(f"Unbind the dir volume {self.resource_id} from {node_name}") + node = cluster.get_node(node_name) + r, o = node.proxy.resource.destroy_backing_object(backing_id) + if r != 0: + raise Exception(o["out"]) + self.resource_bindings[node_name] = None + + def sync(self, arguments): + LOG.debug(f"Sync up the configuration of the dir volume {self.resource_id}") + node_name, backing_id = list(self.resource_bindings.items())[0] + node = cluster.get_node(node_name) + r, o = node.proxy.resource.update_backing(backing_id, + {"sync": arguments}) + if r != 0: + raise Exception(o["out"]) + + config = o["out"] + self.resource_meta["allocated"] = config["meta"]["allocated"] + self.resource_spec["uri"] = config["spec"]["uri"] + self.resource_spec["allocation"] = config["spec"]["allocation"] + + def allocate(self, arguments): + node_name, backing_id = list(self.resource_bindings.items())[0] + LOG.debug(f"Allocate the dir volume {self.resource_id} from {node_name}.") + node = cluster.get_node(node_name) + r, o = node.proxy.resource.update_backing(backing_id, + {"allocate": arguments}) + if r != 0: + raise Exception(o["out"]) + + config = o["out"] + self.resource_meta["allocated"] = config["meta"]["allocated"] + self.resource_spec["uri"] = config["spec"]["uri"] + self.resource_spec["allocation"] = config["spec"]["allocation"] + + def release(self, arguments): + node_name, backing_id = list(self.resource_bindings.items())[0] + node = cluster.get_node(node_name) + LOG.debug(f"Release the dir volume {self.resource_id} from {node_name}") + r, o = node.proxy.resource.update_backing(backing_id, + {"release": arguments}) + if r != 0: + raise Exception(o["error"]) + self.resource_meta["allocated"] = False + self.resource_spec["allocation"] = 0 + + def resize(self, arguments): + """ + Resize the local dir volume resource + """ + new = int(normalize_data_size(arguments["size"], "B")) + if new != self.resource_spec["size"]: + node_name, backing_id = list(self.resource_bindings.items())[0] + LOG.debug(f"Resize the dir volume {self.resource_id} from {node_name}") + node = cluster.get_node(node_name) + r, o = node.proxy.resource.update_backing(backing_id, {"resize": arguments}) + if r != 0: + raise Exception(o["error"]) + self.resource_spec["size"] = new + else: + LOG.debug(f"New size {new} is the same with the original") + + +def get_dir_resource_class(resource_type): + mapping = { + "volume": _DirFileVolume, + } + + return mapping.get(resource_type) diff --git a/virttest/vt_resmgr/resources/storage/iscsi_direct/__init__.py b/virttest/vt_resmgr/resources/storage/iscsi_direct/__init__.py new file mode 100644 index 0000000000..4631b968fa --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/iscsi_direct/__init__.py @@ -0,0 +1 @@ +from .iscsi_direct_pool import _IscsiDirectPool diff --git a/virttest/vt_resmgr/resources/storage/iscsi_direct/iscsi_direct_pool.py b/virttest/vt_resmgr/resources/storage/iscsi_direct/iscsi_direct_pool.py new file mode 100644 index 0000000000..33738d1af9 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/iscsi_direct/iscsi_direct_pool.py @@ -0,0 +1,20 @@ +import logging + +from ...resource import _Resource +from ...pool import _ResourcePool + + +LOG = logging.getLogger("avocado." + __name__) + + +class _IscsiDirectResource(_Resource): + """ + The iscsi-direct pool resource + """ + + def _initialize(self, config): + self._lun = config["lun"] + + +class _IscsiDirectPool(_ResourcePool): + POOL_TYPE = "iscsi-direct" diff --git a/virttest/vt_resmgr/resources/storage/nbd/__init__.py b/virttest/vt_resmgr/resources/storage/nbd/__init__.py new file mode 100644 index 0000000000..8a29e248f3 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/nbd/__init__.py @@ -0,0 +1 @@ +from .nbd_pool import _NbdPool diff --git a/virttest/vt_resmgr/resources/storage/nfs/__init__.py b/virttest/vt_resmgr/resources/storage/nfs/__init__.py new file mode 100644 index 0000000000..a0e90ec573 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/nfs/__init__.py @@ -0,0 +1 @@ +from .nfs_pool import _NfsPool diff --git a/virttest/vt_resmgr/resources/storage/nfs/nfs_pool.py b/virttest/vt_resmgr/resources/storage/nfs/nfs_pool.py new file mode 100644 index 0000000000..2c037f0e7d --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/nfs/nfs_pool.py @@ -0,0 +1,67 @@ +import logging +import os + +from ...pool import _ResourcePool +from .nfs_resources import get_nfs_resource_class + +from virttest.data_dir import get_shared_dir +from virttest.utils_misc import generate_random_string +from virttest.vt_cluster import cluster + + +LOG = logging.getLogger("avocado." + __name__) + + +class _NfsPool(_ResourcePool): + _POOL_TYPE = "nfs" + + @classmethod + def define_config(cls, pool_name, pool_params): + config = super().define_config(pool_name, pool_params) + config["spec"].update( + { + "server": pool_params["nfs_server_ip"], + "export": pool_params["nfs_mount_src"], + "mount-options": pool_params.get("nfs_mount_options", ""), + "mount": pool_params.get("nfs_mount_dir", + os.path.join(get_shared_dir(), generate_random_string(6))) + } + ) + return config + + @classmethod + def get_resource_class(cls, resource_type): + return get_nfs_resource_class(resource_type) + + def meet_resource_request(self, resource_type, resource_params): + # Specify a pool + pool_tag = resource_params.get("image_pool_name") + if pool_tag: + pool_id = cluster.partition.pools.get(pool_tag) + return True if pool_id == self.pool_id else False + + # Check if this is the pool with the specified type + if resource_params.get("storage_type") != self.get_pool_type(): + return False + + # Note if you want the image is created from a specific pool or + # the image is handled on a specific worker node, you should + # specify its image_pool_name + vm_node_tag = resource_params.get("vm_node") + if vm_node_tag: + # Check if the pool can be accessed by the vm nodes + vm_node_name = cluster.get_node_by_tag(vm_node_tag) + if vm_node_name not in self.attaching_nodes: + return False + else: + # Check if the pool can be accessed by one of the partition nodes + node_names = [node.name for node in cluster.partition.nodes] + if not set(self.attaching_nodes).intersection(set(node_names)): + return False + + # Check if the pool can support a specific resource + if not self.get_resource_class(resource_type): + return False + + # TODO: Check if the pool has capacity to allocate the resource + return True diff --git a/virttest/vt_resmgr/resources/storage/nfs/nfs_resources.py b/virttest/vt_resmgr/resources/storage/nfs/nfs_resources.py new file mode 100644 index 0000000000..5f885be155 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/nfs/nfs_resources.py @@ -0,0 +1,113 @@ +import logging + +from virttest.utils_numeric import normalize_data_size +from virttest.vt_cluster import cluster + +from ..volume import _FileVolume + + +LOG = logging.getLogger("avocado." + __name__) + + +class _NfsFileVolume(_FileVolume): + """ + The nfs file-based volume + """ + + def bind(self, arguments): + """ + Bind the resource to a backing on a worker node. + Note: A nfs volume resource can have many bindings + """ + nodes = arguments.pop("nodes", list(self.resource_bindings.keys())) + for node_name in nodes: + if not self.resource_bindings.get(node_name): + LOG.info(f"Bind the nfs volume {self.resource_id} to node {node_name}") + node = cluster.get_node(node_name) + r, o = node.proxy.resource.create_backing_object(self.resource_config) + if r != 0: + raise Exception(o["out"]) + self.resource_bindings[node_name] = o["out"] + else: + LOG.info(f"The nfs volume {self.resource_id} has already bound to {node_name}") + + def unbind(self, arguments): + """ + Unbind the nfs volume from a worker node + """ + nodes = arguments.pop("nodes", list(self.resource_bindings.keys())) + for node_name in nodes: + backing_id = self.resource_bindings.get(node_name) + if backing_id: + LOG.info(f"Unbind the nfs volume {self.resource_id} from node {node_name}") + node = cluster.get_node(node_name) + r, o = node.proxy.resource.destroy_backing_object(backing_id) + if r != 0: + raise Exception(o["out"]) + self.resource_bindings[node_name] = None + else: + LOG.info(f"The nfs volume {self.resource_id} has already unbound from {node_name}") + + def sync(self, arguments): + LOG.debug(f"Sync up the configuration of the nfs volume {self.resource_id}") + node_name, backing_id = list(self.resource_bindings.items())[0] + node = cluster.get_node(node_name) + r, o = node.proxy.resource.update_backing(backing_id, + {"sync": arguments}) + if r != 0: + raise Exception(o["out"]) + + config = o["out"] + self.resource_meta["allocated"] = config["meta"]["allocated"] + self.resource_spec["uri"] = config["spec"]["uri"] + self.resource_spec["allocation"] = config["spec"]["allocation"] + + def allocate(self, arguments): + LOG.debug(f"Allocate the nfs volume {self.resource_id} from {node_name}.") + node_name, backing_id = list(self.resource_bindings.items())[0] + node = cluster.get_node(node_name) + r, o = node.proxy.resource.update_backing(backing_id, + {"allocate": arguments}) + if r != 0: + raise Exception(o["out"]) + + config = o["out"] + self.resource_meta["allocated"] = config["meta"]["allocated"] + self.resource_spec["uri"] = config["spec"]["uri"] + self.resource_spec["allocation"] = config["spec"]["allocation"] + + def release(self, arguments): + LOG.debug(f"Release the nfs volume {self.resource_id} from {node_name}") + node_name, backing_id = list(self.resource_bindings.items())[0] + node = cluster.get_node(node_name) + r, o = node.proxy.resource.update_backing(backing_id, + {"release": arguments}) + if r != 0: + raise Exception(o["error"]) + self.resource_meta["allocated"] = False + self.resource_spec["allocation"] = 0 + self.resource_spec["uri"] = None + + def resize(self, arguments): + """ + Resize the nfs volume + """ + new = int(normalize_data_size(arguments["size"], "B")) + if new != self.resource_spec["size"]: + LOG.debug(f"Resize the nfs volume {self.resource_id} from {node_name}") + node_name, backing_id = list(self.resource_bindings.items())[0] + node = cluster.get_node(node_name) + r, o = node.proxy.resource.update_backing(backing_id, {"resize": arguments}) + if r != 0: + raise Exception(o["error"]) + self.resource_spec["size"] = new + else: + LOG.debug(f"New size {new} is the same with the original") + + +def get_nfs_resource_class(resource_type): + mapping = { + "volume": _NfsFileVolume, + } + + return mapping.get(resource_type) diff --git a/virttest/vt_resmgr/resources/storage/volume.py b/virttest/vt_resmgr/resources/storage/volume.py new file mode 100644 index 0000000000..ac696322e1 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/volume.py @@ -0,0 +1,81 @@ +import os + +from virttest import utils_numeric + +from ..resource import _Resource + + +class _Volume(_Resource): + """ + Storage volumes are abstractions of physical partitions, + LVM logical volumes, file-based disk images + """ + + _RESOURCE_TYPE = "volume" + _VOLUME_TYPE = None + + @classmethod + def get_volume_type(cls): + return cls._VOLUME_TYPE + + @classmethod + def _define_config_legacy(cls, resource_name, resource_params): + size = utils_numeric.normalize_data_size( + resource_params.get("image_size", "20G"), order_magnitude="B" + ) + + config = super()._define_config_legacy(resource_name, resource_params) + config["meta"].update({ + "type": cls._RESOURCE_TYPE, + "volume-type": cls.get_volume_type(), + "raw": resource_params.get_boolean("image_raw_device"), + }) + config["spec"].update({ + "size": size, + "allocation": None, + "uri": None, + }) + + return config + + +class _FileVolume(_Volume): + """For file based volumes""" + + _VOLUME_TYPE = "file" + + def __init__(self, resource_config): + super().__init__(resource_config) + self._handlers.update({ + "resize": self.resize, + }) + + @classmethod + def _define_config_legacy(cls, resource_name, resource_params): + config = super()._define_config_legacy(resource_name, resource_params) + + image_name = resource_params.get("image_name", "image") + if os.path.isabs(image_name): + config["spec"]["uri"] = image_name + config["spec"]["filename"] = os.path.basename(image_name) + else: + image_format = resource_params.get("image_format", "qcow2") + config["spec"]["filename"] = "%s.%s" % (image_name, image_format) + config["spec"]["uri"] = None + + return config + + def resize(self, arguments): + raise NotImplemented + + +class _BlockVolume(_Volume): + """For disk, lvm, iscsi based volumes""" + + _VOLUME_TYPE = "block" + + +class _NetworkVolume(_Volume): + """For rbd, iscsi-direct based volumes""" + + _VOLUME_TYPE = "network" diff --git a/virttest/vt_resmgr/vt_resmgr.py b/virttest/vt_resmgr/vt_resmgr.py new file mode 100644 index 0000000000..35e9162251 --- /dev/null +++ b/virttest/vt_resmgr/vt_resmgr.py @@ -0,0 +1,438 @@ +import ast +import logging +import os +import pickle + +from .resources import get_resource_pool_class + +from virttest.vt_cluster import cluster +from virttest.data_dir import get_data_dir + + +LOG = logging.getLogger("avocado." + __name__) +RESMGR_ENV_FILENAME = os.path.join(get_data_dir(), "vt_resmgr.env") + + +class PoolNotFound(Exception): + def __init__(self, pool_id): + self._pool_id = pool_id + + def __str__(self): + pool_id = self._pool_id + return f"Cannot find the pool by id={pool_id}" + + +class UnknownPoolType(Exception): + def __init__(self, pool_type): + self._pool_type = pool_type + + def __str__(self): + pool_type = self._pool_type + return f"Unknown pool type {pool_type}" + + +class PoolNotAvailable(Exception): + pass + + +class ResourceNotFound(Exception): + pass + + +class ResourceBusy(Exception): + pass + + +class ResourceNotAvailable(Exception): + pass + + +class UnknownResourceType(Exception): + pass + + +class _VTResourceManager(object): + + def __init__(self): + """ + When the job starts a new process to run a case, the resource manager + will be re-constructed as a new object, it reads the dumped file to get + back all the information. Note the resmgr here is a 'slice' because + this resmgr only serves the current test case, when the process(test + case) is finished, the slice resmgr is gone + """ + self._pools = dict() + if os.path.isfile(RESMGR_ENV_FILENAME): + self._load() + + @property + def _dump_data(self): + return { + "pools": self.pools, + } + + @_dump_data.setter + def _dump_data(self, data): + self.pools = data.get("pools") + + def _load(self): + with open(RESMGR_ENV_FILENAME, "rb") as f: + self._dump_data = pickle.load(f) + + def _dump(self): + with open(RESMGR_ENV_FILENAME, "wb") as f: + pickle.dump(self._dump_data, f) + + @property + def pools(self): + return self._pools + + @pools.setter + def pools(self, pools): + self._pools = pools + + def initialize(self, resource_pools_params): + """ + Register all the resource pools configured in cluster.json + Note: This function will be called only once during the VT bootstrap + + :param resource_pools_params: User defined resource pools' params + :type resource_pools_params: dict + """ + LOG.info(f"Initialize the resource manager") + + if os.path.exists(RESMGR_ENV_FILENAME): + LOG.info(f"Remove the old resource manager settings") + os.unlink(RESMGR_ENV_FILENAME) + + # Register a default pool on a node where no pool is defined + # e.g. if no filesystem pool is defined we need to register + # a default one + default_pools_nodes = { + "filesystem": set(), + # "switch": set(), + } + + # Register the resource pools + for category, params in resource_pools_params.items(): + for pool_name, pool_params in params.items(): + pool_config = self.define_pool_config(pool_name, pool_params) + pool_id = self.create_pool_object(pool_config) + + # Record the nodes of the pools with default type, i.e + # we've pools with default type attached to these nodes + pool = self.get_pool_by_id(pool_id) + pool_type = pool.get_pool_type() + if pool_type in default_pools_nodes: + default_pools_nodes[pool_type].update( + set(pool.attaching_nodes) + ) + + # Register the default pools if they are not defined + all_nodes = set([n.name for n in cluster.get_all_nodes()]) + for pool_type, node_set in default_pools_nodes.items(): + for node_name in all_nodes.difference(node_set): + LOG.debug(f"Register a default {pool_type} pool " + "with access nodes {node_name}") + pool_class = get_resource_pool_class(pool_type) + pool_config = pool_class.define_default_config() + pool_config["meta"]["access"]["nodes"] = [node_name] + self.create_pool_object(pool_config) + + # Dump all the information for the job process + self._dump() + + def startup(self): + """ + Attach all configured resource pools + Note: This function is called only once in job's pre_tests + """ + LOG.info(f"Startup the resource manager") + for pool_id in self.pools: + self.attach_pool(pool_id) + + def teardown(self): + """ + Detach all configured resource pools + Note: This function is called only once in job's post_tests + """ + LOG.info(f"Teardown the resource manager") + for pool_id in list(self.pools.keys()): + self.detach_pool(pool_id) + + def get_pool_by_name(self, pool_name): + pools = [p for p in self.pools.values() if p.pool_name == pool_name] + return pools[0] if pools else None + + def get_pool_by_id(self, pool_id): + return self.pools.get(pool_id) + + def get_pool_by_resource(self, resource_id): + pools = [p for p in self.pools.values() if resource_id in p.resources] + return pools[0] if pools else None + + def select_pool(self, resource_type, resource_params): + """ + Select the resource pool by its cartesian params + + :param resource_type: The resource's type, supported: + "volume" + :type resource_type: string + :param resource_params: The resource's specific params, e.g. + params.object_params('image1') + :type resource_params: dict or Param + :return: The resource pool id + :rtype: string + """ + LOG.info(f"Select a pool for the {resource_type} resource") + for pool_id, pool in self.pools.items(): + if pool.meet_resource_request(resource_type, resource_params): + return pool_id + return None + + def define_pool_config(self, pool_name, pool_params): + """ + Define a resource pool's configuration by its cartesian params. + + :param pool_name: The uniq resource pool name + :type pool_name: string + :param pool_params: The resource pool's specific params + :type pool_params: Param + :return: The resource pool's configuration, + format: {"meta":{...}, "spec":{...}} + The specific attributes depend on the specific pool + :rtype: dict + """ + pool_class = get_resource_pool_class(pool_params["type"]) + if pool_class is None: + raise UnknownPoolType(pool_type) + + return pool_class.define_config(pool_name, pool_params) + + def create_pool_object(self, pool_config): + """ + Create a resource pool object + Note: Currently the users need to setup the pool before testing + + :param pool_config: The pool's configuration, generated by + define_pool_config function + :type pool_config: dict + :return: The resource pool id + :rtype: string + """ + pool_type = pool_config["meta"]["type"] + pool_class = get_resource_pool_class(pool_type) + if pool_class is None: + raise UnknownPoolType(pool_type) + + pool = pool_class(pool_config) + pool.create() + self.pools[pool.pool_id] = pool + + LOG.info(f"Created the pool object {pool.pool_id} for {pool.pool_name}") + return pool.pool_id + + def destroy_pool_object(self, pool_id): + """ + The pool should be detached from all worker nodes before destroying it + + :param pool_id: The id of the pool + :type pool_id: string + """ + LOG.info(f"Destroy the pool object {pool_id}") + pool = self.pools.pop(pool_id) + pool.destroy() + + def _attach_pool_to(self, pool, node): + """ + Attach a pool to a specific node + """ + LOG.info(f"Attach resource pool ({pool.pool_name}) to {node.name}") + r, o = node.proxy.resource.connect_pool(pool.pool_id, pool.pool_config) + if r != 0: + raise Exception(o["out"]) + + def attach_pool(self, pool_id): + """ + Attach the resource pool to the worker nodes, where the pool can be + accessed. The accessiable worker nodes are defined by the pool's meta + + :param pool_id: The id of the pool to attach + :type pool_id: string + """ + pool = self.get_pool_by_id(pool_id) + for node_name in pool.attaching_nodes: + node = cluster.get_node(node_name) + self._attach_pool_to(pool, node) + + def _detach_pool_from(self, pool, node): + """ + Detach a pool from a specific node + """ + LOG.info(f"Detach resource pool({pool.pool_name}) from {node.name}") + r, o = node.proxy.resource.disconnect_pool(pool.pool_id) + if r != 0: + raise Exception(o["out"]) + + def detach_pool(self, pool_id): + """ + Detach the pool from its accessiable worker nodes + + :param pool_id: The id of the pool to detach + :type pool_id: string + """ + pool = self.get_pool_by_id(pool_id) + for node_name in pool.attaching_nodes: + node = cluster.get_node(node_name) + self._detach_pool_from(pool, node) + + def get_pool_info(self, pool_id, request=None): + """ + Get the configuration of a specified resource pool + + :param pool_id: The resource pool id + :type pool_id: string + :param request: The query content, format: + None + meta[.] + spec[.] + Note return the whole configuration if request=None + :type request: string + :return: The pool's configuration, e.g request=meta.type, it + returns: {"type": "filesystem"} + :rtype: dict + """ + pool = self.get_pool_by_id(pool_id) + return pool.get_info(request) + + def get_all_pools(self): + """ + Get all resource pools' uuid list + + :return: The pools' uuid list + :rtype: list + """ + return list(self.pools.keys()) + + def pool_capability(self, pool_id): + pool = self.get_pool_by_id(pool_id) + return pool.capability + + def define_resource_config(self, resource_name, resource_type, resource_params): + """ + Define a resource's configuration by its cartesian params + + :param resource_type: The resource type, it's usually implied, e.g. + the image's storage resource is a "volume", + supported: "volume" + :type resource_type: string + :param resource_params: The resource's specific params, usually + defined by an upper-level object, e.g. + "image1" has a storage resource, so + resource_params = image1's params + i.e. use image1's params to define its + storage resource's configuration + :type resource_params: Param + :return: The resource's configuration, + format: {"meta":{...}, "spec":{...}} + The specific attributes depend on the specific resource + :rtype: dict + """ + pool_id = self.select_pool(resource_type, resource_params) + if pool_id is None: + raise PoolNotAvailable() + pool = self.get_pool_by_id(pool_id) + return pool.define_resource_config(resource_name, + resource_type, + resource_params) + + def create_resource_object(self, resource_config): + """ + Create a resource object without any specific resource allocation. + + :param resource_config: The resource configuration, generated by + define_resource_config function + :type resource_config: dict + :return: The resource id + :rtype: string + """ + pool_config = resource_config["meta"]["pool"] + pool_id = pool_config["meta"]["uuid"] + pool = self.get_pool_by_id(pool_id) + if pool is None: + raise PoolNotFound(pool_id) + return pool.create_resource_object(resource_config) + + def destroy_resource_object(self, resource_id): + """ + Destroy the resource object, the specific resource allocation + will be released + + :param resource_id: The resource id + :type resource_id: string + """ + pool = self.get_pool_by_resource(resource_id) + pool.destroy_resource_object(resource_id) + + def get_resource_info(self, resource_id, request=None): + """ + Get the configuration of a specified resource + + :param resource_id: The resource id + :type resource_id: string + :param request: The query content, format: + None + meta[.] + spec[.] + Examples: + meta + spec.size + :type request: string + :return: The resource's configuration, e.g request=spec.size, it + returns: {"size": "123456"} + :rtype: dict + """ + pool = self.get_pool_by_resource(resource_id) + return pool.get_resource_info(resource_id, request) + + def update_resource(self, resource_id, config): + """ + Update a resource, the config format: + {'command': arguments} + Supported commands: + 'bind': Bind a specified resource to one or more worker nodes in order + to access the specific resource allocation, note the resource + is *NOT* allocated with the bind command + 'unbind': Unbind a specified resource from one or more worker nodes, + the specific resource will be released only when all bindings + are gone + 'allocate': Allocate the resource + 'release': Release the resource + 'sync': Sync up the resource configuration. Some items of the + configuration can change and only be fetched on the worker + nodes, e.g. allocation, use sync to sync-up these items + The arguments is a dict object which contains all related settings for a + specific action + + Examples: + Bind a resource to one or more nodes + {'bind': {'nodes': ['node1']}} + {'bind': {'nodes': ['node1', 'node2']}} + Unbind a resource from one or more nodes + {'unbind': {'nodes': []}} + {'unbind': {'nodes': ['node1', 'node2']}} + Allocate the resource + {'allocate': {}} + Release the resource + {'release': {}} + + :param resource_id: The resource id + :type resource_id: string + :param config: The specified action and its arguments + :type config: dict + """ + pool = self.get_pool_by_resource(resource_id) + return pool.update_resource(resource_id, config) + + +resmgr = _VTResourceManager() diff --git a/virttest/vt_utils/image/qemu.py b/virttest/vt_utils/image/qemu.py new file mode 100644 index 0000000000..1bab194dd2 --- /dev/null +++ b/virttest/vt_utils/image/qemu.py @@ -0,0 +1,177 @@ +import collections +import logging +import json +import os +import re +import string + +from avocado.core import exceptions +from avocado.utils import path as utils_path +from avocado.utils import process + +from virttest import utils_numeric + + +LOG = logging.getLogger("avocado.service." + __name__) + + +def _get_dir_volume_opts(volume_config): + return { + "driver": "file", + "filename": volume_config["spec"]["uri"], + } + + +def _get_nfs_volume_opts(volume_config): + return _get_dir_volume_opts(volume_config) + + +def _get_ceph_volume_opts(volume_config): + volume_spec = volume_config["spec"] + pool_config = volume_config["meta"]["pool"] + pool_meta = pool_config["meta"] + pool_spec = pool_config["spec"] + + volume_opts = { + "driver": "rbd", + "pool": pool_spec["pool"], + "image": volume_spec["filename"], + } + + if pool_spec.get("conf") is not None: + volume_opts["conf"] = pool_spec["conf"] + if pool_spec.get("namespace") is not None: + volume_opts["namespace"] = pool_spec["namespace"] + + return volume_opts + + +def _get_iscsi_direct_volume_opts(volume_config): + pool_config = volume_config["meta"]["pool"] + pool_meta = pool_config["meta"] + pool_spec = pool_config["spec"] + + # required options for iscsi + volume_opts = { + "driver": "iscsi", + "transport": pool_spec["transport"], + "portal": pool_spec["portal"], + "target": pool_spec["target"], + } + + # optional option + if pool_spec["user"] is not None: + volume_opts["user"] = pool_spec["user"] + + return volume_opts + + +def _get_nbd_volume_opts(volume_config): + volume_meta = volume_config["meta"] + volume_spec = volume_config["spec"] + pool_config = volume_meta["pool"] + pool_meta = pool_config["meta"] + pool_spec = pool_config["spec"] + + volume_opts = {"driver": "nbd"} + if pool_spec.get("host"): + volume_opts.update({ + "server.type": "inet", + "server.host": pool_spec["host"], + "server.port": volume_spec.get("port", 10809), + }) + elif pool_spec.get("path"): + volume_opts.update({ + "server.type": "unix", + "server.path": pool_spec["path"], + }) + else: + raise + + if volume_spec.get("export"): + volume_opts["export"] = volume_spec["export"] + + return volume_opts + + +def get_ceph_pool_access_opts(pool_config): + auth = dict() + return auth + + +def get_iscsi_direct_pool_access_opts(pool_config): + auth = dict() + return auth + + +def get_nbd_pool_access_opts(pool_config): + auth = dict() + return auth + + +def get_qemu_virt_image_volume_access_auth_opts(pool_config): + access_opts_getters = { + "filesystem": lambda i: dict(), + "nfs": lambda i: dict(), + "ceph": get_ceph_pool_access_opts, + "iscsi-direct": get_iscsi_direct_pool_access_opts, + "nbd": get_nbd_pool_access_opts, + } + + pool_type = pool_config["meta"]["type"] + access_opts_getter = access_opts_getters[pool_type] + + return access_opts_getter(pool_config) + + +def get_volume_opts(volume_config): + volume_opts_getters = { + "filesystem": _get_dir_volume_opts, + "nfs": _get_nfs_volume_opts, + "ceph": _get_ceph_volume_opts, + "iscsi-direct": _get_iscsi_direct_volume_opts, + "nbd": _get_nbd_volume_opts, + } + + pool_config = volume_config["meta"]["pool"] + pool_type = pool_config["meta"]["type"] + volume_opts_getter = volume_opts_getters[pool_type] + + return volume_opts_getter(volume_config) + + +def get_image_opts(virt_image_config): + """ + Get lower-level qemu virt image options + + Return a tuple of (access_auth_opts, encryption_opts, virt_image_opts) + """ + volume_config = virt_image_config["spec"]["volume"] + virt_image_format = virt_image_config["spec"]["format"] + + virt_image_opts = collections.OrderedDict() + virt_image_opts["file"] = collections.OrderedDict() + virt_image_opts["driver"] = virt_image_format + virt_image_opts["file"].update(get_volume_opts(volume_config)) + + # lower-level virt image encryption options + encryption_opts = virt_image_config["spec"].get("encryption", dict()) + if virt_image_format == "luks": + key = "password-secret" if "file" in encryption_opts else "key-secret" + virt_image_opts[key] = encryption_opts["name"] + elif virt_image_format == "qcow2" and encryption_opts: + encrypt_format = encryption_opts["encrypt"]["format"] + if encrypt_format == "luks": + virt_image_opts["encrypt.key-secret"] = encryption_opts["name"] + virt_image_opts.update( + {f"encrypt.{k}": v for k, v in encryption_opts["encrypt"]} + ) + else: + raise ValueError(f"Unknown encrypt format: {encrypt_format}") + + # volume pool access auth options + pool_config = volume_config["meta"]["pool"] + access_auth_opts = get_qemu_virt_image_volume_access_auth_opts(pool_config) + + # TODO: Add filters here + return access_auth_opts, encryption_opts, virt_image_opts