diff --git a/src/stratis_cli/_actions/_list_pool.py b/src/stratis_cli/_actions/_list_pool.py index 2417b5ac6..12e45fe51 100644 --- a/src/stratis_cli/_actions/_list_pool.py +++ b/src/stratis_cli/_actions/_list_pool.py @@ -16,7 +16,6 @@ """ # isort: STDLIB -import json from abc import ABC, abstractmethod # isort: THIRDPARTY @@ -35,7 +34,7 @@ print_table, size_triple, ) -from ._utils import ClevisInfo +from ._utils import EncryptionInfoClevis, EncryptionInfoKeyDescription, StoppedPool def _fetch_stopped_pools_property(proxy): @@ -57,24 +56,22 @@ def _non_existent_or_inconsistent_to_str( value, *, inconsistent_str="inconsistent", non_existent_str="N/A", interp=str ): """ - Process dbus value that encodes both inconsistency and existence of the + Process dbus result that encodes both inconsistency and existence of the value. - :param value: a dbus value + :param EncryptionInfo value: a dbus result :param str inconsistent_str: value to return if inconsistent :param str non_existent_str: value to return if non-existent :param interp: how to interpret the value if it exists and is consistent :returns: a string to print :rtype: str """ - (consistent, tuple_or_err_str) = value - - if not consistent: # pragma: no cover + if not value.consistent(): # pragma: no cover return inconsistent_str - (exists, value) = tuple_or_err_str + value = value.value - if not exists: + if value is None: return non_existent_str return interp(value) @@ -92,15 +89,17 @@ def list_pools(uuid_formatter, *, stopped=False, selection=None): klass.list_pools() -def clevis_to_str(clevis_dbus_object): # pragma: no cover +def _clevis_to_str(clevis_info): # pragma: no cover """ - :param dbus.Struct clevis_dbus_object: clevis information + :param ClevisInfo clevis_info: the Clevis info to stringify + :return: a string that represents the clevis info :rtype: str """ - clevis_pin = str(clevis_dbus_object[0]) - clevis_config = json.loads(clevis_dbus_object[1]) - clevis_info = ClevisInfo(clevis_pin, clevis_config) - return str(clevis_info) + + config_string = " ".join( + f"{key}: {value}" for key, value in clevis_info.config.items() + ) + return f"{clevis_info.pin} {config_string}" class List(ABC): # pylint: disable=too-few-public-methods @@ -262,7 +261,9 @@ def _print_detail_view(self, mopool, size_change_codes): ) key_description_str = ( - _non_existent_or_inconsistent_to_str(mopool.KeyDescription()) + _non_existent_or_inconsistent_to_str( + EncryptionInfoKeyDescription(mopool.KeyDescription()) + ) if encrypted else "unencrypted" ) @@ -270,7 +271,7 @@ def _print_detail_view(self, mopool, size_change_codes): clevis_info_str = ( _non_existent_or_inconsistent_to_str( - mopool.ClevisInfo(), interp=clevis_to_str + EncryptionInfoClevis(mopool.ClevisInfo()), interp=_clevis_to_str ) if encrypted else "unencrypted" @@ -435,14 +436,14 @@ def _print_detail_view(self, pool_uuid, pool): Print detailed view of a stopped pool. :param str pool_uuid: the pool UUID - :param pool: a table of information on this pool + :param StoppedPool pool: information about a single pool :type pool: dict of str * object """ - print(f"Name: {self._pool_name(pool.get('name'))}") + print(f"Name: {self._pool_name(pool.name)}") print(f"UUID: {self.uuid_formatter(pool_uuid)}") - key_description = pool.get("key_description") + key_description = pool.key_description key_description_str = ( "unencrypted" if key_description is None @@ -450,17 +451,19 @@ def _print_detail_view(self, pool_uuid, pool): ) print(f"Key Description: {key_description_str}") - clevis_info = pool.get("clevis_info") + clevis_info = pool.clevis_info clevis_info_str = ( "unencrypted" if clevis_info is None - else _non_existent_or_inconsistent_to_str(clevis_info, interp=clevis_to_str) + else _non_existent_or_inconsistent_to_str( + clevis_info, interp=_clevis_to_str + ) ) print(f"Clevis Configuration: {clevis_info_str}") print("Devices:") - for dev in pool["devs"]: - print(f"{self.uuid_formatter(dev['uuid'])} {dev['devnode']}") + for dev in pool.devs: + print(f"{self.uuid_formatter(dev.uuid)} {dev.devnode}") def list_pools(self): """ @@ -488,13 +491,16 @@ def key_description_str(value): if self.selection is None: tables = [ ( - self._pool_name(info.get("name")), + self._pool_name(sp.name), self.uuid_formatter(pool_uuid), - str(len(info["devs"])), - key_description_str(info.get("key_description")), - clevis_str(info.get("clevis_info")), + str(len(sp.devs)), + key_description_str(sp.key_description), + clevis_str(sp.clevis_info), + ) + for pool_uuid, sp in ( + (pool_uuid, StoppedPool(info)) + for pool_uuid, info in stopped_pools.items() ) - for (pool_uuid, info) in stopped_pools.items() ] print_table( @@ -531,4 +537,4 @@ def selection_func(_uuid, info): (pool_uuid, pool) = stopped_pool - self._print_detail_view(pool_uuid, pool) + self._print_detail_view(pool_uuid, StoppedPool(pool)) diff --git a/src/stratis_cli/_actions/_pool.py b/src/stratis_cli/_actions/_pool.py index 1c108b032..11826b9ed 100644 --- a/src/stratis_cli/_actions/_pool.py +++ b/src/stratis_cli/_actions/_pool.py @@ -46,7 +46,7 @@ from ._constants import TOP_OBJECT from ._formatting import get_property, get_uuid_formatter from ._list_pool import list_pools -from ._utils import get_clevis_info +from ._utils import ClevisInfo def _generate_pools_to_blockdevs(managed_objects, to_be_added, tier): @@ -176,7 +176,7 @@ def create_pool(namespace): # pylint: disable=too-many-locals _check_same_tier(pool_name, managed_objects, blockdevs, BlockDevTiers.DATA) - clevis_info = get_clevis_info(namespace) + clevis_info = ClevisInfo.get_info_from_namespace(namespace) ( (changed, (pool_object_path, _)), diff --git a/src/stratis_cli/_actions/_utils.py b/src/stratis_cli/_actions/_utils.py index 00859cf95..a7fbf0b42 100644 --- a/src/stratis_cli/_actions/_utils.py +++ b/src/stratis_cli/_actions/_utils.py @@ -16,6 +16,10 @@ Miscellaneous functions. """ +# isort: STDLIB +import json +from uuid import UUID + from .._errors import ( StratisCliMissingClevisTangURLError, StratisCliMissingClevisThumbprintError, @@ -46,46 +50,138 @@ def __init__(self, pin, config): self.pin = pin self.config = config - def __str__(self): # pragma: no cover - config_string = " ".join( - f"{key}: {value}" for key, value in self.config.items() - ) - return f"{self.pin} {config_string}" + @staticmethod + def get_info_from_namespace(namespace): + """ + Get clevis info, if any, from the namespace. + :param namespace: namespace set up by the parser -def get_clevis_info(namespace): - """ - Get clevis info, if any, from the namespace. + :returns: clevis info or None + :rtype: ClevisInfo or NoneType + """ + clevis_info = None + if namespace.clevis is not None: + if namespace.clevis in ("nbde", "tang"): + if namespace.tang_url is None: + raise StratisCliMissingClevisTangURLError() - :param namespace: namespace set up by the parser + if not namespace.trust_url and namespace.thumbprint is None: + raise StratisCliMissingClevisThumbprintError() + + clevis_config = {CLEVIS_KEY_URL: namespace.tang_url} + if namespace.trust_url: + clevis_config[CLEVIS_KEY_TANG_TRUST_URL] = True + else: + assert namespace.thumbprint is not None + clevis_config[CLEVIS_KEY_THP] = namespace.thumbprint + + clevis_info = ClevisInfo(CLEVIS_PIN_TANG, clevis_config) + + elif namespace.clevis == "tpm2": + clevis_info = ClevisInfo(CLEVIS_PIN_TPM2, {}) - :returns: clevis info or None - :rtype: ClevisInfo or NoneType - """ - clevis_info = None - if namespace.clevis is not None: - if namespace.clevis in ("nbde", "tang"): - if namespace.tang_url is None: - raise StratisCliMissingClevisTangURLError() - - if not namespace.trust_url and namespace.thumbprint is None: - raise StratisCliMissingClevisThumbprintError() - - clevis_config = {CLEVIS_KEY_URL: namespace.tang_url} - if namespace.trust_url: - clevis_config[CLEVIS_KEY_TANG_TRUST_URL] = True else: - assert namespace.thumbprint is not None - clevis_config[CLEVIS_KEY_THP] = namespace.thumbprint + raise AssertionError( + f"unexpected value {namespace.clevis} for clevis option" + ) # pragma: no cover - clevis_info = ClevisInfo(CLEVIS_PIN_TANG, clevis_config) + return clevis_info - elif namespace.clevis == "tpm2": - clevis_info = ClevisInfo(CLEVIS_PIN_TPM2, {}) +class EncryptionInfo: # pylint: disable=too-few-public-methods + """ + Generic information about a single encryption method. + """ + + def __init__(self, info): + """ + Initializer. + :param info: info about an encryption method, as a dbus-python type + """ + (consistent, info) = info + if consistent: + (encrypted, value) = info + self.value = value if encrypted else None else: - raise AssertionError( - f"unexpected value {namespace.clevis} for clevis option" - ) # pragma: no cover + # No tests that generate inconsistent encryption information + self.error = str(info) # pragma: no cover + + def consistent(self): + """ + True if consistent, otherwise False. + """ + return not hasattr(self, "error") + + +class EncryptionInfoClevis(EncryptionInfo): # pylint: disable=too-few-public-methods + """ + Encryption info for Clevis + """ + + def __init__(self, info): + super().__init__(info) + + # We don't test with Clevis for coverage + if hasattr(self, "value"): # pragma: no cover + value = self.value + if value is not None: + (pin, config) = value + self.value = ClevisInfo(str(pin), json.loads(str(config))) + + +class EncryptionInfoKeyDescription( + EncryptionInfo +): # pylint: disable=too-few-public-methods + """ + Encryption info for kernel keyring + """ + + def __init__(self, info): + super().__init__(info) + + # Our listing code excludes creating an object of this class without + # it being consistent and set. + if hasattr(self, "value"): # pragma: no cover + value = self.value + if value is not None: + self.value = str(value) + + +class Device: # pylint: disable=too-few-public-methods + """ + A representation of a device in a stopped pool. + """ + + def __init__(self, mapping): + self.uuid = UUID(mapping["uuid"]) + self.devnode = str(mapping["devnode"]) + + +class StoppedPool: # pylint: disable=too-few-public-methods + """ + A representation of a single stopped pool. + """ + + def __init__(self, pool_info): + """ + Initializer. + :param pool_info: a D-Bus structure + """ + + self.devs = [Device(info) for info in pool_info["devs"]] + + clevis_info = pool_info.get("clevis_info") + self.clevis_info = ( + None if clevis_info is None else EncryptionInfoClevis(clevis_info) + ) + + key_description = pool_info.get("key_description") + self.key_description = ( + None + if key_description is None + else EncryptionInfoKeyDescription(key_description) + ) - return clevis_info + name = pool_info.get("name") + self.name = None if name is None else str(name)