Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use some classes to unpack StoppedPools consistently #1006

Merged
merged 2 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 36 additions & 30 deletions src/stratis_cli/_actions/_list_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
"""

# isort: STDLIB
import json
from abc import ABC, abstractmethod

# isort: THIRDPARTY
Expand All @@ -35,7 +34,7 @@
print_table,
size_triple,
)
from ._utils import ClevisInfo
from ._utils import EncryptionInfoClevis, EncryptionInfoKeyDescription, StoppedPool


def _fetch_stopped_pools_property(proxy):
Expand All @@ -57,24 +56,22 @@ def _non_existent_or_inconsistent_to_str(
value, *, inconsistent_str="inconsistent", non_existent_str="N/A", interp=str
):
"""
Process dbus value that encodes both inconsistency and existence of the
Process dbus result that encodes both inconsistency and existence of the
value.

:param value: a dbus value
:param EncryptionInfo value: a dbus result
:param str inconsistent_str: value to return if inconsistent
:param str non_existent_str: value to return if non-existent
:param interp: how to interpret the value if it exists and is consistent
:returns: a string to print
:rtype: str
"""
(consistent, tuple_or_err_str) = value

if not consistent: # pragma: no cover
if not value.consistent(): # pragma: no cover
return inconsistent_str

(exists, value) = tuple_or_err_str
value = value.value

if not exists:
if value is None:
return non_existent_str

return interp(value)
Expand All @@ -92,15 +89,17 @@ def list_pools(uuid_formatter, *, stopped=False, selection=None):
klass.list_pools()


def clevis_to_str(clevis_dbus_object): # pragma: no cover
def _clevis_to_str(clevis_info): # pragma: no cover
"""
:param dbus.Struct clevis_dbus_object: clevis information
:param ClevisInfo clevis_info: the Clevis info to stringify
:return: a string that represents the clevis info
:rtype: str
"""
clevis_pin = str(clevis_dbus_object[0])
clevis_config = json.loads(clevis_dbus_object[1])
clevis_info = ClevisInfo(clevis_pin, clevis_config)
return str(clevis_info)

config_string = " ".join(
f"{key}: {value}" for key, value in clevis_info.config.items()
)
return f"{clevis_info.pin} {config_string}"


class List(ABC): # pylint: disable=too-few-public-methods
Expand Down Expand Up @@ -262,15 +261,17 @@ def _print_detail_view(self, mopool, size_change_codes):
)

key_description_str = (
_non_existent_or_inconsistent_to_str(mopool.KeyDescription())
_non_existent_or_inconsistent_to_str(
EncryptionInfoKeyDescription(mopool.KeyDescription())
)
if encrypted
else "unencrypted"
)
print(f"Key Description: {key_description_str}")

clevis_info_str = (
_non_existent_or_inconsistent_to_str(
mopool.ClevisInfo(), interp=clevis_to_str
EncryptionInfoClevis(mopool.ClevisInfo()), interp=_clevis_to_str
)
if encrypted
else "unencrypted"
Expand Down Expand Up @@ -435,32 +436,34 @@ def _print_detail_view(self, pool_uuid, pool):
Print detailed view of a stopped pool.

:param str pool_uuid: the pool UUID
:param pool: a table of information on this pool
:param StoppedPool pool: information about a single pool
:type pool: dict of str * object
"""
print(f"Name: {self._pool_name(pool.get('name'))}")
print(f"Name: {self._pool_name(pool.name)}")

print(f"UUID: {self.uuid_formatter(pool_uuid)}")

key_description = pool.get("key_description")
key_description = pool.key_description
key_description_str = (
"unencrypted"
if key_description is None
else _non_existent_or_inconsistent_to_str(key_description)
)
print(f"Key Description: {key_description_str}")

clevis_info = pool.get("clevis_info")
clevis_info = pool.clevis_info
clevis_info_str = (
"unencrypted"
if clevis_info is None
else _non_existent_or_inconsistent_to_str(clevis_info, interp=clevis_to_str)
else _non_existent_or_inconsistent_to_str(
clevis_info, interp=_clevis_to_str
)
)
print(f"Clevis Configuration: {clevis_info_str}")

print("Devices:")
for dev in pool["devs"]:
print(f"{self.uuid_formatter(dev['uuid'])} {dev['devnode']}")
for dev in pool.devs:
print(f"{self.uuid_formatter(dev.uuid)} {dev.devnode}")

def list_pools(self):
"""
Expand Down Expand Up @@ -488,13 +491,16 @@ def key_description_str(value):
if self.selection is None:
tables = [
(
self._pool_name(info.get("name")),
self._pool_name(sp.name),
self.uuid_formatter(pool_uuid),
str(len(info["devs"])),
key_description_str(info.get("key_description")),
clevis_str(info.get("clevis_info")),
str(len(sp.devs)),
key_description_str(sp.key_description),
clevis_str(sp.clevis_info),
)
for pool_uuid, sp in (
(pool_uuid, StoppedPool(info))
for pool_uuid, info in stopped_pools.items()
)
for (pool_uuid, info) in stopped_pools.items()
]

print_table(
Expand Down Expand Up @@ -531,4 +537,4 @@ def selection_func(_uuid, info):

(pool_uuid, pool) = stopped_pool

self._print_detail_view(pool_uuid, pool)
self._print_detail_view(pool_uuid, StoppedPool(pool))
4 changes: 2 additions & 2 deletions src/stratis_cli/_actions/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from ._constants import TOP_OBJECT
from ._formatting import get_property, get_uuid_formatter
from ._list_pool import list_pools
from ._utils import get_clevis_info
from ._utils import ClevisInfo


def _generate_pools_to_blockdevs(managed_objects, to_be_added, tier):
Expand Down Expand Up @@ -176,7 +176,7 @@ def create_pool(namespace): # pylint: disable=too-many-locals

_check_same_tier(pool_name, managed_objects, blockdevs, BlockDevTiers.DATA)

clevis_info = get_clevis_info(namespace)
clevis_info = ClevisInfo.get_info_from_namespace(namespace)

(
(changed, (pool_object_path, _)),
Expand Down
162 changes: 129 additions & 33 deletions src/stratis_cli/_actions/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
Miscellaneous functions.
"""

# isort: STDLIB
import json
from uuid import UUID

from .._errors import (
StratisCliMissingClevisTangURLError,
StratisCliMissingClevisThumbprintError,
Expand Down Expand Up @@ -46,46 +50,138 @@ def __init__(self, pin, config):
self.pin = pin
self.config = config

def __str__(self): # pragma: no cover
config_string = " ".join(
f"{key}: {value}" for key, value in self.config.items()
)
return f"{self.pin} {config_string}"
@staticmethod
def get_info_from_namespace(namespace):
"""
Get clevis info, if any, from the namespace.

:param namespace: namespace set up by the parser

def get_clevis_info(namespace):
"""
Get clevis info, if any, from the namespace.
:returns: clevis info or None
:rtype: ClevisInfo or NoneType
"""
clevis_info = None
if namespace.clevis is not None:
if namespace.clevis in ("nbde", "tang"):
if namespace.tang_url is None:
raise StratisCliMissingClevisTangURLError()

:param namespace: namespace set up by the parser
if not namespace.trust_url and namespace.thumbprint is None:
raise StratisCliMissingClevisThumbprintError()

clevis_config = {CLEVIS_KEY_URL: namespace.tang_url}
if namespace.trust_url:
clevis_config[CLEVIS_KEY_TANG_TRUST_URL] = True
else:
assert namespace.thumbprint is not None
clevis_config[CLEVIS_KEY_THP] = namespace.thumbprint

clevis_info = ClevisInfo(CLEVIS_PIN_TANG, clevis_config)

elif namespace.clevis == "tpm2":
clevis_info = ClevisInfo(CLEVIS_PIN_TPM2, {})

:returns: clevis info or None
:rtype: ClevisInfo or NoneType
"""
clevis_info = None
if namespace.clevis is not None:
if namespace.clevis in ("nbde", "tang"):
if namespace.tang_url is None:
raise StratisCliMissingClevisTangURLError()

if not namespace.trust_url and namespace.thumbprint is None:
raise StratisCliMissingClevisThumbprintError()

clevis_config = {CLEVIS_KEY_URL: namespace.tang_url}
if namespace.trust_url:
clevis_config[CLEVIS_KEY_TANG_TRUST_URL] = True
else:
assert namespace.thumbprint is not None
clevis_config[CLEVIS_KEY_THP] = namespace.thumbprint
raise AssertionError(
f"unexpected value {namespace.clevis} for clevis option"
) # pragma: no cover

clevis_info = ClevisInfo(CLEVIS_PIN_TANG, clevis_config)
return clevis_info

elif namespace.clevis == "tpm2":
clevis_info = ClevisInfo(CLEVIS_PIN_TPM2, {})

class EncryptionInfo: # pylint: disable=too-few-public-methods
"""
Generic information about a single encryption method.
"""

def __init__(self, info):
"""
Initializer.
:param info: info about an encryption method, as a dbus-python type
"""
(consistent, info) = info
if consistent:
(encrypted, value) = info
self.value = value if encrypted else None
else:
raise AssertionError(
f"unexpected value {namespace.clevis} for clevis option"
) # pragma: no cover
# No tests that generate inconsistent encryption information
self.error = str(info) # pragma: no cover

def consistent(self):
"""
True if consistent, otherwise False.
"""
return not hasattr(self, "error")


class EncryptionInfoClevis(EncryptionInfo): # pylint: disable=too-few-public-methods
"""
Encryption info for Clevis
"""

def __init__(self, info):
super().__init__(info)

# We don't test with Clevis for coverage
if hasattr(self, "value"): # pragma: no cover
value = self.value
if value is not None:
(pin, config) = value
self.value = ClevisInfo(str(pin), json.loads(str(config)))


class EncryptionInfoKeyDescription(
EncryptionInfo
): # pylint: disable=too-few-public-methods
"""
Encryption info for kernel keyring
"""

def __init__(self, info):
super().__init__(info)

# Our listing code excludes creating an object of this class without
# it being consistent and set.
if hasattr(self, "value"): # pragma: no cover
value = self.value
if value is not None:
self.value = str(value)


class Device: # pylint: disable=too-few-public-methods
"""
A representation of a device in a stopped pool.
"""

def __init__(self, mapping):
self.uuid = UUID(mapping["uuid"])
self.devnode = str(mapping["devnode"])


class StoppedPool: # pylint: disable=too-few-public-methods
"""
A representation of a single stopped pool.
"""

def __init__(self, pool_info):
"""
Initializer.
:param pool_info: a D-Bus structure
"""

self.devs = [Device(info) for info in pool_info["devs"]]

clevis_info = pool_info.get("clevis_info")
self.clevis_info = (
None if clevis_info is None else EncryptionInfoClevis(clevis_info)
)

key_description = pool_info.get("key_description")
self.key_description = (
None
if key_description is None
else EncryptionInfoKeyDescription(key_description)
)

return clevis_info
name = pool_info.get("name")
self.name = None if name is None else str(name)