diff --git a/airgun/entities/oscapreport.py b/airgun/entities/oscapreport.py new file mode 100644 index 000000000..86b6f73d3 --- /dev/null +++ b/airgun/entities/oscapreport.py @@ -0,0 +1,79 @@ +from wait_for import wait_for + +from airgun.entities.base import BaseEntity +from airgun.navigation import NavigateStep, navigator +from airgun.utils import retry_navigation +from airgun.views.oscapreport import ( + RemediateModal, + SCAPReportDetailsView, + SCAPReportView, +) + + +class OSCAPReportEntity(BaseEntity): + endpoint_path = '/compliance/arf_reports' + + def search(self, search_string): + """Search for SCAP Report + + :param search_string: how to find the SCAP Report + :return: result of the SCAP Report search + """ + view = self.navigate_to(self, 'All') + return view.search(search_string) + + def details(self, search_string, widget_names=None, limit=None): + """Read the content from corresponding SCAP Report dashboard, + clicking on the link in Reported At column of + SCAP Report list + + :param search_string: + :param limit: how many rules results to fetch at most + :return: list of dictionaries with values from SCAP Report Details View + """ + view = self.navigate_to(self, 'Details', search_string=search_string) + return view.read(widget_names=widget_names, limit=limit) + + def remediate(self, search_string, resource): + """Remediate the failed rule using automatic remediation through Ansible + + :param search_string: + """ + view = self.navigate_to(self, 'Details', search_string=search_string) + view.table.row(resource=resource).actions.fill('Remediation') + view = RemediateModal(self.browser) + view.wait_displayed() + self.browser.plugin.ensure_page_safe() + wait_for(lambda: view.title.is_displayed, timeout=10, delay=1) + view.fill({'select_remediation_method.snippet': 'Ansible'}) + view.select_capsule.run.click() + + +@navigator.register(OSCAPReportEntity, 'All') +class ShowAllSCAPReports(NavigateStep): + """Navigate to Compliance Reports screen.""" + + VIEW = SCAPReportView + + @retry_navigation + def step(self, *args, **kwargs): + self.view.menu.select('Hosts', 'Compliance', 'Reports') + + +@navigator.register(OSCAPReportEntity, 'Details') +class DetailsSCAPReport(NavigateStep): + """To get data from ARF report view + + Args: + search_string: what to fill to find the SCAP report + """ + + VIEW = SCAPReportDetailsView + + def prerequisite(self, *args, **kwargs): + return self.navigate_to(self.obj, 'All') + + def step(self, *args, **kwargs): + search_string = kwargs.get('search_string') + self.parent.search(search_string) + self.parent.table.row()['Reported At'].widget.click() diff --git a/airgun/session.py b/airgun/session.py index 4065dd50f..229ea558c 100644 --- a/airgun/session.py +++ b/airgun/session.py @@ -57,6 +57,7 @@ from airgun.entities.os import OperatingSystemEntity from airgun.entities.oscapcontent import OSCAPContentEntity from airgun.entities.oscappolicy import OSCAPPolicyEntity +from airgun.entities.oscapreport import OSCAPReportEntity from airgun.entities.oscaptailoringfile import OSCAPTailoringFileEntity from airgun.entities.package import PackageEntity from airgun.entities.partitiontable import PartitionTableEntity @@ -576,6 +577,11 @@ def oscappolicy(self): """Instance of OSCAP Policy entity.""" return self._open(OSCAPPolicyEntity) + @cached_property + def oscapreport(self): + """Instance of OSCAP Report entity.""" + return self._open(OSCAPReportEntity) + @cached_property def oscaptailoringfile(self): """Instance of OSCAP Tailoring File entity.""" diff --git a/airgun/views/common.py b/airgun/views/common.py index 1ff313948..2c222dc40 100644 --- a/airgun/views/common.py +++ b/airgun/views/common.py @@ -57,21 +57,28 @@ def select_logout(self): self.account_menu.click() self.logout.click() - def read(self, widget_names=None): + def read(self, widget_names=None, limit=None): """Reads the contents of the view and presents them as a dictionary. :param widget_names: If specified, will read only the widgets names in the list. + :param limit: how many entries to fetch at most :return: A :py:class:`dict` of ``widget_name: widget_read_value`` where the values are retrieved using the :py:meth:`Widget.read`. """ if widget_names is None: + if limit is not None: + raise NotImplementedError("You must specify widgets to be able to specify limit") return super().read() if not isinstance(widget_names, list | tuple): widget_names = [widget_names] values = {} for widget_name in widget_names: - values[widget_name] = get_widget_by_name(self, widget_name).read() + widget = get_widget_by_name(self, widget_name) + if hasattr(widget, 'read_limited') and callable(widget.read_limited): + values[widget_name] = widget.read(limit=limit) + else: + values[widget_name] = widget.read() return normalize_dict_values(values) diff --git a/airgun/views/oscapreport.py b/airgun/views/oscapreport.py new file mode 100644 index 000000000..2bc57c58c --- /dev/null +++ b/airgun/views/oscapreport.py @@ -0,0 +1,83 @@ +from widgetastic.widget import Text, View +from widgetastic_patternfly4 import Button +from widgetastic_patternfly4.ouia import FormSelect + +from airgun.views.common import BaseLoggedInView, SearchableViewMixin, WizardStepView +from airgun.widgets import ( + ActionsDropdown, + SatTable, +) + + +class SCAPReportView(BaseLoggedInView, SearchableViewMixin): + title = Text("//h1[normalize-space(.)='Compliance Reports']") + table = SatTable( + './/table', + column_widgets={ + 'Host': Text(".//a[contains(@href,'/new/hosts')]"), + 'Reported At': Text(".//a[contains(@href,'/compliance/arf_reports')]"), + 'Policy': Text(".//a[contains(@href,'/compliance/policies')]"), + 'Openscap Capsule': Text(".//a[contains(@href,'/smart_proxies')]"), + 'Passed': Text(".//span[contains(@class,'label-info')]"), + 'Failed': Text(".//span[contains(@class,'label-danger')]"), + 'Other': Text(".//span[contains(@class,'label-warning')]"), + 'Actions': ActionsDropdown("./div[contains(@class, 'btn-group')]"), + }, + ) + + @property + def is_displayed(self): + return self.browser.wait_for_element(self.title, exception=False) is not None + + +class SCAPReportDetailsView(BaseLoggedInView): + show_log_messages_label = Text('//span[normalize-space(.)="Show log messages:"]') + table = SatTable( + './/table', + column_widgets={ + 'Result': Text('./span[1]'), + 'Message': Text('./span[2]'), + 'Resource': Text('./span[3]'), + 'Severity': Text('./img[1]'), + 'Actions': ActionsDropdown("./div[contains(@class, 'btn-group')]"), + }, + ) + + @property + def is_displayed(self): + return ( + self.browser.wait_for_element(self.show_log_messages_label, exception=False) is not None + ) + + +class RemediateModal(View): + """ + Class representing the "Remediate" modal. + It contains multiple nested classes each representing a step of the wizard. + """ + + ROOT = '//div[contains(@data-ouia-component-id, "OUIA-Generated-Modal-large-")]' + + title = Text('.//h2[contains(@class, "pf-c-title")]') + close_modal = Button(locator='.//button[@aria-label="Close"]') + + @View.nested + class select_remediation_method(WizardStepView): + expander = Text( + './/button[contains(@class,"pf-c-wizard__nav-link") and contains(.,"Select snippet")]' + ) + snippet = FormSelect('snippet-select') + + @View.nested + class name_source(WizardStepView): + expander = Text( + './/button[contains(@class,"pf-c-wizard__nav-link") and contains(.,"Review hosts")]' + ) + host_table = SatTable(".//table") + + @View.nested + class select_capsule(WizardStepView): + expander = Text( + './/button[contains(@class,"pf-c-wizard__nav-link") and contains(.,"Review remediation")]' + ) + run = Button(locator='.//button[normalize-space(.)="Run"]') diff --git a/airgun/widgets.py b/airgun/widgets.py index 89e2f4c87..607c6d20d 100644 --- a/airgun/widgets.py +++ b/airgun/widgets.py @@ -1962,11 +1962,56 @@ def has_rows(self): return False return True - def read(self): + def read_limited(self, limit): + """This is almost the same as inherited read but has a limit. Use it for tables that take too long to read. + Reads the table. Returns a list, every item in the list is contents read from the row.""" + rows = list(self) + # Cut the unwanted rows if necessary + if self.rows_ignore_top is not None: + rows = rows[self.rows_ignore_top :] + if self.rows_ignore_bottom is not None and self.rows_ignore_bottom > 0: + rows = rows[: -self.rows_ignore_bottom] + if self.assoc_column_position is None: + ret = [] + rows_read = 0 + for row in rows: + if rows_read >= limit: + break + ret.append(row.read()) + rows_read = rows_read + 1 + return ret + else: + result = {} + rows_read = 0 + for row in rows: + if rows_read >= limit: + break + row_read = row.read() + try: + key = row_read.pop(self.header_index_mapping[self.assoc_column_position]) + except KeyError: + try: + key = row_read.pop(self.assoc_column_position) + except KeyError: + try: + key = row_read.pop(self.assoc_column) + except KeyError as e: + raise ValueError( + f"The assoc_column={self.assoc_column!r} could not be retrieved" + ) from e + if key in result: + raise ValueError(f"Duplicate value for {key}={result[key]!r}") + result[key] = row_read + rows_read = rows_read + 1 + return result + + def read(self, limit=None): """Return empty list in case table is empty""" if not self.has_rows: self.logger.debug(f'Table {self.locator} is empty') return [] + if limit is not None: + return self.read_limited(limit) if self.pagination.is_displayed: return self._read_all() return super().read()