Skip to content

Commit

Permalink
SCAP report and remediation (#1441)
Browse files Browse the repository at this point in the history
* SCAP report and remediation

* Fix missing attribute error when called without limit

* Comments addressed
  • Loading branch information
lhellebr authored Aug 28, 2024
1 parent f206e50 commit 73239e4
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 3 deletions.
79 changes: 79 additions & 0 deletions airgun/entities/oscapreport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from wait_for import wait_for

from airgun.entities.base import BaseEntity
from airgun.navigation import NavigateStep, navigator
from airgun.utils import retry_navigation
from airgun.views.oscapreport import (
RemediateModal,
SCAPReportDetailsView,
SCAPReportView,
)


class OSCAPReportEntity(BaseEntity):
endpoint_path = '/compliance/arf_reports'

def search(self, search_string):
"""Search for SCAP Report
:param search_string: how to find the SCAP Report
:return: result of the SCAP Report search
"""
view = self.navigate_to(self, 'All')
return view.search(search_string)

def details(self, search_string, widget_names=None, limit=None):
"""Read the content from corresponding SCAP Report dashboard,
clicking on the link in Reported At column of
SCAP Report list
:param search_string:
:param limit: how many rules results to fetch at most
:return: list of dictionaries with values from SCAP Report Details View
"""
view = self.navigate_to(self, 'Details', search_string=search_string)
return view.read(widget_names=widget_names, limit=limit)

def remediate(self, search_string, resource):
"""Remediate the failed rule using automatic remediation through Ansible
:param search_string:
"""
view = self.navigate_to(self, 'Details', search_string=search_string)
view.table.row(resource=resource).actions.fill('Remediation')
view = RemediateModal(self.browser)
view.wait_displayed()
self.browser.plugin.ensure_page_safe()
wait_for(lambda: view.title.is_displayed, timeout=10, delay=1)
view.fill({'select_remediation_method.snippet': 'Ansible'})
view.select_capsule.run.click()


@navigator.register(OSCAPReportEntity, 'All')
class ShowAllSCAPReports(NavigateStep):
"""Navigate to Compliance Reports screen."""

VIEW = SCAPReportView

@retry_navigation
def step(self, *args, **kwargs):
self.view.menu.select('Hosts', 'Compliance', 'Reports')


@navigator.register(OSCAPReportEntity, 'Details')
class DetailsSCAPReport(NavigateStep):
"""To get data from ARF report view
Args:
search_string: what to fill to find the SCAP report
"""

VIEW = SCAPReportDetailsView

def prerequisite(self, *args, **kwargs):
return self.navigate_to(self.obj, 'All')

def step(self, *args, **kwargs):
search_string = kwargs.get('search_string')
self.parent.search(search_string)
self.parent.table.row()['Reported At'].widget.click()
6 changes: 6 additions & 0 deletions airgun/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from airgun.entities.os import OperatingSystemEntity
from airgun.entities.oscapcontent import OSCAPContentEntity
from airgun.entities.oscappolicy import OSCAPPolicyEntity
from airgun.entities.oscapreport import OSCAPReportEntity
from airgun.entities.oscaptailoringfile import OSCAPTailoringFileEntity
from airgun.entities.package import PackageEntity
from airgun.entities.partitiontable import PartitionTableEntity
Expand Down Expand Up @@ -576,6 +577,11 @@ def oscappolicy(self):
"""Instance of OSCAP Policy entity."""
return self._open(OSCAPPolicyEntity)

@cached_property
def oscapreport(self):
"""Instance of OSCAP Report entity."""
return self._open(OSCAPReportEntity)

@cached_property
def oscaptailoringfile(self):
"""Instance of OSCAP Tailoring File entity."""
Expand Down
11 changes: 9 additions & 2 deletions airgun/views/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,28 @@ def select_logout(self):
self.account_menu.click()
self.logout.click()

def read(self, widget_names=None):
def read(self, widget_names=None, limit=None):
"""Reads the contents of the view and presents them as a dictionary.
:param widget_names: If specified, will read only the widgets names in the list.
:param limit: how many entries to fetch at most
:return: A :py:class:`dict` of ``widget_name: widget_read_value``
where the values are retrieved using the :py:meth:`Widget.read`.
"""
if widget_names is None:
if limit is not None:
raise NotImplementedError("You must specify widgets to be able to specify limit")
return super().read()
if not isinstance(widget_names, list | tuple):
widget_names = [widget_names]
values = {}
for widget_name in widget_names:
values[widget_name] = get_widget_by_name(self, widget_name).read()
widget = get_widget_by_name(self, widget_name)
if hasattr(widget, 'read_limited') and callable(widget.read_limited):
values[widget_name] = widget.read(limit=limit)
else:
values[widget_name] = widget.read()
return normalize_dict_values(values)


Expand Down
83 changes: 83 additions & 0 deletions airgun/views/oscapreport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from widgetastic.widget import Text, View
from widgetastic_patternfly4 import Button
from widgetastic_patternfly4.ouia import FormSelect

from airgun.views.common import BaseLoggedInView, SearchableViewMixin, WizardStepView
from airgun.widgets import (
ActionsDropdown,
SatTable,
)


class SCAPReportView(BaseLoggedInView, SearchableViewMixin):
title = Text("//h1[normalize-space(.)='Compliance Reports']")
table = SatTable(
'.//table',
column_widgets={
'Host': Text(".//a[contains(@href,'/new/hosts')]"),
'Reported At': Text(".//a[contains(@href,'/compliance/arf_reports')]"),
'Policy': Text(".//a[contains(@href,'/compliance/policies')]"),
'Openscap Capsule': Text(".//a[contains(@href,'/smart_proxies')]"),
'Passed': Text(".//span[contains(@class,'label-info')]"),
'Failed': Text(".//span[contains(@class,'label-danger')]"),
'Other': Text(".//span[contains(@class,'label-warning')]"),
'Actions': ActionsDropdown("./div[contains(@class, 'btn-group')]"),
},
)

@property
def is_displayed(self):
return self.browser.wait_for_element(self.title, exception=False) is not None


class SCAPReportDetailsView(BaseLoggedInView):
show_log_messages_label = Text('//span[normalize-space(.)="Show log messages:"]')
table = SatTable(
'.//table',
column_widgets={
'Result': Text('./span[1]'),
'Message': Text('./span[2]'),
'Resource': Text('./span[3]'),
'Severity': Text('./img[1]'),
'Actions': ActionsDropdown("./div[contains(@class, 'btn-group')]"),
},
)

@property
def is_displayed(self):
return (
self.browser.wait_for_element(self.show_log_messages_label, exception=False) is not None
)


class RemediateModal(View):
"""
Class representing the "Remediate" modal.
It contains multiple nested classes each representing a step of the wizard.
"""

ROOT = '//div[contains(@data-ouia-component-id, "OUIA-Generated-Modal-large-")]'

title = Text('.//h2[contains(@class, "pf-c-title")]')
close_modal = Button(locator='.//button[@aria-label="Close"]')

@View.nested
class select_remediation_method(WizardStepView):
expander = Text(
'.//button[contains(@class,"pf-c-wizard__nav-link") and contains(.,"Select snippet")]'
)
snippet = FormSelect('snippet-select')

@View.nested
class name_source(WizardStepView):
expander = Text(
'.//button[contains(@class,"pf-c-wizard__nav-link") and contains(.,"Review hosts")]'
)
host_table = SatTable(".//table")

@View.nested
class select_capsule(WizardStepView):
expander = Text(
'.//button[contains(@class,"pf-c-wizard__nav-link") and contains(.,"Review remediation")]'
)
run = Button(locator='.//button[normalize-space(.)="Run"]')
47 changes: 46 additions & 1 deletion airgun/widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1962,11 +1962,56 @@ def has_rows(self):
return False
return True

def read(self):
def read_limited(self, limit):
"""This is almost the same as inherited read but has a limit. Use it for tables that take too long to read.
Reads the table. Returns a list, every item in the list is contents read from the row."""
rows = list(self)
# Cut the unwanted rows if necessary
if self.rows_ignore_top is not None:
rows = rows[self.rows_ignore_top :]
if self.rows_ignore_bottom is not None and self.rows_ignore_bottom > 0:
rows = rows[: -self.rows_ignore_bottom]
if self.assoc_column_position is None:
ret = []
rows_read = 0
for row in rows:
if rows_read >= limit:
break
ret.append(row.read())
rows_read = rows_read + 1
return ret
else:
result = {}
rows_read = 0
for row in rows:
if rows_read >= limit:
break
row_read = row.read()
try:
key = row_read.pop(self.header_index_mapping[self.assoc_column_position])
except KeyError:
try:
key = row_read.pop(self.assoc_column_position)
except KeyError:
try:
key = row_read.pop(self.assoc_column)
except KeyError as e:
raise ValueError(
f"The assoc_column={self.assoc_column!r} could not be retrieved"
) from e
if key in result:
raise ValueError(f"Duplicate value for {key}={result[key]!r}")
result[key] = row_read
rows_read = rows_read + 1
return result

def read(self, limit=None):
"""Return empty list in case table is empty"""
if not self.has_rows:
self.logger.debug(f'Table {self.locator} is empty')
return []
if limit is not None:
return self.read_limited(limit)
if self.pagination.is_displayed:
return self._read_all()
return super().read()
Expand Down

0 comments on commit 73239e4

Please sign in to comment.