diff --git a/avocado/utils/pci.py b/avocado/utils/pci.py index 760f1a179e..a4d8e690dc 100644 --- a/avocado/utils/pci.py +++ b/avocado/utils/pci.py @@ -24,7 +24,7 @@ import os import re -from avocado.utils import genio, process +from avocado.utils import genio, process, wait def get_domains(): @@ -320,6 +320,169 @@ def get_driver(pci_address): return line.rsplit(None, 1)[-1] +def unbind(driver, full_pci_address): + """ + Unbind the pci device(full_pci_address) from driver + + :param driver: driver of the PCI address (full_pci_address) + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: None + """ + genio.write_file_or_fail(f"/sys/bus/pci/drivers/{driver}/unbind", full_pci_address) + if wait.wait_for( + lambda: os.path.exists( + f"/sys/bus/pci/drivers/\ +{driver}/{full_pci_address}" + ), + timeout=5, + ): + raise ValueError(f"Not able to unbind {full_pci_address} from {driver}") + + +def bind(driver, full_pci_address): + """ + Bind the pci device(full_pci_address) to driver + + :param driver: driver of the PCI address (full_pci_address) + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: None + """ + genio.write_file_or_fail(f"/sys/bus/pci/drivers/{driver}/bind", full_pci_address) + if not wait.wait_for( + lambda: os.path.exists( + f"/sys/bus/pci/drivers/\ +{driver}/{full_pci_address}" + ), + timeout=5, + ): + raise ValueError(f"Not able to bind {full_pci_address} to {driver}") + + +def get_vendor_id(full_pci_address): + """ + Get vendor id of a PCI address + + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: vendor id of PCI address + rtype: str + """ + cmd = f"lspci -n -s {full_pci_address}" + out = process.run(cmd, ignore_status=True, shell=True).stdout_text + if out == "": + raise ValueError(f"Not able to get {full_pci_address} vendor id") + return out.split(" ")[2].strip() + + +def reset_check(full_pci_address): + """ + Check if reset for "full_pci_address" is successful + + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: whether reset for "full_pci_address" is successful + rtype: bool + """ + cmd = f"lspci -vvs {full_pci_address}" + output = process.run(cmd, ignore_status=True, shell=True).stdout_text + if output != "": + return False + return True + + +def rescan_check(full_pci_address): + """ + Check if rescan for full_pci_address is successful + + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: whether rescan for full_pci_address is successful + rtype: bool + """ + cmd = f"lspci -vvs {full_pci_address}" + output = process.run(cmd, ignore_status=True, shell=True).stdout_text + if output == "": + return False + return True + + +def change_domain_check(dom, full_pci_address, def_dom): + """ + Check if the domain changed successfully to "dom" for "full_pci_address" + + :param dom: domain type + :param def_dom: default domain of pci device(full_pci_address) + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: whether domain changed successfully to "dom" + rtype: bool + """ + try: + output = genio.read_one_line( + f"/sys/bus/pci/devices/{full_pci_address}/iommu_group/type" + ) + out = output.rsplit(None, 1)[-1] + if (dom not in ("auto", out)) or (dom == "auto" and out != def_dom): + return False + return True + except OSError as details: + raise ValueError(f"Change domain check failed: {details}") + + +def reset(full_pci_address): + """ + Remove the full_pci_address + + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: None + """ + genio.write_file_or_fail(f"/sys/bus/pci/devices/{full_pci_address}/remove", "1") + if not wait.wait_for(lambda: reset_check(full_pci_address), timeout=5): + raise ValueError(f"Unsuccessful to remove {full_pci_address}") + + +def rescan(full_pci_address): + """ + Rescan the system and check for full_pci_address + + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: None + """ + genio.write_file_or_fail("/sys/bus/pci/rescan", "1") + if not wait.wait_for(lambda: rescan_check(full_pci_address), timeout=5): + raise ValueError(f"Unsuccessful to rescan for {full_pci_address}") + + +def get_iommu_group(full_pci_address): + """ + Return the iommu group of full_pci_address + + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: iommu group of full_pci_address + rtype: string + """ + cmd = f"lspci -vvvv -s {full_pci_address}" + out = process.run(cmd, ignore_status=True, shell=True) + for line in out.stdout_text.split("\n"): + if "IOMMU group" in line: + return line.strip().split(" ")[2] + raise ValueError(f"{full_pci_address} group not found") + + +def change_domain(dom, def_dom, full_pci_address): + """ + Change the domain of pci device(full_pci_address) to dom + + :param dom: domain type + :param def_dom: default domain of pci device(full_pci_address) + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: None + """ + genio.write_file_or_fail( + f"/sys/bus/pci/devices/{full_pci_address}/iommu_group/type", dom + ) + if not wait.wait_for( + lambda: change_domain_check(dom, full_pci_address, def_dom), timeout=5 + ): + raise ValueError(f"Domain type change failed for {full_pci_address}") + + def get_memory_address(pci_address): """ Gets the memory address of a PCI address. (first match only)