-
Notifications
You must be signed in to change notification settings - Fork 148
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
LuksScanner: Add LUKS dump scanner and models
Add LuksScanner actor that runs 'cryptsetup luksDump' for all 'crypt' from lsblk output. The output is then parsed and filled into LuksDump and LuksToken models. The LuksDump model contains information about LUKS version, device UUID, corresponding device path, name of the backing device (which contains the LUKS header) and a list of LuksToken models. LuksToken model represents a token associated with the given LUKS device. It contains token ID, IDs of associated keyslot and token type. If the token type is "clevis", we use "clevis luks list" command to determine the clevis-specific subtype and append it to the token name. E.g. if there is a "clevis" token and "clevis luks list" returns "tpm2", the token type will be "clevis-tpm2".
- Loading branch information
Showing
10 changed files
with
925 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from leapp.actors import Actor | ||
from leapp.libraries.actor import luksscanner | ||
from leapp.models import LuksDumps, StorageInfo | ||
from leapp.reporting import Report | ||
from leapp.tags import FactsPhaseTag, IPUWorkflowTag | ||
|
||
|
||
class LuksScanner(Actor): | ||
""" | ||
Provides data about active LUKS devices. | ||
Scans all block devices of 'crypt' type and attempts to run 'cryptsetup luksDump' on them. | ||
For every 'crypt' device a LuksDump model is produced. Furthermore, if there is any LUKS token | ||
of type clevis, the concrete subtype is determined using 'clevis luks list'. | ||
""" | ||
|
||
name = 'luks_scanner' | ||
consumes = (StorageInfo,) | ||
produces = (Report, LuksDumps) | ||
tags = (IPUWorkflowTag, FactsPhaseTag) | ||
|
||
def process(self): | ||
self.produce(luksscanner.get_luks_dumps_model()) |
199 changes: 199 additions & 0 deletions
199
repos/system_upgrade/common/actors/luksscanner/libraries/luksdump_parser.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
class LuksDumpParser(object): | ||
""" | ||
Class for parsing "cryptsetup luksDump" output. Given a list of lines, it | ||
generates a dictionary representing the dump. | ||
""" | ||
|
||
class Node(object): | ||
""" | ||
Helper class, every line is represented as a node. The node depth is | ||
based on the indentation of the line. A dictionary is produced after | ||
all lines are inserted. | ||
""" | ||
|
||
def __init__(self, indented_line): | ||
self.children = [] | ||
self.level = len(indented_line) - len(indented_line.lstrip()) | ||
self.text = indented_line.strip() | ||
|
||
def add_children(self, nodes): | ||
# NOTE(pstodulk): it's expected that nodes are non-empty list and | ||
# having it empty is an error if it happens. So keeping a hard crash | ||
# for now as having an empty list it's hypothetical now and I would | ||
# probably end with en error anyway if discovered. | ||
childlevel = nodes[0].level | ||
while nodes: | ||
node = nodes.pop(0) | ||
if node.level == childlevel: # add node as a child | ||
self.children.append(node) | ||
elif node.level > childlevel: # add nodes as grandchildren of the last child | ||
nodes.insert(0, node) | ||
self.children[-1].add_children(nodes) | ||
elif node.level <= self.level: # this node is a sibling, no more children | ||
nodes.insert(0, node) | ||
return | ||
|
||
def as_dict(self): | ||
if len(self.children) > 1: | ||
children = [node.as_dict() for node in self.children] | ||
|
||
return {self.text: LuksDumpParser._merge_list(children)} | ||
if len(self.children) == 1: | ||
return {self.text: self.children[0].as_dict()} | ||
return self.text | ||
|
||
@staticmethod | ||
def _count_type(elem_list, elem_type): | ||
""" Count the number of items of elem_type inside the elem_list """ | ||
return sum(isinstance(x, elem_type) for x in elem_list) | ||
|
||
@staticmethod | ||
def _merge_list(elem_list): | ||
""" | ||
Given a list of elements merge them into a single element. If all | ||
elements are strings, concatenate them into a single string. When all | ||
the elements are dictionaries merge them into a single dictionary | ||
containing the keys/values from all of the dictionaries. | ||
""" | ||
|
||
dict_count = LuksDumpParser._count_type(elem_list, dict) | ||
str_count = LuksDumpParser._count_type(elem_list, str) | ||
|
||
result = elem_list | ||
if dict_count == len(elem_list): | ||
result = {} | ||
for element in elem_list: | ||
result.update(element) | ||
elif str_count == len(elem_list): | ||
result = "".join(elem_list) | ||
|
||
return result | ||
|
||
@staticmethod | ||
def _find_single_str(elem_list): | ||
""" If the list contains exactly one string return it or return None otherwise. """ | ||
|
||
result = None | ||
|
||
for elem in elem_list: | ||
if isinstance(elem, str): | ||
if result is not None: | ||
# more than one strings in the list | ||
return None | ||
result = elem | ||
|
||
return result | ||
|
||
@staticmethod | ||
def _fixup_type(elem_list, type_string): | ||
single_string = LuksDumpParser._find_single_str(elem_list) | ||
|
||
if single_string is not None: | ||
elem_list.remove(single_string) | ||
elem_list.append({type_string: single_string}) | ||
|
||
@staticmethod | ||
def _fixup_section(section, type_string): | ||
for key, value in section.items(): | ||
LuksDumpParser._fixup_type(value, type_string) | ||
section[key] = LuksDumpParser._merge_list(section[key]) | ||
|
||
@staticmethod | ||
def _fixup_dict(parsed_dict): | ||
""" Various fixups of the parsed dictionary """ | ||
|
||
if "Version" not in parsed_dict: | ||
return | ||
if parsed_dict["Version"] == "1": | ||
for i in range(8): | ||
keyslot = "Key Slot {}".format(i) | ||
|
||
if keyslot not in parsed_dict: | ||
continue | ||
|
||
if parsed_dict[keyslot] in ["ENABLED", "DISABLED"]: | ||
parsed_dict[keyslot] = {"enabled": parsed_dict[keyslot] == "ENABLED"} | ||
|
||
if not isinstance(parsed_dict[keyslot], list): | ||
continue | ||
|
||
enabled = None | ||
if "ENABLED" in parsed_dict[keyslot]: | ||
enabled = True | ||
parsed_dict[keyslot].remove("ENABLED") | ||
if "DISABLED" in parsed_dict[keyslot]: | ||
enabled = False | ||
parsed_dict[keyslot].remove("DISABLED") | ||
parsed_dict[keyslot] = LuksDumpParser._merge_list(parsed_dict[keyslot]) | ||
if enabled is not None: | ||
parsed_dict[keyslot]["enabled"] = enabled | ||
elif parsed_dict["Version"] == "2": | ||
for section in ["Keyslots", "Digests", "Data segments", "Tokens"]: | ||
if section in parsed_dict: | ||
LuksDumpParser._fixup_section(parsed_dict[section], "type") | ||
|
||
@staticmethod | ||
def _fixup_dump(dump): | ||
""" | ||
Replace tabs with spaces, for lines with colon a move the text | ||
after column on new line with the indent of the following line. | ||
""" | ||
|
||
dump = [line.replace("\t", " "*8).replace("\n", "") for line in dump] | ||
newdump = [] | ||
|
||
for i, line in enumerate(dump): | ||
if not line.strip(): | ||
continue | ||
|
||
if ':' in line: | ||
first_half = line.split(":")[0] | ||
second_half = ":".join(line.split(":")[1:]).lstrip() | ||
|
||
current_level = len(line) - len(line.lstrip()) | ||
if i+1 < len(dump): | ||
next_level = len(dump[i+1]) - len(dump[i+1].lstrip()) | ||
else: | ||
next_level = current_level | ||
|
||
if next_level > current_level: | ||
second_half = " " * next_level + second_half | ||
else: | ||
second_half = " " * (current_level + 8) + second_half | ||
|
||
newdump.append(first_half) | ||
if second_half.strip(): | ||
newdump.append(second_half) | ||
else: | ||
newdump.append(line) | ||
|
||
return newdump | ||
|
||
@staticmethod | ||
def parse(dump): | ||
""" | ||
Parse the output of "cryptsetup luksDump" command into a dictionary. | ||
:param dump: List of output lines of luksDump | ||
:returns: Parsed dictionary | ||
""" | ||
|
||
root = LuksDumpParser.Node('root') | ||
|
||
nodes = [] | ||
for line in LuksDumpParser._fixup_dump(dump): | ||
nodes.append(LuksDumpParser.Node(line)) | ||
|
||
root.add_children(nodes) | ||
root = root.as_dict()['root'] | ||
|
||
if isinstance(root, list): | ||
result = {} | ||
for child in root: | ||
if isinstance(child, str): | ||
child = {child: {}} | ||
result.update(child) | ||
root = result | ||
|
||
LuksDumpParser._fixup_dict(root) | ||
return root |
125 changes: 125 additions & 0 deletions
125
repos/system_upgrade/common/actors/luksscanner/libraries/luksscanner.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
import functools | ||
|
||
from leapp.exceptions import StopActorExecutionError | ||
from leapp.libraries import stdlib | ||
from leapp.libraries.actor.luksdump_parser import LuksDumpParser | ||
from leapp.libraries.stdlib import api | ||
from leapp.models import LuksDump, LuksDumps, LuksToken, StorageInfo | ||
|
||
|
||
def aslist(f): | ||
""" Decorator used to convert generator to list """ | ||
@functools.wraps(f) | ||
def inner(*args, **kwargs): | ||
return list(f(*args, **kwargs)) | ||
return inner | ||
|
||
|
||
def _get_clevis_type(device_path, keyslot): | ||
""" | ||
Assuming the device is initialized using clevis, determine the type of | ||
clevis token associated to the specified keyslot. | ||
""" | ||
try: | ||
result = stdlib.run(["clevis", "luks", "list", "-d", device_path, "-s", str(keyslot)]) | ||
except OSError: | ||
message = ('A LUKS drive with clevis token was discovered, but there is ' | ||
'no clevis package installed. The clevis command is required ' | ||
'to determine clevis token type.') | ||
details = {'hint': 'Use dnf to install the "clevis-luks" package.'} | ||
raise StopActorExecutionError(message=message, details=details) | ||
except stdlib.CalledProcessError as e: | ||
api.current_logger().debug("clevis list command failed with an error code: {}".format(e.exit_code)) | ||
|
||
message = ('The "clevis luks list" command failed. This' | ||
'might be because the clevis-luks package is' | ||
'missing on your system.') | ||
details = {'hint': 'Use dnf to install the "clevis-luks" package.'} | ||
raise StopActorExecutionError(message=message, details=details) | ||
|
||
line = result["stdout"].split() | ||
if len(line) != 3: | ||
raise StopActorExecutionError( | ||
'Invalid "clevis list" output detected' | ||
) | ||
|
||
return "clevis-{}".format(line[1]) | ||
|
||
|
||
@aslist | ||
def _get_tokens(device_path, luksdump_dict): | ||
""" Given a parsed LUKS dump, produce a list of tokens """ | ||
if "Version" not in luksdump_dict or luksdump_dict["Version"] != '2': | ||
return | ||
if "Tokens" not in luksdump_dict: | ||
raise StopActorExecutionError( | ||
'No tokens in cryptsetup luksDump output' | ||
) | ||
|
||
for token_id in luksdump_dict["Tokens"]: | ||
token = luksdump_dict["Tokens"][token_id] | ||
|
||
if "Keyslot" not in token or "type" not in token: | ||
raise StopActorExecutionError( | ||
'Token specification does not contain keyslot or type', | ||
) | ||
keyslot = int(token["Keyslot"]) | ||
token_type = token["type"] | ||
|
||
if token_type == "clevis": | ||
token_type = _get_clevis_type(device_path, keyslot) | ||
|
||
yield LuksToken( | ||
token_id=int(token_id), | ||
keyslot=keyslot, | ||
token_type=token_type | ||
) | ||
|
||
|
||
def get_luks_dump_by_device(device_path, device_name): | ||
""" Determine info about LUKS device using cryptsetup and clevis commands """ | ||
|
||
try: | ||
result = stdlib.run(['cryptsetup', 'luksDump', device_path]) | ||
luksdump_dict = LuksDumpParser.parse(result["stdout"].splitlines()) | ||
|
||
version = int(luksdump_dict["Version"]) if "Version" in luksdump_dict else None | ||
uuid = luksdump_dict["UUID"] if "UUID" in luksdump_dict else None | ||
if version is None or uuid is None: | ||
api.current_logger().error( | ||
'Failed to detect UUID or version from the output "cryptsetup luksDump {}" command'.format(device_path) | ||
) | ||
raise StopActorExecutionError( | ||
'Failed to detect UUID or version from the output "cryptsetup luksDump {}" command'.format(device_path) | ||
) | ||
|
||
return LuksDump( | ||
version=version, | ||
uuid=uuid, | ||
device_path=device_path, | ||
device_name=device_name, | ||
tokens=_get_tokens(device_path, luksdump_dict) | ||
) | ||
|
||
except (OSError, stdlib.CalledProcessError) as ex: | ||
api.current_logger().error( | ||
'Failed to execute "cryptsetup luksDump" command: {}'.format(ex) | ||
) | ||
raise StopActorExecutionError( | ||
'Failed to execute "cryptsetup luksDump {}" command'.format(device_path), | ||
details={'details': str(ex)} | ||
) | ||
|
||
|
||
@aslist | ||
def get_luks_dumps(): | ||
""" Collect info abaout every active LUKS device """ | ||
|
||
for storage_info in api.consume(StorageInfo): | ||
for blk in storage_info.lsblk: | ||
if blk.tp == 'crypt' and blk.parent_path: | ||
yield get_luks_dump_by_device(blk.parent_path, blk.parent_name) | ||
|
||
|
||
def get_luks_dumps_model(): | ||
return LuksDumps(dumps=get_luks_dumps()) |
27 changes: 27 additions & 0 deletions
27
repos/system_upgrade/common/actors/luksscanner/tests/files/luksDump_luks1.txt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
LUKS header information for /dev/loop10 | ||
|
||
Version: 1 | ||
Cipher name: aes | ||
Cipher mode: xts-plain64 | ||
Hash spec: sha256 | ||
Payload offset: 4096 | ||
MK bits: 512 | ||
MK digest: fb ec 6b 31 ae e4 49 03 3e ad 43 22 02 cf a8 78 ad 3c d2 a8 | ||
MK salt: 17 57 4e 2f ed 0b 5c 62 d5 de 54 f5 7f ab 60 68 | ||
71 d8 72 06 64 6c 81 05 39 55 3f 55 32 56 d9 da | ||
MK iterations: 114573 | ||
UUID: 90242257-d00a-4019-aba6-03083f89404b | ||
|
||
Key Slot 0: ENABLED | ||
Iterations: 1879168 | ||
Salt: fc 77 48 72 bd 31 ca 83 23 80 5a 5e b9 5b de bb | ||
55 ac d5 a9 3b 96 ad a5 82 bc 11 68 ba f8 87 56 | ||
Key material offset: 8 | ||
AF stripes: 4000 | ||
Key Slot 1: DISABLED | ||
Key Slot 2: DISABLED | ||
Key Slot 3: DISABLED | ||
Key Slot 4: DISABLED | ||
Key Slot 5: DISABLED | ||
Key Slot 6: DISABLED | ||
Key Slot 7: DISABLED |
Oops, something went wrong.