Skip to content

Commit

Permalink
LuksScanner: Add LUKS dump scanner and models
Browse files Browse the repository at this point in the history
Add LuksScanner actor that runs 'cryptsetup luksDump' for all 'crypt'
from lsblk output. The output is then parsed and filled into LuksDump
and LuksToken models.
  • Loading branch information
danzatt committed Apr 18, 2024
1 parent 7ecdb24 commit 4dea4c5
Show file tree
Hide file tree
Showing 10 changed files with 910 additions and 0 deletions.
23 changes: 23 additions & 0 deletions repos/system_upgrade/common/actors/luksscanner/actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from leapp.actors import Actor
from leapp.libraries.actor import luksscanner
from leapp.models import LuksDump, StorageInfo
from leapp.reporting import Report
from leapp.tags import FactsPhaseTag, IPUWorkflowTag


class LuksScanner(Actor):
"""
Provides data about active LUKS devices.
Scans all block devices of 'crypt' type and attempts to run 'cryptsetup luksDump' on them.
For every 'crypt' device a LuksDump model is produced. Furthermore, if there is any LUKS token
of type clevis, the concrete subtype is determined using 'clevis luks list'.
"""

name = 'luks_scanner'
consumes = (StorageInfo,)
produces = (Report, LuksDump)
tags = (IPUWorkflowTag, FactsPhaseTag)

def process(self):
self.produce(*luksscanner.get_luks_dumps())
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
#!/usr/bin/python


class LuksDumpParser(object):
"""
Class for parsing "cryptsetup luksDump" output. Given a list of lines, it
generates a dictionary representing the dump.
"""

class Node(object):
"""
Helper class, every line is represented as a node. The node depth is
based on the indentation of the line. A dictionary is produced after
all lines are inserted.
"""

def __init__(self, indented_line):
self.children = []
self.level = len(indented_line) - len(indented_line.lstrip())
self.text = indented_line.strip()

def add_children(self, nodes):
childlevel = nodes[0].level
while nodes:
node = nodes.pop(0)
if node.level == childlevel: # add node as a child
self.children.append(node)
elif node.level > childlevel: # add nodes as grandchildren of the last child
nodes.insert(0, node)
self.children[-1].add_children(nodes)
elif node.level <= self.level: # this node is a sibling, no more children
nodes.insert(0, node)
return

def as_dict(self):
if len(self.children) > 1:
children = [node.as_dict() for node in self.children]

return {self.text: LuksDumpParser._merge_list(children)}
if len(self.children) == 1:
return {self.text: self.children[0].as_dict()}
return self.text

@staticmethod
def _count_type(elem_list, elem_type):
""" Count the number of items of elem_type inside the elem_list """
return sum(isinstance(x, elem_type) for x in elem_list)

@staticmethod
def _merge_list(elem_list):
"""
Given a list of elements merge them into a single element. If all
elements are strings, concatenate them into a single string. When all
the elements are dictionaries merge them into a single dictionary
containing the keys/values from all of the dictionaries.
"""

dict_count = LuksDumpParser._count_type(elem_list, dict)
str_count = LuksDumpParser._count_type(elem_list, str)

result = elem_list
if dict_count == len(elem_list):
result = {}
for element in elem_list:
result.update(element)
elif str_count == len(elem_list):
result = "".join(elem_list)

return result

@staticmethod
def _find_single_str(elem_list):
""" If the list contains exactly one string return it or return None otherwise. """

result = None

for elem in elem_list:
if isinstance(elem, str):
if result is not None:
# more than one strings in the list
return None
result = elem

return result

@staticmethod
def _fixup_type(elem_list, type_string):
single_string = LuksDumpParser._find_single_str(elem_list)

if single_string is not None:
elem_list.remove(single_string)
elem_list.append({type_string: single_string})

@staticmethod
def _fixup_section(section, type_string):
for key, value in section.items():
LuksDumpParser._fixup_type(value, type_string)
section[key] = LuksDumpParser._merge_list(section[key])

@staticmethod
def _fixup_dict(parsed_dict):
""" Various fixups of the parsed dictionary """

if "Version" not in parsed_dict:
return
if parsed_dict["Version"] == "1":
for i in range(8):
keyslot = "Key Slot {}".format(i)

if keyslot not in parsed_dict:
continue

if parsed_dict[keyslot] in ["ENABLED", "DISABLED"]:
parsed_dict[keyslot] = {"enabled": parsed_dict[keyslot] == "ENABLED"}

if not isinstance(parsed_dict[keyslot], list):
continue

enabled = None
if "ENABLED" in parsed_dict[keyslot]:
enabled = True
parsed_dict[keyslot].remove("ENABLED")
if "DISABLED" in parsed_dict[keyslot]:
enabled = False
parsed_dict[keyslot].remove("DISABLED")
parsed_dict[keyslot] = LuksDumpParser._merge_list(parsed_dict[keyslot])
if enabled is not None:
parsed_dict[keyslot]["enabled"] = enabled
elif parsed_dict["Version"] == "2":
for section in ["Keyslots", "Digests", "Data segments", "Tokens"]:
if section in parsed_dict:
LuksDumpParser._fixup_section(parsed_dict[section], "type")

@staticmethod
def _fixup_dump(dump):
"""
Replace tabs with spaces, for lines with colon a move the text
after column on new line with the indent of the following line.
"""

dump = [line.replace("\t", " "*8).replace("\n", "") for line in dump]
newdump = []

for i, line in enumerate(dump):
if not line.strip():
continue

if ':' in line:
first_half = line.split(":")[0]
second_half = ":".join(line.split(":")[1:]).lstrip()

current_level = len(line) - len(line.lstrip())
if i+1 < len(dump):
next_level = len(dump[i+1]) - len(dump[i+1].lstrip())
else:
next_level = current_level

if next_level > current_level:
second_half = " " * next_level + second_half
else:
second_half = " " * (current_level + 8) + second_half

newdump.append(first_half)
if second_half.strip():
newdump.append(second_half)
else:
newdump.append(line)

return newdump

@staticmethod
def parse(dump):
"""
Parse the output of "cryptsetup luksDump" command into a dictionary.
:param dump: List of output lines of luksDump
:returns: Parsed dictionary
"""

root = LuksDumpParser.Node('root')

nodes = []
for line in LuksDumpParser._fixup_dump(dump):
nodes.append(LuksDumpParser.Node(line))

root.add_children(nodes)
root = root.as_dict()['root']

if isinstance(root, list):
result = {}
for child in root:
if isinstance(child, str):
child = {child: {}}
result.update(child)
root = result

LuksDumpParser._fixup_dict(root)
return root
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import functools

from leapp import reporting
from leapp.exceptions import StopActorExecutionError
from leapp.libraries import stdlib
from leapp.libraries.actor.luksdump_parser import LuksDumpParser
from leapp.libraries.stdlib import api
from leapp.models import LuksDump, LuksToken, StorageInfo


def aslist(f):
""" Decorator used to convert generator to list """
@functools.wraps(f)
def inner(*args, **kwargs):
return list(f(*args, **kwargs))
return inner


def _get_clevis_type(device_path, keyslot):
"""
Assuming the device is initialized using clevis, determine the type of
clevis token associated to the specified keyslot.
"""
try:
result = stdlib.run(["clevis", "luks", "list", "-d", device_path, "-s", str(keyslot)])
except OSError:
reporting.create_report([
reporting.Title('No "clevis" command installed.'),
reporting.Summary('A LUKS drive with clevis token was discovered, but there is '
'no clevis package installed. The clevis command is required '
'to determine clevis token type.'),
reporting.Severity(reporting.Severity.HIGH),
reporting.Groups([reporting.Groups.ENCRYPTION]),
reporting.Groups([reporting.Groups.INHIBITOR]),
reporting.Remediation(hint='Use dnf to install the "clevis-luks" package.'),
])
raise StopActorExecutionError(
'The clevis command is was not found'
)
except stdlib.CalledProcessError as e:
api.current_logger().debug("clevis list command failed with an error code: {}".format(e.returncode))

# return generic token name, without specifying subtype
return "clevis"

line = result["stdout"].split()
if len(line) != 3:
raise StopActorExecutionError(
'Invalid "clevis list" output detected'
)

return "clevis-{}".format(line[1])


@aslist
def _get_tokens(device_path, luksdump_dict):
""" Given a parsed LUKS dump, produce a list of tokens """
if "Version" not in luksdump_dict or luksdump_dict["Version"] != '2':
return
if "Tokens" not in luksdump_dict:
raise StopActorExecutionError(
'No tokens in cryptsetup luksDump output'
)

for token_id in luksdump_dict["Tokens"]:
token = luksdump_dict["Tokens"][token_id]

if "Keyslot" not in token or "type" not in token:
raise StopActorExecutionError(
'Token specification does not contain keyslot or type',
)
keyslot = int(token["Keyslot"])
token_type = token["type"]

if token_type == "clevis":
token_type = _get_clevis_type(device_path, keyslot)

yield LuksToken(
token_id=int(token_id),
keyslot=keyslot,
token_type=token_type
)


def get_luks_dump_by_device(device_path, device_name):
""" Determine info about LUKS device using cryptsetup and clevis commands """

try:
result = stdlib.run(['cryptsetup', 'luksDump', device_path])
luksdump_dict = LuksDumpParser.parse(result["stdout"].splitlines())

version = int(luksdump_dict["Version"]) if "Version" in luksdump_dict else None
uuid = luksdump_dict["UUID"] if "UUID" in luksdump_dict else None
if version is None or uuid is None:
api.current_logger().error(
'Failed to detect UUID or version from the output "cryptsetup luksDump {}" command'.format(device_path)
)
raise StopActorExecutionError(
'Failed to detect UUID or version from the output "cryptsetup luksDump {}" command'.format(device_path)
)

return LuksDump(
version=version,
uuid=uuid,
device_path=device_path,
device_name=device_name,
tokens=_get_tokens(device_path, luksdump_dict)
)

except (OSError, stdlib.CalledProcessError) as ex:
api.current_logger().error(
'Failed to execute "cryptsetup luksDump" command: {}'.format(ex)
)
raise StopActorExecutionError(
'Failed to execute "cryptsetup luksDump {}" command'.format(device_path)
)


@aslist
def get_luks_dumps():
""" Collect info abaout every active LUKS device """

for storage_info in api.consume(StorageInfo):
for blk in storage_info.lsblk:
if blk.tp == 'crypt' and blk.parent_path:
yield get_luks_dump_by_device(blk.parent_path, blk.parent_name)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
LUKS header information for /dev/loop10

Version: 1
Cipher name: aes
Cipher mode: xts-plain64
Hash spec: sha256
Payload offset: 4096
MK bits: 512
MK digest: fb ec 6b 31 ae e4 49 03 3e ad 43 22 02 cf a8 78 ad 3c d2 a8
MK salt: 17 57 4e 2f ed 0b 5c 62 d5 de 54 f5 7f ab 60 68
71 d8 72 06 64 6c 81 05 39 55 3f 55 32 56 d9 da
MK iterations: 114573
UUID: 90242257-d00a-4019-aba6-03083f89404b

Key Slot 0: ENABLED
Iterations: 1879168
Salt: fc 77 48 72 bd 31 ca 83 23 80 5a 5e b9 5b de bb
55 ac d5 a9 3b 96 ad a5 82 bc 11 68 ba f8 87 56
Key material offset: 8
AF stripes: 4000
Key Slot 1: DISABLED
Key Slot 2: DISABLED
Key Slot 3: DISABLED
Key Slot 4: DISABLED
Key Slot 5: DISABLED
Key Slot 6: DISABLED
Key Slot 7: DISABLED
Loading

0 comments on commit 4dea4c5

Please sign in to comment.