Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom modifications tracking #1148

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from leapp.actors import Actor
from leapp.libraries.actor import checkcustommodifications
from leapp.models import CustomModifications, Report
from leapp.tags import ChecksPhaseTag, IPUWorkflowTag


class CheckCustomModificationsActor(Actor):
"""
Checks CustomModifications messages and produces a report about files in leapp directories that have been
modified or newly added.
"""

name = 'check_custom_modifications_actor'
consumes = (CustomModifications,)
produces = (Report,)
tags = (IPUWorkflowTag, ChecksPhaseTag)

def process(self):
checkcustommodifications.report_any_modifications()
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from leapp import reporting
from leapp.libraries.stdlib import api
from leapp.models import CustomModifications

FMT_LIST_SEPARATOR = "\n - "


def _pretty_files(messages):
"""
Return formatted string of discovered files from obtained CustomModifications messages.
"""
flist = []
for msg in messages:
actor = ' (Actor: {})'.format(msg.actor_name) if msg.actor_name else ''
flist.append(
'{sep}{filename}{actor}'.format(
sep=FMT_LIST_SEPARATOR,
filename=msg.filename,
actor=actor
)
)
return ''.join(flist)


def _is_modified_config(msg):
# NOTE(pstodulk):
# We are interested just about modified files for now. Having new created config
# files is not so much important for us right now, but in future it could
# be changed.
if msg.component and msg.component == 'configuration':
return msg.type == 'modified'
return False


def _create_report(title, summary, hint, links=None):
report_parts = [
reporting.Title(title),
reporting.Summary(summary),
reporting.Severity(reporting.Severity.HIGH),
reporting.Groups([reporting.Groups.UPGRADE_PROCESS]),
reporting.RemediationHint(hint)
]
if links:
report_parts += links
reporting.create_report(report_parts)


def check_configuration_files(msgs):
filtered_msgs = [m for m in msgs if _is_modified_config(m)]
if not filtered_msgs:
return
title = 'Detected modified configuration files in leapp configuration directories.'
summary = (
'We have detected that some configuration files related to leapp or'
' upgrade process have been modified. Some of these changes could be'
' intended (e.g. modified repomap.json file in case of private cloud'
' regions or customisations done on used Satellite server) so it is'
' not always needed to worry about them. However they can impact'
' the in-place upgrade and it is good to be aware of potential problems'
' or unexpected results if they are not intended.'
'\nThe list of modified configuration files:{files}'
.format(files=_pretty_files(filtered_msgs))
)
hint = (
'If some of changes in listed configuration files have not been intended,'
' you can restore original files by following procedure:'
'\n1. Remove (or back up) modified files that you want to restore.'
'\n2. Reinstall packages which owns these files.'
)
_create_report(title, summary, hint)


def _is_modified_code(msg):
if msg.component not in ['framework', 'repository']:
return False
return msg.type == 'modified'


def check_modified_code(msgs):
filtered_msgs = [m for m in msgs if _is_modified_code(m)]
if not filtered_msgs:
return
title = 'Detected modified files of the in-place upgrade tooling.'
summary = (
'We have detected that some files of the tooling processing the in-place'
' upgrade have been modified. Note that such modifications can be allowed'
' only after consultation with Red Hat - e.g. when support suggests'
' the change to resolve discovered problem.'
' If these changes have not been approved by Red Hat, the in-place upgrade'
' is unsupported.'
'\nFollowing files have been modified:{files}'
.format(files=_pretty_files(filtered_msgs))
)
hint = 'To restore original files reinstall related packages.'
_create_report(title, summary, hint)


def check_custom_actors(msgs):
filtered_msgs = [m for m in msgs if m.type == 'custom']
if not filtered_msgs:
return
title = 'Detected custom leapp actors or files.'
summary = (
'We have detected installed custom actors or files on the system.'
' These can be provided e.g. by third party vendors, Red Hat consultants,'
' or can be created by users to customize the upgrade (e.g. to migrate'
' custom applications).'
' This is allowed and appreciated. However Red Hat is not responsible'
' for any issues caused by these custom leapp actors.'
' Note that upgrade tooling is under agile development which could'
' require more frequent update of custom actors.'
'\nThe list of custom leapp actors and files:{files}'
.format(files=_pretty_files(filtered_msgs))
)
hint = (
'In case of any issues connected to custom or third party actors,'
' contact vendor of such actors. Also we suggest to ensure the installed'
' custom leapp actors are up to date, compatible with the installed'
' packages.'
)
links = [
reporting.ExternalLink(
url='https://red.ht/customize-rhel-upgrade',
title='Customizing your Red Hat Enterprise Linux in-place upgrade'
)
]

_create_report(title, summary, hint, links)


def report_any_modifications():
modifications = list(api.consume(CustomModifications))
if not modifications:
# no modification detected
return
check_custom_actors(modifications)
check_configuration_files(modifications)
check_modified_code(modifications)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from leapp.libraries.actor import checkcustommodifications
from leapp.models import CustomModifications, Report


def test_report_any_modifications(current_actor_context):
discovered_msgs = [CustomModifications(filename='some/changed/leapp/actor/file',
type='modified',
actor_name='an_actor',
component='repository'),
CustomModifications(filename='some/new/actor/in/leapp/dir',
type='custom',
actor_name='a_new_actor',
component='repository'),
CustomModifications(filename='some/new/actor/in/leapp/dir',
type='modified',
actor_name='a_new_actor',
component='configuration'),
CustomModifications(filename='some/changed/file/in/framework',
type='modified',
actor_name='',
component='framework')]
for msg in discovered_msgs:
current_actor_context.feed(msg)
current_actor_context.run()
reports = current_actor_context.consume(Report)
assert len(reports) == 3
assert (reports[0].report['title'] ==
'Detected custom leapp actors or files.')
assert 'some/new/actor/in/leapp/dir (Actor: a_new_actor)' in reports[0].report['summary']
assert (reports[1].report['title'] ==
'Detected modified configuration files in leapp configuration directories.')
assert (reports[2].report['title'] ==
'Detected modified files of the in-place upgrade tooling.')
assert 'some/changed/file/in/framework' in reports[2].report['summary']
assert 'some/changed/leapp/actor/file (Actor: an_actor)' in reports[2].report['summary']
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from leapp.actors import Actor
from leapp.libraries.actor import scancustommodifications
from leapp.models import CustomModifications
from leapp.tags import FactsPhaseTag, IPUWorkflowTag


class ScanCustomModificationsActor(Actor):
"""
Collects information about files in leapp directories that have been modified or newly added.
"""

name = 'scan_custom_modifications_actor'
produces = (CustomModifications,)
tags = (IPUWorkflowTag, FactsPhaseTag)

def process(self):
for msg in scancustommodifications.scan():
self.produce(msg)
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import ast
import os

from leapp.exceptions import StopActorExecution
from leapp.libraries.common import rpms
from leapp.libraries.stdlib import api, CalledProcessError, run
from leapp.models import CustomModifications

LEAPP_REPO_DIRS = ['/usr/share/leapp-repository']
LEAPP_PACKAGES_TO_IGNORE = ['snactor']


def _get_dirs_to_check(component):
if component == 'repository':
return LEAPP_REPO_DIRS
return []


def _get_rpms_to_check(component=None):
if component == 'repository':
return rpms.get_leapp_packages(component=rpms.LeappComponents.REPOSITORY)
if component == 'framework':
return rpms.get_leapp_packages(component=rpms.LeappComponents.FRAMEWORK)
return rpms.get_leapp_packages(components=[rpms.LeappComponents.REPOSITORY, rpms.LeappComponents.FRAMEWORK])


def deduce_actor_name(a_file):
"""
A helper to map an actor/library to the actor name
If a_file is an actor or an actor library, the name of the actor (name attribute of actor class) will be returned.
Empty string is returned if the file could not be associated with any actor.
"""
if not os.path.exists(a_file):
return ''
# NOTE(ivasilev) Actors reside only in actor.py files, so AST processing any other file can be skipped.
# In case this function has been called on a non-actor file, let's go straight to recursive call on the assumed
# location of the actor file.
if os.path.basename(a_file) == 'actor.py':
data = None
with open(a_file) as f:
try:
data = ast.parse(f.read())
except TypeError:
api.current_logger().warning('An error occurred while parsing %s, can not deduce actor name', a_file)
return ''
# NOTE(ivasilev) Making proper syntax analysis is not the goal here, so let's get away with the bare minimum.
# An actor file will have an Actor ClassDef with a name attribute and a process function defined
actor = next((obj for obj in data.body if isinstance(obj, ast.ClassDef) and obj.name and
any(isinstance(o, ast.FunctionDef) and o.name == 'process' for o in obj.body)), None)
# NOTE(ivasilev) obj.name attribute refers only to Class name, so for fetching name attribute need to go
# deeper
if actor:
try:
actor_name = next((expr.value.s for expr in actor.body
if isinstance(expr, ast.Assign) and expr.targets[-1].id == 'name'), None)
except (AttributeError, IndexError):
api.current_logger().warning("Syntax Analysis for %d has failed", a_file)
actor_name = None
if actor_name:
return actor_name

# Assuming here we are dealing with a library or a file, so let's discover actor filename and deduce actor name
# from it. Actor is expected to be found under ../../actor.py
def _check_assumed_location(subdir):
assumed_actor_file = os.path.join(a_file.split(subdir)[0], 'actor.py')
if not os.path.exists(assumed_actor_file):
# Nothing more we can do - no actor name mapping, return ''
return ''
return deduce_actor_name(assumed_actor_file)

return _check_assumed_location('libraries') or _check_assumed_location('files')


def _run_command(cmd, warning_to_log, checked=True):
"""
A helper that executes a command and returns a result or raises StopActorExecution.
Upon success results will contain a list with line-by-line output returned by the command.
"""
try:
res = run(cmd, checked=checked)
output = res['stdout'].strip()
if not output:
return []
return output.split('\n')
except CalledProcessError:
api.current_logger().warning(warning_to_log)
raise StopActorExecution()


def _modification_model(filename, change_type, component, rpm_checks_str=''):
# XXX FIXME(ivasilev) Actively thinking if different model classes inheriting from CustomModifications
# are needed or let's get away with one model for everything (as is implemented now).
# The only difference atm is that actor_name makes sense only for repository modifications.
return CustomModifications(filename=filename, type=change_type, component=component,
actor_name=deduce_actor_name(filename), rpm_checks_str=rpm_checks_str)


def check_for_modifications(component):
"""
This will return a list of any untypical files or changes to shipped leapp files discovered on the system.
An empty list means that no modifications have been found.
"""
rpms = _get_rpms_to_check(component)
dirs = _get_dirs_to_check(component)
source_of_truth = []
leapp_files = []
# Let's collect data about what should have been installed from rpm
for rpm in rpms:
res = _run_command(['rpm', '-ql', rpm], 'Could not get a list of installed files from rpm {}'.format(rpm))
source_of_truth.extend(res)
# Let's collect data about what's really on the system
for directory in dirs:
res = _run_command(['find', directory, '-type', 'f'],
'Could not get a list of leapp files from {}'.format(directory))
leapp_files.extend(res)
# Let's check for unexpected additions
custom_files = sorted(set(leapp_files) - set(source_of_truth))
# Now let's check for modifications
modified_files = []
modified_configs = []
for rpm in rpms:
res = _run_command(
['rpm', '-V', '--nomtime', rpm], 'Could not check authenticity of the files from {}'.format(rpm),
# NOTE(ivasilev) check is False here as in case of any changes found exit code will be 1
checked=False)
if res:
api.current_logger().warning('Modifications to leapp files detected!\n%s', res)
for modification_str in res:
modification = tuple(modification_str.split())
if len(modification) == 3 and modification[1] == 'c':
# Dealing with a configuration that will be displayed as ('S.5......', 'c', '/file/path')
modified_configs.append(modification)
else:
# Modification of any other rpm file detected
modified_files.append(modification)
return ([_modification_model(filename=f[1], component=component, rpm_checks_str=f[0], change_type='modified')
# Let's filter out pyc files not to clutter the output as pyc will be present even in case of
# a plain open & save-not-changed that we agreed not to react upon.
for f in modified_files if not f[1].endswith('.pyc')] +
[_modification_model(filename=f, component=component, change_type='custom')
for f in custom_files] +
[_modification_model(filename=f[2], component='configuration', rpm_checks_str=f[0], change_type='modified')
for f in modified_configs])


def scan():
return check_for_modifications('framework') + check_for_modifications('repository')
Loading
Loading