From d503be16fff3efa736f91e93a25efa9d9764e6f3 Mon Sep 17 00:00:00 2001 From: Naresh Bannoth Date: Sun, 9 Jun 2024 11:54:55 +0530 Subject: [PATCH] Moving utilities for object based support. Moving the utilities to OOPs supported way to make them more useful and Robust. Signed-off-by: Naresh Bannoth --- avocado/utils/block_devices/__init__.py | 13 + avocado/utils/block_devices/disk.py | 491 ++++++++++++++++++++++ avocado/utils/disk.py | 493 ---------------------- avocado/utils/distro.py | 532 ------------------------ 4 files changed, 504 insertions(+), 1025 deletions(-) create mode 100644 avocado/utils/block_devices/__init__.py create mode 100644 avocado/utils/block_devices/disk.py delete mode 100644 avocado/utils/disk.py delete mode 100644 avocado/utils/distro.py diff --git a/avocado/utils/block_devices/__init__.py b/avocado/utils/block_devices/__init__.py new file mode 100644 index 0000000000..e0f45434f9 --- /dev/null +++ b/avocado/utils/block_devices/__init__.py @@ -0,0 +1,13 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2024 +# Author: Naresh Bannoth diff --git a/avocado/utils/block_devices/disk.py b/avocado/utils/block_devices/disk.py new file mode 100644 index 0000000000..98f0a450b6 --- /dev/null +++ b/avocado/utils/block_devices/disk.py @@ -0,0 +1,491 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# This code was inspired in the autotest project, +# +# client/base_utils.py +# +# Copyright: 2024 IBM +# Authors : Praveen K Pandey +# : Narasimhan V +# : Naresh Bannoth + + +""" +Disk utilities +""" + + +import json +import logging +import os +import re + +from avocado.utils import genio, multipath, process + +LOGGER = logging.getLogger(__name__) + + +class DiskError(Exception): + """ + Generic DiskError + """ + + +class DiskUtils: + """ + Perform Disk related operations. + """ + + def __init__(self, disk, mount_point): + """ + :param disk: Name of the disk, eg: /dev/nvme0n1, /dev/mapper/mpathX, + /dev/sdX etc.. + :type disk: str + :param mount_point: It is the directory path, where the disk going + mounted or disk already mounted on this dir + eg: /mnt, /root/mnt etc.. + :type mount_point: str + """ + self.disk = disk + self.mount_point = mount_point + + def freespace(self): + fs_stats = os.statvfs(self.disk) + return fs_stats.f_bsize * fs_stats.f_bavail + + def get_disk_blocksize(self): + """Return the disk block size, in bytes""" + fs_stats = os.statvfs(self.disk) + return fs_stats.f_bsize + + def create_loop_device(self, size, blocksize=4096, directory="./"): + """ + Creates a loop device of size and blocksize specified. + + :param size: Size of loop device, in bytes + :type size: int + :param blocksize: block size of loop device, in bytes. Defaults to 4096 + :type blocksize: int + :param directory: Directory where the backing file will be created. + Defaults to current directory. + :type directory: str + + :return: loop device name + :rtype: str + """ + cmd = "losetup --find" + loop = process.run(cmd, ignore_status=True, sudo=True).stdout_text + loop = loop.strip("\n") + + loop_file = os.path.join(directory, f"tmp_{loop.split('/')[-1]}.img") + cmd = ( + f"dd if=/dev/zero of={loop_file} bs={blocksize} " + f"count={int(size / blocksize)}" + ) + if process.system(cmd, ignore_status=True, sudo=True) != 0: + raise DiskError("Unable to create backing file for loop device") + + cmd = f"losetup {loop} {loop_file} -P" + if process.system(cmd, ignore_status=True, sudo=True) != 0: + raise DiskError("Unable to create the loop device") + return loop + + def delete_loop_device(self, loop_device): + """ + Deletes the specified loop device. + + :return: True if deleted. + :rtype: bool + """ + cmd = "losetup -aJl" + loop_dic = json.loads( + process.run(cmd, ignore_status=True, sudo=True).stdout_text + ) + loop_file = "" + for loop_dev in loop_dic["loopdevices"]: + if loop_device == loop_dev["name"]: + loop_file = loop_dev["back-file"] + if not loop_file: + raise DiskError("Unable to find backing file for loop device") + cmd = f"losetup -d {loop_device}" + if process.system(cmd, ignore_status=True, sudo=True) != 0: + raise DiskError("Unable to delete the loop device") + os.remove(loop_file) + return True + + def get_disks(self): + """ + Returns the physical "hard drives" available on this system + + This is a simple wrapper around `lsblk` and will return all the + top level physical (non-virtual) devices return by it. + + TODO: this is currently Linux specific. Support for other + platforms is desirable and may be implemented in the future. + + :returns: a list of paths to the physical disks on the system + :rtype: list of str + """ + try: + json_result = process.run("lsblk --json --paths --inverse") + except process.CmdError as ce: + raise DiskError(f"Error occurred while executing lsblk command: {ce}") + try: + json_data = json.loads(json_result.stdout_text) + except json.JSONDecodeError as je: + raise DiskError(f"Error occurred while parsing JSON data: {je}") + disks = [] + for device in json_data["blockdevices"]: + disks.append(device["name"]) + if "children" in device: + for child in device["children"]: + disks.append(child["name"]) + return disks + + def get_all_disk_paths(self): + """ + Returns all available disk names and alias on this system + + This will get all the sysfs disks name entries by its device + node name, by-uuid, by-id and by-path, irrespective of any + platform and device type + + :returns: a list of all disk path names + :rtype: list of str + """ + disk_list = abs_path = [] + for path in [ + "/dev", + "/dev/mapper", + "/dev/disk/by-id", + "/dev/disk/by-path", + "/dev/disk/by-uuid", + "/dev/disk/by-partuuid", + "/dev/disk/by-partlabel", + ]: + if os.path.exists(path): + for device in os.listdir(path): + abs_path.append(os.path.join(path, device)) + disk_list.extend(abs_path) + return disk_list + + def get_absolute_disk_path(self): + """ + Returns absolute device path of given disk + + This will get actual disks path of given device, it can take + node name, by-uuid, by-id and by-path, irrespective of any + platform and device type + + :param device: disk name or disk alias names sda or scsi-xxx + :type device: str + + :returns: the device absolute path name + :rtype: bool + """ + if not os.path.exists(self.disk): + for dev_path in self.get_all_disk_paths(): + if self.disk == os.path.basename(dev_path): + return dev_path + return self.disk + + def get_available_filesystems(self): + """ + Return a list of all available filesystem types + + :returns: a list of filesystem types + :rtype: list of str + """ + filesystems = set() + with open("/proc/filesystems") as proc_fs: # pylint: disable=W1514 + for proc_fs_line in proc_fs.readlines(): + filesystems.add(re.sub(r"(nodev)?\s*", "", proc_fs_line)) + return list(filesystems) + + def get_filesystem_type(self): + """ + Returns the type of the filesystem of mount point informed. + The default mount point considered when none is informed + is the root "/" mount point. + + :param str mount_point: mount point to asses the filesystem type. + Default "/" + + :returns: filesystem type + :rtype: str + """ + if self.mount_point is None or self.is_dir_mounted() is False: + self.mount_point = "/" + with open("/proc/mounts") as mounts: # pylint: disable=W1514 + for mount_line in mounts.readlines(): + _, fs_file, fs_vfstype, _, _, _ = mount_line.split() + if fs_file == self.mount_point: + return fs_vfstype + + def is_root_device(self): + """ + check for root disk + + :param device: device to check + + :returns: True or False, True if given device is root disk + otherwise will return False. + """ + cmd = "lsblk --j -o MOUNTPOINT,PKNAME" + output = process.run(cmd) + result = json.loads(output.stdout_text) + for item in result["blockdevices"]: + if item["mountpoint"] == "/" and self.disk == str(item["pkname"]): + return True + return False + + def is_disk_mounted(self): + """ + check if given disk is mounted or not + + :param device: disk/device name + :type device: str + + :returns: True if the device/disk is mounted else False + :rtype: bool + """ + with open("/proc/mounts") as mounts: # pylint: disable=W1514 + for mount_line in mounts.readlines(): + dev, _, _, _, _, _ = mount_line.split() + if dev == self.disk: + return True + return False + + def is_dir_mounted(self): + """ + check if given directory is mounted or not + + :param dir_path: directory path + :type dir_path: str + + :returns: True if the given director is mounted else False + :rtype: bool + """ + with open("/proc/mounts") as mounts: # pylint: disable=W1514 + for mount_line in mounts.readlines(): + _, fs_dir, _, _, _, _ = mount_line.split() + if fs_dir == self.mount_point: + return True + return False + + def fs_exists(self): + """ + check if filesystem exists on give disk/device + + :param device: disk/device name + :type device: str + + :returns: returns True if filesystem exists on the give disk else False + :rtype: bool + """ + cmd = f"blkid -o value -s TYPE {self.disk}" + out = process.system_output(cmd, shell=True, ignore_status=True).decode("utf-8") + fs_list = ["ext2", "ext3", "ext4", "xfs", "btrfs"] + if out in fs_list: + return True + return False + + def get_dir_mountpoint(self): + """ + get mounted disk name that is mounted on given dir_path + + :param dir_path: absolute directory path + :type dir_path: str + + :returns: returns disk name which mounted on given dir_path + :rtype: str + """ + with open("/proc/mounts") as mounts: # pylint: disable=W1514 + for mount_line in mounts.readlines(): + dev, fs_dir, _, _, _, _ = mount_line.split() + if fs_dir == self.mount_point: + return dev + return None + + def get_disk_mountpoint(self): + """ + get mountpoint on which given disk is mounted + + :param device: disk/device name + :type device: str + + :return: return directory name on which disk is mounted + :rtype: str + """ + with open("/proc/mounts") as mounts: # pylint: disable=W1514 + for mount_line in mounts.readlines(): + dev, fs_dir, _, _, _, _ = mount_line.split() + if dev == self.disk: + return fs_dir + return None + + def create_linux_raw_partition(self, size=None, num_of_par=1): + """ + Creates partitions using sfdisk command + + :param disk_name: disk/device name + :type disk_name: str + :param size: size of partition + :type size: str + :param num_of_par: Number of partitions to be created + :type num_of_par: int + + Returns list of created partitions + """ + if not size: + size = self.get_size_of_disk(self.disk) / 1073741824 + size = size / num_of_par + size = str(size) + "G" + partitions = [ + "size= +" + size if val != 3 else "type=5" + for val in range(0, num_of_par + 1) + ] + disk_partition_file = ( + "/tmp/creat_partition" + + process.run("date '+%d-%m-%y_%T'").stdout_text.strip() + ) + if not os.path.isfile(disk_partition_file): + process.run("touch " + disk_partition_file) + for line in partitions: + genio.append_one_line(disk_partition_file, line) + try: + cmd = "sfdisk " + self.disk + " < " + disk_partition_file + part_output = process.getoutput(cmd) + except process.CmdError as ce: + msg = f"sfdisk partition creation command failed on disk {self.disk}" + LOGGER.warning(msg) + raise DiskError(msg) + self.rescan_disk(self.disk) + if "The partition table has been altered" in part_output: + return self.get_disk_partitions(self.disk) + + def get_size_of_disk(self): + """ + Returns size of disk in bytes + + :param disk: disk/device name + :type disk: str + + Return Type: int + """ + return int(process.getoutput("lsblk -b --output SIZE -n -d " + self.disk)) + + def delete_partition(self, partition_name): + """ + Deletes mentioned partition from disk + + :param partition_name: partition absolute path + :type partition_name: str + """ + disk_index = re.search(r"\d+", partition_name).start() + try: + process.run( + "sfdisk --delete " + + partition_name[:disk_index] + + " " + + partition_name[disk_index:] + ) + except process.CmdError as ce: + msg = f"sfdisk --delete command failed on disk {partition_name}" + LOGGER.warning(msg) + raise DiskError(msg) + + def clean_disk(self): + """ + Cleans partitions table of a disk + + :param disk_name: disk name + :type disk_name: str + """ + output = process.getoutput("sfdisk --delete " + self.disk) + self.rescan_disk(self.disk) + if not self.get_disk_partitions(self.disk): + if "The partition table has been altered" in output: + process.run("wipefs -af " + self.disk) + + def rescan_disk(self): + """ + Rescans disk + + :param disk_name: disk name + :type disk_name: str + """ + self.disk = os.path.realpath(self.disk) + if re.search(r"dm-\d+", self.disk): + mpath_dict = multipath.get_multipath_details() + for _ in range(len(mpath_dict["maps"])): + if mpath_dict["maps"][_]["sysfs"] == self.disk.split("/")[-1]: + self.disk = ( + "/dev/" + + mpath_dict["maps"][_]["path_groups"][0]["paths"][0]["dev"] + ) + break + process.run(f"echo 1 > /sys/block/{self.disk}/device/rescan") + + def get_disk_partitions(self): + """ + Returns partitions of a disk excluding extended partition + + :param disk: disk name + :type disk: str + + Returns array with all partitions of disk + """ + self.rescan_disk(self.disk) + partitions_op = process.getoutput("sfdisk -l " + self.disk) + return [ + line.split(" ")[0] + for line in partitions_op.split("\n") + if line.startswith(self.disk) and "Extended" not in line + ] + + def get_io_scheduler_list(self): + """ + Returns io scheduler available for the IO Device + :param device_name: Device name example like sda + :return: list of IO scheduler + """ + names = open(self.__sched_path(self.disk), "r", encoding="utf-8").read() + return names.translate(str.maketrans("[]", " ")).split() + + def get_io_scheduler(self): + """ + Return io scheduler name which is set currently for device + :param device_name: Device name example like sda + :return: IO scheduler + :rtype : str + """ + return re.split( + r"[\[\]]", open(self.__sched_path(self.disk), "r", encoding="utf-8").read() + )[1] + + def __sched_path(self): + + file_path = f"/sys/block/{self.disk}/queue/scheduler" + return file_path + + def set_io_scheduler(self, name): + """ + Set io scheduler to a device + :param device_name: Device name example like sda + :param name: io scheduler name + """ + if name not in self.get_io_scheduler_list(self.disk): + raise DiskError(f"No such IO scheduler: {name}") + + with open(self.__sched_path(self.disk), "w", encoding="utf-8") as fp: + fp.write(name) diff --git a/avocado/utils/disk.py b/avocado/utils/disk.py deleted file mode 100644 index 9954f3f37f..0000000000 --- a/avocado/utils/disk.py +++ /dev/null @@ -1,493 +0,0 @@ -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# -# See LICENSE for more details. -# -# This code was inspired in the autotest project, -# -# client/base_utils.py -# -# Copyright: 2018 IBM -# Authors : Praveen K Pandey -# : Narasimhan V -# : Naresh Bannoth - - -""" -Disk utilities -""" - - -import json -import logging -import os -import re - -from avocado.utils import genio, multipath, process - -LOGGER = logging.getLogger(__name__) - - -class DiskError(Exception): - """ - Generic DiskError - """ - - -def freespace(path): - fs_stats = os.statvfs(path) - return fs_stats.f_bsize * fs_stats.f_bavail - - -def get_disk_blocksize(path): - """Return the disk block size, in bytes""" - fs_stats = os.statvfs(path) - return fs_stats.f_bsize - - -def create_loop_device(size, blocksize=4096, directory="./"): - """ - Creates a loop device of size and blocksize specified. - - :param size: Size of loop device, in bytes - :type size: int - :param blocksize: block size of loop device, in bytes. Defaults to 4096 - :type blocksize: int - :param directory: Directory where the backing file will be created. - Defaults to current directory. - :type directory: str - - :return: loop device name - :rtype: str - """ - cmd = "losetup --find" - loop = process.run(cmd, ignore_status=True, sudo=True).stdout_text.strip("\n") - - loop_file = os.path.join(directory, f"tmp_{loop.split('/')[-1]}.img") - cmd = ( - f"dd if=/dev/zero of={loop_file} bs={blocksize} " - f"count={int(size / blocksize)}" - ) - if process.system(cmd, ignore_status=True, sudo=True) != 0: - raise DiskError("Unable to create backing file for loop device") - - cmd = f"losetup {loop} {loop_file} -P" - if process.system(cmd, ignore_status=True, sudo=True) != 0: - raise DiskError("Unable to create the loop device") - return loop - - -def delete_loop_device(device): - """ - Deletes the specified loop device. - - :param device: device to be deleted - :type device: str - - :return: True if deleted. - :rtype: bool - """ - cmd = "losetup -aJl" - loop_dic = json.loads(process.run(cmd, ignore_status=True, sudo=True).stdout_text) - loop_file = "" - for loop_dev in loop_dic["loopdevices"]: - if device == loop_dev["name"]: - loop_file = loop_dev["back-file"] - if not loop_file: - raise DiskError("Unable to find backing file for loop device") - cmd = f"losetup -d {device}" - if process.system(cmd, ignore_status=True, sudo=True) != 0: - raise DiskError("Unable to delete the loop device") - os.remove(loop_file) - return True - - -def get_disks(): - """ - Returns the physical "hard drives" available on this system - - This is a simple wrapper around `lsblk` and will return all the - top level physical (non-virtual) devices return by it. - - TODO: this is currently Linux specific. Support for other - platforms is desirable and may be implemented in the future. - - :returns: a list of paths to the physical disks on the system - :rtype: list of str - """ - try: - json_result = process.run("lsblk --json --paths --inverse") - except process.CmdError as ce: - raise DiskError(f"Error occurred while executing lsblk command: {ce}") - try: - json_data = json.loads(json_result.stdout_text) - except json.JSONDecodeError as je: - raise DiskError(f"Error occurred while parsing JSON data: {je}") - disks = [] - for device in json_data["blockdevices"]: - disks.append(device["name"]) - if "children" in device: - for child in device["children"]: - disks.append(child["name"]) - return disks - - -def get_all_disk_paths(): - """ - Returns all available disk names and alias on this system - - This will get all the sysfs disks name entries by its device - node name, by-uuid, by-id and by-path, irrespective of any - platform and device type - - :returns: a list of all disk path names - :rtype: list of str - """ - disk_list = abs_path = [] - for path in [ - "/dev", - "/dev/mapper", - "/dev/disk/by-id", - "/dev/disk/by-path", - "/dev/disk/by-uuid", - "/dev/disk/by-partuuid", - "/dev/disk/by-partlabel", - ]: - if os.path.exists(path): - for device in os.listdir(path): - abs_path.append(os.path.join(path, device)) - disk_list.extend(abs_path) - return disk_list - - -def get_absolute_disk_path(device): - """ - Returns absolute device path of given disk - - This will get actual disks path of given device, it can take - node name, by-uuid, by-id and by-path, irrespective of any - platform and device type - - :param device: disk name or disk alias names sda or scsi-xxx - :type device: str - - :returns: the device absolute path name - :rtype: bool - """ - if not os.path.exists(device): - for dev_path in get_all_disk_paths(): - if device == os.path.basename(dev_path): - return dev_path - return device - - -def get_available_filesystems(): - """ - Return a list of all available filesystem types - - :returns: a list of filesystem types - :rtype: list of str - """ - filesystems = set() - with open("/proc/filesystems") as proc_fs: # pylint: disable=W1514 - for proc_fs_line in proc_fs.readlines(): - filesystems.add(re.sub(r"(nodev)?\s*", "", proc_fs_line)) - return list(filesystems) - - -def get_filesystem_type(mount_point="/"): - """ - Returns the type of the filesystem of mount point informed. - The default mount point considered when none is informed - is the root "/" mount point. - - :param str mount_point: mount point to asses the filesystem type. - Default "/" - - :returns: filesystem type - :rtype: str - """ - with open("/proc/mounts") as mounts: # pylint: disable=W1514 - for mount_line in mounts.readlines(): - _, fs_file, fs_vfstype, _, _, _ = mount_line.split() - if fs_file == mount_point: - return fs_vfstype - - -def is_root_device(device): - """ - check for root disk - - :param device: device to check - - :returns: True or False, True if given device is root disk - otherwise will return False. - """ - cmd = "lsblk --j -o MOUNTPOINT,PKNAME" - output = process.run(cmd) - result = json.loads(output.stdout_text) - for item in result["blockdevices"]: - if item["mountpoint"] == "/" and device == str(item["pkname"]): - return True - return False - - -def is_disk_mounted(device): - """ - check if given disk is mounted or not - - :param device: disk/device name - :type device: str - - :returns: True if the device/disk is mounted else False - :rtype: bool - """ - with open("/proc/mounts") as mounts: # pylint: disable=W1514 - for mount_line in mounts.readlines(): - dev, _, _, _, _, _ = mount_line.split() - if dev == device: - return True - return False - - -def is_dir_mounted(dir_path): - """ - check if given directory is mounted or not - - :param dir_path: directory path - :type dir_path: str - - :returns: True if the given director is mounted else False - :rtype: bool - """ - with open("/proc/mounts") as mounts: # pylint: disable=W1514 - for mount_line in mounts.readlines(): - _, fs_dir, _, _, _, _ = mount_line.split() - if fs_dir == dir_path: - return True - return False - - -def fs_exists(device): - """ - check if filesystem exists on give disk/device - - :param device: disk/device name - :type device: str - - :returns: returns True if filesystem exists on the give disk else False - :rtype: bool - """ - cmd = f"blkid -o value -s TYPE {device}" - out = process.system_output(cmd, shell=True, ignore_status=True).decode("utf-8") - fs_list = ["ext2", "ext3", "ext4", "xfs", "btrfs"] - if out in fs_list: - return True - return False - - -def get_dir_mountpoint(dir_path): - """ - get mounted disk name that is mounted on given dir_path - - :param dir_path: absolute directory path - :type dir_path: str - - :returns: returns disk name which mounted on given dir_path - :rtype: str - """ - with open("/proc/mounts") as mounts: # pylint: disable=W1514 - for mount_line in mounts.readlines(): - dev, fs_dir, _, _, _, _ = mount_line.split() - if fs_dir == dir_path: - return dev - return None - - -def get_disk_mountpoint(device): - """ - get mountpoint on which given disk is mounted - - :param device: disk/device name - :type device: str - - :return: return directory name on which disk is mounted - :rtype: str - """ - with open("/proc/mounts") as mounts: # pylint: disable=W1514 - for mount_line in mounts.readlines(): - dev, fs_dir, _, _, _, _ = mount_line.split() - if dev == device: - return fs_dir - return None - - -def create_linux_raw_partition(disk_name, size=None, num_of_par=1): - """ - Creates partitions using sfdisk command - - :param disk_name: disk/device name - :type disk_name: str - :param size: size of partition - :type size: str - :param num_of_par: Number of partitions to be created - :type num_of_par: int - - Returns list of created partitions - """ - if not size: - size = get_size_of_disk(disk_name) / 1073741824 - size = size / num_of_par - size = str(size) + "G" - partitions = [ - "size= +" + size if val != 3 else "type=5" for val in range(0, num_of_par + 1) - ] - disk_partition_file = ( - "/tmp/creat_partition" + process.run("date '+%d-%m-%y_%T'").stdout_text.strip() - ) - if not os.path.isfile(disk_partition_file): - process.run("touch " + disk_partition_file) - for line in partitions: - genio.append_one_line(disk_partition_file, line) - try: - part_output = process.getoutput( - "sfdisk " + disk_name + " < " + disk_partition_file - ) - except: - msg = f"sfdisk partition creation command failed on disk {disk_name}" - LOGGER.warning(msg) - raise DiskError(msg) - rescan_disk(disk_name) - if "The partition table has been altered" in part_output: - return get_disk_partitions(disk_name) - - -def get_size_of_disk(disk): - """ - Returns size of disk in bytes - - :param disk: disk/device name - :type disk: str - - Return Type: int - """ - return int(process.getoutput("lsblk -b --output SIZE -n -d " + disk)) - - -def delete_partition(partition_name): - """ - Deletes mentioned partition from disk - - :param partition_name: partition absolute path - :type partition_name: str - """ - disk_index = re.search(r"\d+", partition_name).start() - try: - process.run( - "sfdisk --delete " - + partition_name[:disk_index] - + " " - + partition_name[disk_index:] - ) - except: - msg = f"sfdisk --delete command failed on disk {partition_name}" - LOGGER.warning(msg) - raise DiskError(msg) - - -def clean_disk(disk_name): - """ - Cleans partitions table of a disk - - :param disk_name: disk name - :type disk_name: str - """ - output = process.getoutput("sfdisk --delete " + disk_name) - rescan_disk(disk_name) - if not get_disk_partitions(disk_name): - if "The partition table has been altered" in output: - process.run("wipefs -af " + disk_name) - - -def rescan_disk(disk_name): - """ - Rescans disk - - :param disk_name: disk name - :type disk_name: str - """ - disk_name = os.path.realpath(disk_name) - if re.search(r"dm-\d+", disk_name): - mpath_dict = multipath.get_multipath_details() - for _ in range(len(mpath_dict["maps"])): - if mpath_dict["maps"][_]["sysfs"] == disk_name.split("/")[-1]: - disk_name = ( - "/dev/" + mpath_dict["maps"][_]["path_groups"][0]["paths"][0]["dev"] - ) - break - process.run(f"echo 1 > /sys/block/{disk_name}/device/rescan") - - -def get_disk_partitions(disk): - """ - Returns partitions of a disk excluding extended partition - - :param disk: disk name - :type disk: str - - Returns array with all partitions of disk - """ - rescan_disk(disk) - partitions_op = process.getoutput("sfdisk -l " + disk) - return [ - line.split(" ")[0] - for line in partitions_op.split("\n") - if line.startswith(disk) and "Extended" not in line - ] - - -def get_io_scheduler_list(device_name): - """ - Returns io scheduler available for the IO Device - :param device_name: Device name example like sda - :return: list of IO scheduler - """ - names = open(__sched_path(device_name), "r", encoding="utf-8").read() - return names.translate(str.maketrans("[]", " ")).split() - - -def get_io_scheduler(device_name): - """ - Return io scheduler name which is set currently for device - :param device_name: Device name example like sda - :return: IO scheduler - :rtype : str - """ - return re.split( - r"[\[\]]", open(__sched_path(device_name), "r", encoding="utf-8").read() - )[1] - - -def __sched_path(device_name): - - file_path = f"/sys/block/{device_name}/queue/scheduler" - return file_path - - -def set_io_scheduler(device_name, name): - """ - Set io scheduler to a device - :param device_name: Device name example like sda - :param name: io scheduler name - """ - if name not in get_io_scheduler_list(device_name): - raise DiskError(f"No such IO scheduler: {name}") - - with open(__sched_path(device_name), "w", encoding="utf-8") as fp: - fp.write(name) diff --git a/avocado/utils/distro.py b/avocado/utils/distro.py deleted file mode 100644 index dcc25942c0..0000000000 --- a/avocado/utils/distro.py +++ /dev/null @@ -1,532 +0,0 @@ -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# -# See LICENSE for more details. -# -# Copyright: Red Hat Inc. 2013-2014 -# Author: Lucas Meneghel Rodrigues - -""" -This module provides the client facilities to detect the Linux Distribution -it's running under. -""" - -import logging -import os -import platform -import re - -LOGGER = logging.getLogger(__name__) - -__all__ = [ - "LinuxDistro", - "UNKNOWN_DISTRO_NAME", - "UNKNOWN_DISTRO_VERSION", - "UNKNOWN_DISTRO_RELEASE", - "UNKNOWN_DISTRO_ARCH", - "Probe", - "register_probe", - "detect", -] - - -# pylint: disable=R0903 -class LinuxDistro: - """ - Simple collection of information for a Linux Distribution - """ - - def __init__(self, name, version, release, arch): - """ - Initializes a new Linux Distro - - :param name: a short name that precisely distinguishes this Linux - Distribution among all others. - :type name: str - :param version: the major version of the distribution. Usually this - is a single number that denotes a large development - cycle and support file. - :type version: str - :param release: the release or minor version of the distribution. - Usually this is also a single number, that is often - omitted or starts with a 0 when the major version - is initially release. It's often associated with a - shorter development cycle that contains incremental - a collection of improvements and fixes. - :type release: str - :param arch: the main target for this Linux Distribution. It's common - for some architectures to ship with packages for - previous and still compatible architectures, such as it's - the case with Intel/AMD 64 bit architecture that support - 32 bit code. In cases like this, this should be set to - the 64 bit architecture name. - :type arch: str - """ - self.name = name - self.version = version - self.release = release - self.arch = arch - - def __repr__(self): - return ( - f"" - ) - - -UNKNOWN_DISTRO_NAME = "unknown" -UNKNOWN_DISTRO_VERSION = 0 -UNKNOWN_DISTRO_RELEASE = 0 -UNKNOWN_DISTRO_ARCH = "unknown" - - -#: The distribution that is used when the exact one could not be found -UNKNOWN_DISTRO = LinuxDistro( - UNKNOWN_DISTRO_NAME, - UNKNOWN_DISTRO_VERSION, - UNKNOWN_DISTRO_RELEASE, - UNKNOWN_DISTRO_ARCH, -) - - -class Probe: - """ - Probes the machine and does it best to confirm it's the right distro. - If given an avocado.utils.ssh.Session object representing another machine, Probe - will attempt to detect another machine's distro via an ssh connection. - """ - - #: Points to a file that can determine if this machine is running a given - #: Linux Distribution. This servers a first check that enables the extra - #: checks to carry on. - CHECK_FILE = None - - #: Sets the content that should be checked on the file pointed to by - #: :attr:`CHECK_FILE_EXISTS`. Leave it set to `None` (its default) - #: to check only if the file exists, and not check its contents - CHECK_FILE_CONTAINS = None - - #: The name of the Linux Distribution to be returned if the file defined - #: by :attr:`CHECK_FILE_EXISTS` exist. - CHECK_FILE_DISTRO_NAME = None - - #: A regular expression that will be run on the file pointed to by - #: :attr:`CHECK_FILE_EXISTS` - CHECK_VERSION_REGEX = None - - def __init__(self, session=None): - self.score = 0 - self.session = session - - def check_for_remote_file(self, file_name): - """ - Checks if provided file exists in remote machine - - :param file_name: name of file - :type file_name: str - :returns: whether the file exists in remote machine or not - :rtype: bool - """ - if self.session and self.session.cmd(f"test -f {file_name}").exit_status == 0: - return True - else: - return False - - def check_name_for_file(self): - """ - Checks if this class will look for a file and return a distro - - The conditions that must be true include the file that identifies the - distro file being set (:attr:`CHECK_FILE`) and the name of the - distro to be returned (:attr:`CHECK_FILE_DISTRO_NAME`) - """ - if self.CHECK_FILE is None: - return False - - if self.CHECK_FILE_DISTRO_NAME is None: - return False - - return True - - def name_for_file(self): - """ - Get the distro name if the :attr:`CHECK_FILE` is set and exists - """ - if self.check_name_for_file(): - if self.check_for_remote_file(self.CHECK_FILE): - return self.CHECK_FILE_DISTRO_NAME - elif os.path.exists(self.CHECK_FILE): - return self.CHECK_FILE_DISTRO_NAME - - def check_name_for_file_contains(self): - """ - Checks if this class will look for text on a file and return a distro - - The conditions that must be true include the file that identifies the - distro file being set (:attr:`CHECK_FILE`), the text to look for - inside the distro file (:attr:`CHECK_FILE_CONTAINS`) and the name - of the distro to be returned (:attr:`CHECK_FILE_DISTRO_NAME`) - """ - if self.CHECK_FILE is None: - return False - - if self.CHECK_FILE_CONTAINS is None: - return False - - if self.CHECK_FILE_DISTRO_NAME is None: - return False - - return True - - def name_for_file_contains(self): - """ - Get the distro if the :attr:`CHECK_FILE` is set and has content - """ - if self.check_name_for_file_contains(): - check_file = None - if self.check_for_remote_file(self.CHECK_FILE): - check_file = self.session.cmd( - f"cat {self.CHECK_FILE}" - ).stdout_text.split("/n") - elif os.path.exists(self.CHECK_FILE): - try: - check_file = open(self.CHECK_FILE, encoding="utf-8") - except IOError as err: - LOGGER.debug("Could not open %s", self.CHECK_FILE) - LOGGER.debug("Exception: %s", str(err)) - return None - - if not check_file: - return None - - for line in check_file: - if self.CHECK_FILE_CONTAINS in line: - return self.CHECK_FILE_DISTRO_NAME - - def check_version(self): - """ - Checks if this class will look for a regex in file and return a distro - """ - if self.CHECK_FILE is None: - return False - - if self.CHECK_VERSION_REGEX is None: - return False - - return True - - def _get_version_match(self): - """ - Returns the match result for the version regex on the file content - """ - if self.check_version(): - if self.session: - if self.session.cmd(f"test -f {self.CHECK_FILE}").exit_status != 0: - return None - elif not os.path.exists(self.CHECK_FILE): - return None - - version_file_content = None - if self.session: - version_file_content = self.session.cmd( - f"cat {self.CHECK_FILE}" - ).stdout_text - else: - try: - version_file_content = open( - self.CHECK_FILE, encoding="utf-8" - ).read() - except IOError as err: - LOGGER.debug("Could not open %s", self.CHECK_FILE) - LOGGER.debug("Exception: %s", str(err)) - return None - - return self.CHECK_VERSION_REGEX.match(version_file_content) - - def version(self): - """ - Returns the version of the distro - """ - version = UNKNOWN_DISTRO_VERSION - match = self._get_version_match() - if match is not None: - if len(match.groups()) > 0: - version = match.groups()[0] - return version - - def check_release(self): - """ - Checks if this has the conditions met to look for the release number - """ - return self.check_version() and self.CHECK_VERSION_REGEX.groups > 1 - - def release(self): - """ - Returns the release of the distro - """ - release = UNKNOWN_DISTRO_RELEASE - match = self._get_version_match() - if match is not None: - if len(match.groups()) > 1: - if match.groups()[1]: - release = match.groups()[1] - return release - - def get_distro(self): - """ - :param session: ssh connection between another machine - - Returns the :class:`LinuxDistro` this probe detected - """ - name = None - version = UNKNOWN_DISTRO_VERSION - release = UNKNOWN_DISTRO_RELEASE - - distro = None - - if self.check_name_for_file(): - name = self.name_for_file() - self.score += 1 - - if self.check_name_for_file_contains(): - name = self.name_for_file_contains() - self.score += 1 - - if self.check_version(): - version = self.version() - self.score += 1 - - if self.check_release(): - release = self.release() - self.score += 1 - - if self.session: - arch = self.session.cmd("uname -m").stdout_text - else: - arch = platform.machine() - - # name is the first thing that should be identified. If we don't know - # the distro name, we don't bother checking for versions - if name is not None: - distro = LinuxDistro(name, version, release, arch) - else: - distro = UNKNOWN_DISTRO - - return distro - - -class RedHatProbe(Probe): - """ - Probe with version checks for Red Hat Enterprise Linux systems - """ - - CHECK_FILE = "/etc/redhat-release" - CHECK_FILE_CONTAINS = "Red Hat Enterprise Linux" - CHECK_FILE_DISTRO_NAME = "rhel" - CHECK_VERSION_REGEX = re.compile( - r"Red Hat Enterprise Linux\s+\w*\s*release\s+(\d{1,2})\.(\d{1,2}).*" - ) - - -class CentosProbe(RedHatProbe): - """ - Probe with version checks for CentOS systems - """ - - CHECK_FILE = "/etc/redhat-release" - CHECK_FILE_CONTAINS = "CentOS Linux" - CHECK_FILE_DISTRO_NAME = "centos" - CHECK_VERSION_REGEX = re.compile(r"CentOS Linux release (\d{1,2})\.(\d{1,2}).*") - - -class CentosStreamProbe(RedHatProbe): - """ - Probe with version checks for CentOS Stream systems - """ - - CHECK_FILE = "/etc/redhat-release" - CHECK_FILE_CONTAINS = "CentOS Stream" - CHECK_FILE_DISTRO_NAME = "centos-stream" - CHECK_VERSION_REGEX = re.compile(r"CentOS Stream release (\d{1,2})") - - -class FedoraProbe(RedHatProbe): - """ - Probe with version checks for Fedora systems - """ - - CHECK_FILE = "/etc/fedora-release" - CHECK_FILE_CONTAINS = "Fedora" - CHECK_FILE_DISTRO_NAME = "fedora" - CHECK_VERSION_REGEX = re.compile(r"Fedora release (\d{1,2}).*") - - -class AmazonLinuxProbe(Probe): - """ - Probe for Amazon Linux systems - """ - - CHECK_FILE = "/etc/os-release" - CHECK_FILE_CONTAINS = "Amazon Linux" - CHECK_FILE_DISTRO_NAME = "amzn" - CHECK_VERSION_REGEX = re.compile( - r".*VERSION=\"(\d+)\.?(\d+)?\".*", re.MULTILINE | re.DOTALL - ) - - -class DebianProbe(Probe): - """ - Simple probe with file checks for Debian systems - """ - - CHECK_FILE = "/etc/debian_version" - CHECK_FILE_DISTRO_NAME = "debian" - CHECK_VERSION_REGEX = re.compile(r"(\d+)\.(\d+)") - - -class UbuntuProbe(Probe): - """ - Simple probe for Ubuntu systems in general - """ - - CHECK_FILE = "/etc/os-release" - CHECK_FILE_CONTAINS = "ubuntu" - CHECK_FILE_DISTRO_NAME = "Ubuntu" - CHECK_VERSION_REGEX = re.compile( - r".*VERSION_ID=\"(\d+)\.(\d+)\".*", re.MULTILINE | re.DOTALL - ) - - -class SUSEProbe(Probe): - """ - Simple probe for SUSE systems in general - """ - - CHECK_FILE = "/etc/os-release" - CHECK_FILE_CONTAINS = "SUSE" - # this is the (incorrect) spelling used in python's platform - # and tests are looking for it in distro.name. So keep using it - CHECK_FILE_DISTRO_NAME = "SuSE" - - def get_distro(self): - distro = super().get_distro() - - # if the default methods find SUSE, detect version - if not distro.name == self.CHECK_FILE_DISTRO_NAME: - return distro - - # we need to check VERSION_ID, which is number - VERSION can - # be a string - - # for openSUSE Tumbleweed this will be e.g. 20161225 - # for openSUSE Leap this will be e.g. 42.2 - # for SUSE Linux Enterprise this will be e.g. 12 or 12.2 (for SP2) - version_id_re = re.compile(r'VERSION_ID="([\d\.]*)"') - version_id = None - - with open(self.CHECK_FILE, encoding="utf-8") as check_file: - for line in check_file: - match = version_id_re.match(line) - if match: - version_id = match.group(1) - - if version_id: - version_parts = version_id.split(".") - distro.version = int(version_parts[0]) - if len(version_parts) > 1: - distro.release = int(version_parts[1]) - - return distro - - -class OpenEulerProbe(Probe): - """ - Simple probe for openEuler systems in general - """ - - CHECK_FILE = "/etc/openEuler-release" - CHECK_FILE_CONTAINS = "openEuler release" - CHECK_FILE_DISTRO_NAME = "openEuler" - CHECK_VERSION_REGEX = re.compile(r"openEuler release (\d+)\.(\d+).*") - - -class UnionTechProbe(Probe): - """ - Simple probe for UnionTech systems in general - """ - - CHECK_FILE = "/etc/os-version" - CHECK_FILE_CONTAINS = "UnionTech OS" - CHECK_FILE_DISTRO_NAME = "uos" - CHECK_VERSION_REGEX = re.compile(r"MinorVersion=(\d+)") - - -#: the complete list of probes that have been registered -REGISTERED_PROBES = [] - - -def register_probe(probe_class): - """ - Register a probe to be run during autodetection - """ - if probe_class not in REGISTERED_PROBES: - REGISTERED_PROBES.append(probe_class) - - -register_probe(RedHatProbe) -register_probe(CentosProbe) -register_probe(CentosStreamProbe) -register_probe(FedoraProbe) -register_probe(AmazonLinuxProbe) -register_probe(DebianProbe) -register_probe(SUSEProbe) -register_probe(UbuntuProbe) -register_probe(OpenEulerProbe) -register_probe(UnionTechProbe) - - -def detect(session=None): - """ - Attempts to detect the Linux Distribution running on this machine. - - If given an avocado.utils.ssh.Session object, it will attempt to detect the - distro of another machine via an ssh connection. - - :param session: ssh connection between another machine - :type session: avocado.utils.ssh.Session - :returns: the detected :class:`LinuxDistro` or :data:`UNKNOWN_DISTRO` - :rtype: :class:`LinuxDistro` - """ - results = [] - - for probe_class in REGISTERED_PROBES: - probe_instance = probe_class(session) - distro_result = probe_instance.get_distro() - if distro_result is not UNKNOWN_DISTRO: - results.append((distro_result, probe_instance)) - - results.sort(key=lambda t: t[1].score) - if len(results) > 0: - distro = results[-1][0] - else: - distro = UNKNOWN_DISTRO - - return distro - - -class Spec: - """ - Describes a distro, usually for setting minimum distro requirements - """ - - def __init__(self, name, min_version=None, min_release=None, arch=None): - self.name = name - self.min_version = min_version - self.min_release = min_release - self.arch = arch