diff --git a/repos/system_upgrade/common/actors/inhibitwhenluks/actor.py b/repos/system_upgrade/common/actors/inhibitwhenluks/actor.py index 40b845b01d..65607167d3 100644 --- a/repos/system_upgrade/common/actors/inhibitwhenluks/actor.py +++ b/repos/system_upgrade/common/actors/inhibitwhenluks/actor.py @@ -1,40 +1,24 @@ -from leapp import reporting from leapp.actors import Actor -from leapp.models import CephInfo, StorageInfo -from leapp.reporting import create_report, Report +from leapp.libraries.actor.inhibitwhenluks import check_invalid_luks_devices +from leapp.models import CephInfo, LuksDumps, StorageInfo, TargetUserSpaceUpgradeTasks, UpgradeInitramfsTasks +from leapp.reporting import Report from leapp.tags import ChecksPhaseTag, IPUWorkflowTag class InhibitWhenLuks(Actor): """ - Check if any encrypted partitions is in use. If yes, inhibit the upgrade process. + Check if any encrypted partitions are in use and whether they are supported for the upgrade. - Upgrading system with encrypted partition is not supported. + Upgrading EL7 system with encrypted partition is not supported (but ceph OSDs). + For EL8+ it's ok if the discovered used encrypted storage has LUKS2 format + and it's bounded to clevis-tpm2 token (so it can be automatically unlocked + during the process). """ name = 'check_luks_and_inhibit' - consumes = (StorageInfo, CephInfo) - produces = (Report,) + consumes = (CephInfo, LuksDumps, StorageInfo) + produces = (Report, TargetUserSpaceUpgradeTasks, UpgradeInitramfsTasks) tags = (ChecksPhaseTag, IPUWorkflowTag) def process(self): - # If encrypted Ceph volumes present, check if there are more encrypted disk in lsblk than Ceph vol - ceph_vol = [] - try: - ceph_info = next(self.consume(CephInfo)) - if ceph_info: - ceph_vol = ceph_info.encrypted_volumes[:] - except StopIteration: - pass - - for storage_info in self.consume(StorageInfo): - for blk in storage_info.lsblk: - if blk.tp == 'crypt' and blk.name not in ceph_vol: - create_report([ - reporting.Title('LUKS encrypted partition detected'), - reporting.Summary('Upgrading system with encrypted partitions is not supported'), - reporting.Severity(reporting.Severity.HIGH), - reporting.Groups([reporting.Groups.BOOT, reporting.Groups.ENCRYPTION]), - reporting.Groups([reporting.Groups.INHIBITOR]), - ]) - break + check_invalid_luks_devices() diff --git a/repos/system_upgrade/common/actors/inhibitwhenluks/libraries/inhibitwhenluks.py b/repos/system_upgrade/common/actors/inhibitwhenluks/libraries/inhibitwhenluks.py new file mode 100644 index 0000000000..57a94e9d6d --- /dev/null +++ b/repos/system_upgrade/common/actors/inhibitwhenluks/libraries/inhibitwhenluks.py @@ -0,0 +1,164 @@ +from leapp import reporting +from leapp.libraries.common.config.version import get_source_major_version +from leapp.libraries.stdlib import api +from leapp.models import ( + CephInfo, + DracutModule, + LuksDumps, + StorageInfo, + TargetUserSpaceUpgradeTasks, + UpgradeInitramfsTasks +) +from leapp.reporting import create_report + +# https://red.ht/clevis-tpm2-luks-auto-unlock-rhel8 +# https://red.ht/clevis-tpm2-luks-auto-unlock-rhel9 +# https://red.ht/convert-to-luks2-rhel8 +# https://red.ht/convert-to-luks2-rhel9 +CLEVIS_DOC_URL_FMT = 'https://red.ht/clevis-tpm2-luks-auto-unlock-rhel{}' +LUKS2_CONVERT_DOC_URL_FMT = 'https://red.ht/convert-to-luks2-rhel{}' + +FMT_LIST_SEPARATOR = '\n - ' + + +def _formatted_list_output(input_list, sep=FMT_LIST_SEPARATOR): + return ['{}{}'.format(sep, item) for item in input_list] + + +def _at_least_one_tpm_token(luks_dump): + return any([token.token_type == "clevis-tpm2" for token in luks_dump.tokens]) + + +def _get_ceph_volumes(): + ceph_info = next(api.consume(CephInfo), None) + return ceph_info.encrypted_volumes[:] if ceph_info else [] + + +def apply_obsoleted_check_ipu_7_8(): + ceph_vol = _get_ceph_volumes() + for storage_info in api.consume(StorageInfo): + for blk in storage_info.lsblk: + if blk.tp == 'crypt' and blk.name not in ceph_vol: + create_report([ + reporting.Title('LUKS encrypted partition detected'), + reporting.Summary('Upgrading system with encrypted partitions is not supported'), + reporting.Severity(reporting.Severity.HIGH), + reporting.Groups([reporting.Groups.BOOT, reporting.Groups.ENCRYPTION]), + reporting.Groups([reporting.Groups.INHIBITOR]), + ]) + break + + +def report_inhibitor(luks1_partitions, no_tpm2_partitions): + source_major_version = get_source_major_version() + clevis_doc_url = CLEVIS_DOC_URL_FMT.format(source_major_version) + luks2_convert_doc_url = LUKS2_CONVERT_DOC_URL_FMT.format(source_major_version) + summary = ( + 'We have detected LUKS encrypted volumes that do not meet current' + ' criteria to be able to proceed the in-place upgrade process.' + ' Right now the upgrade process requires for encrypted storage to be' + ' in LUKS2 format configured with Clevis TPM 2.0.' + ) + + report_hints = [] + + if luks1_partitions: + + summary += ( + '\n\nSince RHEL 8 the default format for LUKS encryption is LUKS2.' + ' Despite the old LUKS1 format is still supported on RHEL systems' + ' it has some limitations in comparison to LUKS2.' + ' Only the LUKS2 format is supported for upgrades.' + ' The following LUKS1 partitions have been discovered on your system:{}' + .format(''.join(_formatted_list_output(luks1_partitions))) + ) + report_hints.append(reporting.Remediation( + hint=( + 'Convert your LUKS1 encrypted devices to LUKS2 and bind it to TPM2 using clevis.' + ' If this is not possible in your case consider clean installation' + ' of the target RHEL system instead.' + ) + )) + report_hints.append(reporting.ExternalLink( + url=luks2_convert_doc_url, + title='LUKS versions in RHEL: Conversion' + )) + + if no_tpm2_partitions: + summary += ( + '\n\nCurrently we require the process to be non-interactive and' + ' offline. For this reason we require automatic unlock of' + ' encrypted devices during the upgrade process.' + ' Currently we support automatic unlocking during the upgrade only' + ' for volumes bound to Clevis TPM2 token.' + ' The following LUKS2 devices without Clevis TPM2 token ' + ' have been discovered on your system: {}' + .format(''.join(_formatted_list_output(no_tpm2_partitions))) + ) + + report_hints.append(reporting.Remediation( + hint=( + 'Add Clevis TPM2 binding to LUKS devices.' + ' If some LUKS devices use still the old LUKS1 format, convert' + ' them to LUKS2 prior to binding.' + ) + )) + report_hints.append(reporting.ExternalLink( + url=clevis_doc_url, + title='Configuring manual enrollment of LUKS-encrypted volumes by using a TPM 2.0 policy' + ) + ) + create_report([ + reporting.Title('Detected LUKS devices unsuitable for in-place upgrade.'), + reporting.Summary(summary), + reporting.Severity(reporting.Severity.HIGH), + reporting.Groups([reporting.Groups.BOOT, reporting.Groups.ENCRYPTION]), + reporting.Groups([reporting.Groups.INHIBITOR]), + ] + report_hints) + + +def check_invalid_luks_devices(): + if get_source_major_version() == '7': + # NOTE: keeping unchanged behaviour for IPU 7 -> 8 + apply_obsoleted_check_ipu_7_8() + return + + luks_dumps = next(api.consume(LuksDumps), None) + if not luks_dumps: + api.current_logger().debug('No LUKS volumes detected. Skipping.') + return + + luks1_partitions = [] + no_tpm2_partitions = [] + ceph_vol = _get_ceph_volumes() + for luks_dump in luks_dumps.dumps: + # if the device is managed by ceph, don't inhibit + if luks_dump.device_name in ceph_vol: + api.current_logger().debug('Skipping LUKS CEPH volume: {}'.format(luks_dump.device_name)) + continue + + if luks_dump.version == 1: + luks1_partitions.append(luks_dump.device_name) + elif luks_dump.version == 2 and not _at_least_one_tpm_token(luks_dump): + no_tpm2_partitions.append(luks_dump.device_name) + + if luks1_partitions or no_tpm2_partitions: + report_inhibitor(luks1_partitions, no_tpm2_partitions) + else: + required_crypt_rpms = [ + 'clevis', + 'clevis-dracut', + 'clevis-systemd', + 'clevis-udisks2', + 'clevis-luks', + 'cryptsetup', + 'tpm2-tss', + 'tpm2-tools', + 'tpm2-abrmd' + ] + api.produce(TargetUserSpaceUpgradeTasks(install_rpms=required_crypt_rpms)) + api.produce(UpgradeInitramfsTasks(include_dracut_modules=[ + DracutModule(name='clevis'), + DracutModule(name='clevis-pin-tpm2') + ]) + ) diff --git a/repos/system_upgrade/common/actors/inhibitwhenluks/tests/test_inhibitwhenluks.py b/repos/system_upgrade/common/actors/inhibitwhenluks/tests/test_inhibitwhenluks.py index 405a34295b..d559b54ccc 100644 --- a/repos/system_upgrade/common/actors/inhibitwhenluks/tests/test_inhibitwhenluks.py +++ b/repos/system_upgrade/common/actors/inhibitwhenluks/tests/test_inhibitwhenluks.py @@ -1,34 +1,173 @@ -from leapp.models import CephInfo, LsblkEntry, StorageInfo +""" +Unit tests for inhibitwhenluks actor + +Skip isort as it's kind of broken when mixing grid import and one line imports + +isort:skip_file +""" + +from leapp.libraries.common.config import version +from leapp.models import ( + CephInfo, + LsblkEntry, + LuksDump, + LuksDumps, + LuksToken, + StorageInfo, + TargetUserSpaceUpgradeTasks, + UpgradeInitramfsTasks +) from leapp.reporting import Report from leapp.snactor.fixture import current_actor_context from leapp.utils.report import is_inhibitor +_REPORT_TITLE_UNSUITABLE = 'Detected LUKS devices unsuitable for in-place upgrade.' -def test_actor_with_luks(current_actor_context): - with_luks = [LsblkEntry(name='luks-132', kname='kname1', maj_min='253:0', rm='0', size='10G', bsize=10*(1 << 39), - ro='0', tp='crypt', mountpoint='', parent_name='', parent_path='')] - current_actor_context.feed(StorageInfo(lsblk=with_luks)) +def test_actor_with_luks1_notpm(monkeypatch, current_actor_context): + monkeypatch.setattr(version, 'get_source_major_version', lambda: '8') + luks_dump = LuksDump( + version=1, + uuid='dd09e6d4-b595-4f1c-80b8-fd47540e6464', + device_path='/dev/sda', + device_name='sda') + current_actor_context.feed(LuksDumps(dumps=[luks_dump])) + current_actor_context.feed(CephInfo(encrypted_volumes=[])) current_actor_context.run() assert current_actor_context.consume(Report) report_fields = current_actor_context.consume(Report)[0].report assert is_inhibitor(report_fields) + assert not current_actor_context.consume(TargetUserSpaceUpgradeTasks) + assert not current_actor_context.consume(UpgradeInitramfsTasks) + assert report_fields['title'] == _REPORT_TITLE_UNSUITABLE + assert 'LUKS1 partitions have been discovered' in report_fields['summary'] + assert luks_dump.device_name in report_fields['summary'] -def test_actor_with_luks_ceph_only(current_actor_context): - with_luks = [LsblkEntry(name='luks-132', kname='kname1', maj_min='253:0', rm='0', size='10G', bsize=10*(1 << 39), - ro='0', tp='crypt', mountpoint='', parent_name='', parent_path='')] - ceph_volume = ['luks-132'] - current_actor_context.feed(StorageInfo(lsblk=with_luks)) - current_actor_context.feed(CephInfo(encrypted_volumes=ceph_volume)) + +def test_actor_with_luks2_notpm(monkeypatch, current_actor_context): + monkeypatch.setattr(version, 'get_source_major_version', lambda: '8') + luks_dump = LuksDump( + version=2, + uuid='27b57c75-9adf-4744-ab04-9eb99726a301', + device_path='/dev/sda', + device_name='sda') + current_actor_context.feed(LuksDumps(dumps=[luks_dump])) + current_actor_context.feed(CephInfo(encrypted_volumes=[])) + current_actor_context.run() + assert current_actor_context.consume(Report) + report_fields = current_actor_context.consume(Report)[0].report + assert is_inhibitor(report_fields) + assert not current_actor_context.consume(TargetUserSpaceUpgradeTasks) + assert not current_actor_context.consume(UpgradeInitramfsTasks) + + assert report_fields['title'] == _REPORT_TITLE_UNSUITABLE + assert 'LUKS2 devices without Clevis TPM2 token' in report_fields['summary'] + assert luks_dump.device_name in report_fields['summary'] + + +def test_actor_with_luks2_invalid_token(monkeypatch, current_actor_context): + monkeypatch.setattr(version, 'get_source_major_version', lambda: '8') + luks_dump = LuksDump( + version=2, + uuid='dc1dbe37-6644-4094-9839-8fc5dcbec0c6', + device_path='/dev/sda', + device_name='sda', + tokens=[LuksToken(token_id=0, keyslot=1, token_type='clevis')]) + current_actor_context.feed(LuksDumps(dumps=[luks_dump])) + current_actor_context.feed(CephInfo(encrypted_volumes=[])) + current_actor_context.run() + assert current_actor_context.consume(Report) + report_fields = current_actor_context.consume(Report)[0].report + assert is_inhibitor(report_fields) + + assert report_fields['title'] == _REPORT_TITLE_UNSUITABLE + assert 'LUKS2 devices without Clevis TPM2 token' in report_fields['summary'] + assert luks_dump.device_name in report_fields['summary'] + assert not current_actor_context.consume(TargetUserSpaceUpgradeTasks) + assert not current_actor_context.consume(UpgradeInitramfsTasks) + + +def test_actor_with_luks2_clevis_tpm_token(monkeypatch, current_actor_context): + monkeypatch.setattr(version, 'get_source_major_version', lambda: '8') + luks_dump = LuksDump( + version=2, + uuid='83050bd9-61c6-4ff0-846f-bfd3ac9bfc67', + device_path='/dev/sda', + device_name='sda', + tokens=[LuksToken(token_id=0, keyslot=1, token_type='clevis-tpm2')]) + current_actor_context.feed(LuksDumps(dumps=[luks_dump])) + current_actor_context.feed(CephInfo(encrypted_volumes=[])) current_actor_context.run() assert not current_actor_context.consume(Report) + upgrade_tasks = current_actor_context.consume(TargetUserSpaceUpgradeTasks) + assert len(upgrade_tasks) == 1 + assert set(upgrade_tasks[0].install_rpms) == set([ + 'clevis', + 'clevis-dracut', + 'clevis-systemd', + 'clevis-udisks2', + 'clevis-luks', + 'cryptsetup', + 'tpm2-tss', + 'tpm2-tools', + 'tpm2-abrmd' + ]) + assert current_actor_context.consume(UpgradeInitramfsTasks) -def test_actor_without_luks(current_actor_context): - without_luks = [LsblkEntry(name='sda1', kname='sda1', maj_min='8:0', rm='0', size='10G', bsize=10*(1 << 39), - ro='0', tp='part', mountpoint='/boot', parent_name='', parent_path='')] - current_actor_context.feed(StorageInfo(lsblk=without_luks)) +def test_actor_with_luks2_ceph(monkeypatch, current_actor_context): + monkeypatch.setattr(version, 'get_source_major_version', lambda: '8') + ceph_volume = ['sda'] + current_actor_context.feed(CephInfo(encrypted_volumes=ceph_volume)) + luks_dump = LuksDump( + version=2, + uuid='0edb8c11-1a04-4abd-a12d-93433ee7b8d8', + device_path='/dev/sda', + device_name='sda', + tokens=[LuksToken(token_id=0, keyslot=1, token_type='clevis')]) + current_actor_context.feed(LuksDumps(dumps=[luks_dump])) current_actor_context.run() assert not current_actor_context.consume(Report) + + # make sure we don't needlessly include clevis packages, when there is no clevis token + assert not current_actor_context.consume(TargetUserSpaceUpgradeTasks) + + +LSBLK_ENTRY = LsblkEntry( + name="luks-whatever", + kname="dm-0", + maj_min="252:1", + rm="0", + size="1G", + bsize=1073741824, + ro="0", + tp="crypt", + mountpoint="/", + parent_name="", + parent_path="" +) + + +def test_inhibitor_on_el7(monkeypatch, current_actor_context): + # NOTE(pstodulk): consider it good enough as el7 stuff is going to be removed + # soon. + monkeypatch.setattr(version, 'get_source_major_version', lambda: '7') + + luks_dump = LuksDump( + version=2, + uuid='83050bd9-61c6-4ff0-846f-bfd3ac9bfc67', + device_path='/dev/sda', + device_name='sda', + tokens=[LuksToken(token_id=0, keyslot=1, token_type='clevis-tpm2')]) + current_actor_context.feed(LuksDumps(dumps=[luks_dump])) + current_actor_context.feed(CephInfo(encrypted_volumes=[])) + + current_actor_context.feed(StorageInfo(lsblk=[LSBLK_ENTRY])) + current_actor_context.run() + assert current_actor_context.consume(Report) + + report_fields = current_actor_context.consume(Report)[0].report + assert is_inhibitor(report_fields) + assert report_fields['title'] == 'LUKS encrypted partition detected'