Skip to content

Commit

Permalink
Add storage and image management support
Browse files Browse the repository at this point in the history
Added pool_selectors and image_pool_name

Signed-off-by: Zhenchao Liu <[email protected]>
  • Loading branch information
zhencliu committed Sep 25, 2024
1 parent 774d424 commit 3232e25
Show file tree
Hide file tree
Showing 57 changed files with 3,979 additions and 35 deletions.
14 changes: 6 additions & 8 deletions avocado_vt/plugins/vt_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from avocado.core.plugin_interfaces import JobPreTests as Pre
from avocado.utils.stacktrace import log_exc_info
from virttest.vt_cluster import cluster, node_metadata
from virttest.vt_imgr import vt_imgr
from virttest.vt_resmgr import resmgr


class ClusterSetupError(Exception):
Expand Down Expand Up @@ -50,21 +52,17 @@ def _pre_node_setup():
def _pre_mgr_setup():
try:
# Pre-setup the cluster manager
# e.g:
# startup_resmgr()
# vt_imgr.startup()
pass
resmgr.startup()
vt_imgr.startup()
except Exception as err:
raise ClusterManagerSetupError(err)

@staticmethod
def _post_mgr_cleanup():
try:
# Post-cleanup the cluster manager
# e.g:
# teardown_resmgr()
# vt_imgr.teardown()
pass
vt_imgr.teardown()
resmgr.teardown()
except Exception as err:
raise ClusterManagerCleanupError(err)

Expand Down
15 changes: 15 additions & 0 deletions avocado_vt/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
)
from virttest._wrappers import load_source
from virttest.vt_cluster import cluster, logger, selector
from virttest.vt_resmgr import resmgr

# avocado-vt no longer needs autotest for the majority of its functionality,
# except by:
Expand Down Expand Up @@ -378,7 +379,21 @@ def _setup_partition(self):
_node.tag = node
self._cluster_partition.add_node(_node)

# Select the pools when the user specifies the pools param
for pool_tag in self.params.objects("pools"):
pool_params = self.params.object_params(pool_tag)
pool_selectors = pool_params.get("pool_selectors")

pools = set(resmgr.get_all_pools()) - set(cluster.partition.pools.values())
pool_id = selector.select_resource_pool(list(pools), pool_selectors)
if not pool_id:
raise selector.SelectorError(
f"No pool selected for {pool_tag} with {pool_selectors}"
)
self._cluster_partition.pools[pool_tag] = pool_id

def _clear_partition(self):
self._cluster_partition.pools.clear()
cluster_dir = os.path.join(self.resultsdir, "cluster")
if self._cluster_partition.nodes:
for node in self._cluster_partition.nodes:
Expand Down
6 changes: 6 additions & 0 deletions virttest/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from avocado.utils import process

from virttest.vt_cluster import cluster, node
from virttest.vt_resmgr import resmgr

from . import arch, asset, cartesian_config, data_dir, defaults, utils_selinux
from .compat import get_opt
Expand Down Expand Up @@ -895,6 +896,10 @@ def _register_hosts(hosts_configs):
LOG.debug("Host %s registered", host)


def _initialize_managers(pools_params):
resmgr.setup(pools_params)


def _config_master_server(master_config):
"""Configure the master server."""
if master_config:
Expand Down Expand Up @@ -1084,6 +1089,7 @@ def bootstrap(options, interactive=False):
cluster_config = _load_cluster_config(vt_cluster_config)
_register_hosts(cluster_config.get("hosts"))
_config_master_server(cluster_config.get("master"))
_initialize_managers(cluster_config.get("pools"))

LOG.info("")
LOG.info("VT-BOOTSTRAP FINISHED")
Expand Down
106 changes: 79 additions & 27 deletions virttest/env_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
from virttest.test_setup.storage import StorageConfig
from virttest.test_setup.vms import UnrequestedVMHandler
from virttest.utils_version import VersionInterval
from virttest.vt_imgr import vt_imgr


utils_libvirtd = lazy_import("virttest.utils_libvirtd")
virsh = lazy_import("virttest.virsh")
Expand Down Expand Up @@ -124,42 +126,70 @@ def _get_qemu_version(qemu_cmd):
return "Unknown"


def preprocess_image(test, params, image_name, vm_process_status=None):
def preprocess_image(test, params, image_name, vm_process_status=None, vm_name=None):
"""
Preprocess a single QEMU image according to the instructions in params.
:param test: Autotest test object.
:param params: A dict containing image preprocessing parameters.
:param vm_process_status: This is needed in postprocess_image. Add it here
only for keep it work with process_images()
:param vm_name: vm tag defined in 'vms'
:note: Currently this function just creates an image if requested.
"""
# FIXME:
image_id = None
if params.get_boolean("multihost"):
params = params.copy()
params[f"image_owner_{image_name}"] = vm_name
image_config = vt_imgr.define_image_config(image_name, params)
image_id = vt_imgr.create_image_object(image_config)

params = params.object_params(image_name)
base_dir = params.get("images_base_dir", data_dir.get_data_dir())

if not storage.preprocess_image_backend(base_dir, params, image_name):
LOG.error("Backend can't be prepared correctly.")

image_filename = storage.get_image_filename(params, base_dir)
image_filename = None
if not params.get_boolean("multihost"):
image_filename = storage.get_image_filename(params, base_dir)

create_image = False
if params.get("force_create_image") == "yes":
create_image = True
elif params.get("create_image") == "yes" and not storage.file_exists(
params, image_filename
):
create_image = True
elif params.get("create_image") == "yes":
# FIXME: check all volumes allocated
if params.get_boolean("multihost"):
volume = vt_imgr.get_image_info(
image_id, request=f"spec.virt-images.{image_name}.spec.volume.meta"
)
create_image = True if not volume["meta"]["allocated"] else False
else:
create_image = (
True if not storage.file_exists(params, image_filename) else False
)
else:
# FIXME: sync all volumes configurations
if params.get_boolean("multihost"):
vt_imgr.get_image_info(image_id)
# TODO: check if file allocated

if params.get("backup_image_before_testing", "no") == "yes":
# FIXME: add backup_image
image = qemu_storage.QemuImg(params, base_dir, image_name)
image.backup_image(params, base_dir, "backup", True, True)
if create_image:
if storage.file_exists(params, image_filename):
# As rbd image can not be covered, so need remove it if we need
# force create a new image.
storage.file_remove(params, image_filename)
image = qemu_storage.QemuImg(params, base_dir, image_name)
LOG.info("Create image on %s." % image.storage_type)
image.create(params)
if params.get_boolean("multihost"):
vt_imgr.handle_image(image_id, {"create": {}})
else:
if storage.file_exists(params, image_filename):
# As rbd image can not be covered, so need remove it if we need
# force create a new image.
storage.file_remove(params, image_filename)
image = qemu_storage.QemuImg(params, base_dir, image_name)
LOG.info("Create image on %s." % image.storage_type)
image.create(params)


def preprocess_fs_source(test, params, fs_name, vm_process_status=None):
Expand Down Expand Up @@ -443,7 +473,7 @@ def preprocess_vm(test, params, env, name):
)


def check_image(test, params, image_name, vm_process_status=None):
def check_image(test, params, image_name, vm_process_status=None, vm_name=None):
"""
Check a single QEMU image according to the instructions in params.
Expand All @@ -452,6 +482,7 @@ def check_image(test, params, image_name, vm_process_status=None):
:param vm_process_status: (optional) vm process status like running, dead
or None for no vm exist.
"""
params = params.object_params(image_name)
clone_master = params.get("clone_master", None)
base_dir = data_dir.get_data_dir()
check_image_flag = params.get("check_image") == "yes"
Expand Down Expand Up @@ -522,7 +553,7 @@ def check_image(test, params, image_name, vm_process_status=None):
raise e


def postprocess_image(test, params, image_name, vm_process_status=None):
def postprocess_image(test, params, image_name, vm_process_status=None, vm_name=None):
"""
Postprocess a single QEMU image according to the instructions in params.
Expand All @@ -539,6 +570,16 @@ def postprocess_image(test, params, image_name, vm_process_status=None):
)
return

# FIXME: multihost
image_id = None
if params.get_boolean("multihost"):
image_id = vt_imgr.query_image(image_name, vm_name)
if image_id is None:
LOG.warning(f"Cannot find the image {image_name}")
image_config = vt_imgr.define_image_config(image_name, params)
image_id = vt_imgr.create_image_object(image_config)
params = params.object_params(image_name)

restored, removed = (False, False)
clone_master = params.get("clone_master", None)
base_dir = params.get("images_base_dir", data_dir.get_data_dir())
Expand Down Expand Up @@ -596,10 +637,18 @@ def postprocess_image(test, params, image_name, vm_process_status=None):
)
LOG.info("Remove image on %s." % image.storage_type)
if clone_master is None:
image.remove()
if params.get_boolean("multihost"):
vt_imgr.handle_image(image_id, {"destroy": {}})
vt_imgr.destroy_image_object(image_id)
else:
image.remove()
elif clone_master == "yes":
if image_name in params.get("master_images_clone").split():
image.remove()
if params.get_boolean("multihost"):
vt_imgr.handle_image(image_id, {"destroy": {}})
vt_imgr.destroy_image_object(image_id)
else:
image.remove()


def postprocess_fs_source(test, params, fs_name, vm_process_status=None):
Expand Down Expand Up @@ -754,7 +803,7 @@ class _CreateImages(threading.Thread):
in self.exc_info
"""

def __init__(self, image_func, test, images, params, exit_event, vm_process_status):
def __init__(self, image_func, test, images, params, exit_event, vm_process_status, vm_name=None):
threading.Thread.__init__(self)
self.image_func = image_func
self.test = test
Expand All @@ -763,6 +812,7 @@ def __init__(self, image_func, test, images, params, exit_event, vm_process_stat
self.exit_event = exit_event
self.exc_info = None
self.vm_process_status = vm_process_status
self.vm_name = vm_name

def run(self):
try:
Expand All @@ -773,13 +823,14 @@ def run(self):
self.params,
self.exit_event,
self.vm_process_status,
self.vm_name,
)
except Exception:
self.exc_info = sys.exc_info()
self.exit_event.set()


def process_images(image_func, test, params, vm_process_status=None):
def process_images(image_func, test, params, vm_process_status=None, vm_name=None):
"""
Wrapper which chooses the best way to process images.
Expand All @@ -792,11 +843,11 @@ def process_images(image_func, test, params, vm_process_status=None):
images = params.objects("images")
if len(images) > 20: # Lets do it in parallel
_process_images_parallel(
image_func, test, params, vm_process_status=vm_process_status
image_func, test, params, vm_process_status=vm_process_status,vm_name=vm_name
)
else:
_process_images_serial(
image_func, test, images, params, vm_process_status=vm_process_status
image_func, test, images, params, vm_process_status=vm_process_status, vm_name=vm_name
)


Expand All @@ -816,7 +867,7 @@ def process_fs_sources(fs_source_func, test, params, vm_process_status=None):


def _process_images_serial(
image_func, test, images, params, exit_event=None, vm_process_status=None
image_func, test, images, params, exit_event=None, vm_process_status=None, vm_name=None
):
"""
Original process_image function, which allows custom set of images
Expand All @@ -829,14 +880,15 @@ def _process_images_serial(
or None for no vm exist.
"""
for image_name in images:
image_params = params.object_params(image_name)
image_func(test, image_params, image_name, vm_process_status)
# image_params = params.object_params(image_name)
# image_func(test, image_params, image_name, vm_process_status)
image_func(test, params, image_name, vm_process_status, vm_name)
if exit_event and exit_event.is_set():
LOG.error("Received exit_event, stop processing of images.")
break


def _process_images_parallel(image_func, test, params, vm_process_status=None):
def _process_images_parallel(image_func, test, params, vm_process_status=None, vm_name=None):
"""
The same as _process_images but in parallel.
:param image_func: Process function
Expand All @@ -852,7 +904,7 @@ def _process_images_parallel(image_func, test, params, vm_process_status=None):
for i in xrange(no_threads):
imgs = images[i::no_threads]
threads.append(
_CreateImages(image_func, test, imgs, params, exit_event, vm_process_status)
_CreateImages(image_func, test, imgs, params, exit_event, vm_process_status, vm_name)
)
threads[-1].start()

Expand Down Expand Up @@ -907,7 +959,7 @@ def _call_image_func():
unpause_vm = True
vm_params["skip_cluster_leak_warn"] = "yes"
try:
process_images(image_func, test, vm_params, vm_process_status)
process_images(image_func, test, vm_params, vm_process_status, vm_name)
finally:
if unpause_vm:
vm.resume()
Expand Down
4 changes: 4 additions & 0 deletions virttest/vt_agent/managers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from .connect import ConnectManager
from .console import ConsoleManager
from .image import ImageHandlerManager
from .resource_backing import ResourceBackingManager

connect_mgr = ConnectManager()
console_mgr = ConsoleManager()
resbacking_mgr = ResourceBackingManager()
image_handler_mgr = ImageHandlerManager()
26 changes: 26 additions & 0 deletions virttest/vt_agent/managers/image.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import logging

from .images import get_image_handler


LOG = logging.getLogger("avocado.agents." + __name__)


class ImageHandlerManager(object):

def __init__(self):
pass

def handle_image(self, image_config, config):
r, o = 0, dict()
try:
cmd, arguments = config.popitem()
image_type = image_config["meta"]["type"]
handler = get_image_handler(image_type, cmd)
ret = handler(image_config, arguments)
if ret:
o["out"] = ret
except Exception as e:
r, o["out"] = 1, str(e)
LOG.debug("Failed to handle image(%s): %s", str(e))
return r, o
16 changes: 16 additions & 0 deletions virttest/vt_agent/managers/images/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from .qemu import get_qemu_image_handler
# from .xen import get_xen_image_handler


_image_handler_getters = {
"qemu": get_qemu_image_handler,
# "xen": get_xen_image_handler,
}


def get_image_handler(image_type, cmd):
getter = _image_handler_getters.get(image_type)
return getter(cmd)


__all__ = ["get_image_handler"]
1 change: 1 addition & 0 deletions virttest/vt_agent/managers/images/qemu/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .qemu_image_handlers import get_qemu_image_handler
Loading

0 comments on commit 3232e25

Please sign in to comment.