From 3e57c4468b94b4b90837c27cbfb41dad1ca4bc46 Mon Sep 17 00:00:00 2001 From: Dheeraj Kumar Srivastava Date: Tue, 15 Aug 2023 23:58:04 +0530 Subject: [PATCH] Add some pci utils in avocado framework Add the following pci utils: 1) unbind - unbind the driver from pci device. 2) bind - bind the driver to pci device. 3) get_vendor_id - get the vendor id of pci device. 4) reset - remove pci device in pci devices list in the system. 5) rescan - rescan the system for pci device. 6) reset_check - check if reset of pci device is successful. 7) rescan_check - check if rescan of the system for pci device is successful. 8) get_iommu_group - returns the iommu group of pci device. 9) change_domain - change iommu domain of pci device 10) change_domain_check - check whether whether change_domain is successful. Signed-off-by: Dheeraj Kumar Srivastava --- avocado/utils/pci.py | 165 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 164 insertions(+), 1 deletion(-) diff --git a/avocado/utils/pci.py b/avocado/utils/pci.py index 760f1a179e..a4d8e690dc 100644 --- a/avocado/utils/pci.py +++ b/avocado/utils/pci.py @@ -24,7 +24,7 @@ import os import re -from avocado.utils import genio, process +from avocado.utils import genio, process, wait def get_domains(): @@ -320,6 +320,169 @@ def get_driver(pci_address): return line.rsplit(None, 1)[-1] +def unbind(driver, full_pci_address): + """ + Unbind the pci device(full_pci_address) from driver + + :param driver: driver of the PCI address (full_pci_address) + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: None + """ + genio.write_file_or_fail(f"/sys/bus/pci/drivers/{driver}/unbind", full_pci_address) + if wait.wait_for( + lambda: os.path.exists( + f"/sys/bus/pci/drivers/\ +{driver}/{full_pci_address}" + ), + timeout=5, + ): + raise ValueError(f"Not able to unbind {full_pci_address} from {driver}") + + +def bind(driver, full_pci_address): + """ + Bind the pci device(full_pci_address) to driver + + :param driver: driver of the PCI address (full_pci_address) + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: None + """ + genio.write_file_or_fail(f"/sys/bus/pci/drivers/{driver}/bind", full_pci_address) + if not wait.wait_for( + lambda: os.path.exists( + f"/sys/bus/pci/drivers/\ +{driver}/{full_pci_address}" + ), + timeout=5, + ): + raise ValueError(f"Not able to bind {full_pci_address} to {driver}") + + +def get_vendor_id(full_pci_address): + """ + Get vendor id of a PCI address + + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: vendor id of PCI address + rtype: str + """ + cmd = f"lspci -n -s {full_pci_address}" + out = process.run(cmd, ignore_status=True, shell=True).stdout_text + if out == "": + raise ValueError(f"Not able to get {full_pci_address} vendor id") + return out.split(" ")[2].strip() + + +def reset_check(full_pci_address): + """ + Check if reset for "full_pci_address" is successful + + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: whether reset for "full_pci_address" is successful + rtype: bool + """ + cmd = f"lspci -vvs {full_pci_address}" + output = process.run(cmd, ignore_status=True, shell=True).stdout_text + if output != "": + return False + return True + + +def rescan_check(full_pci_address): + """ + Check if rescan for full_pci_address is successful + + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: whether rescan for full_pci_address is successful + rtype: bool + """ + cmd = f"lspci -vvs {full_pci_address}" + output = process.run(cmd, ignore_status=True, shell=True).stdout_text + if output == "": + return False + return True + + +def change_domain_check(dom, full_pci_address, def_dom): + """ + Check if the domain changed successfully to "dom" for "full_pci_address" + + :param dom: domain type + :param def_dom: default domain of pci device(full_pci_address) + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: whether domain changed successfully to "dom" + rtype: bool + """ + try: + output = genio.read_one_line( + f"/sys/bus/pci/devices/{full_pci_address}/iommu_group/type" + ) + out = output.rsplit(None, 1)[-1] + if (dom not in ("auto", out)) or (dom == "auto" and out != def_dom): + return False + return True + except OSError as details: + raise ValueError(f"Change domain check failed: {details}") + + +def reset(full_pci_address): + """ + Remove the full_pci_address + + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: None + """ + genio.write_file_or_fail(f"/sys/bus/pci/devices/{full_pci_address}/remove", "1") + if not wait.wait_for(lambda: reset_check(full_pci_address), timeout=5): + raise ValueError(f"Unsuccessful to remove {full_pci_address}") + + +def rescan(full_pci_address): + """ + Rescan the system and check for full_pci_address + + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: None + """ + genio.write_file_or_fail("/sys/bus/pci/rescan", "1") + if not wait.wait_for(lambda: rescan_check(full_pci_address), timeout=5): + raise ValueError(f"Unsuccessful to rescan for {full_pci_address}") + + +def get_iommu_group(full_pci_address): + """ + Return the iommu group of full_pci_address + + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: iommu group of full_pci_address + rtype: string + """ + cmd = f"lspci -vvvv -s {full_pci_address}" + out = process.run(cmd, ignore_status=True, shell=True) + for line in out.stdout_text.split("\n"): + if "IOMMU group" in line: + return line.strip().split(" ")[2] + raise ValueError(f"{full_pci_address} group not found") + + +def change_domain(dom, def_dom, full_pci_address): + """ + Change the domain of pci device(full_pci_address) to dom + + :param dom: domain type + :param def_dom: default domain of pci device(full_pci_address) + :param full_pci_address: Full PCI address including domain (0000:03:00.0) + return: None + """ + genio.write_file_or_fail( + f"/sys/bus/pci/devices/{full_pci_address}/iommu_group/type", dom + ) + if not wait.wait_for( + lambda: change_domain_check(dom, full_pci_address, def_dom), timeout=5 + ): + raise ValueError(f"Domain type change failed for {full_pci_address}") + + def get_memory_address(pci_address): """ Gets the memory address of a PCI address. (first match only)