From 772d2143336f07af4130448b39873d3c0a0e8408 Mon Sep 17 00:00:00 2001 From: Daniel Goldman Date: Sat, 11 Nov 2023 18:34:26 -0500 Subject: [PATCH] improve tests and lint --- test/crypt_utils.py | 24 +++++++++++--- test/key_req.py | 47 +++++++++++++++++++++++++--- test/keyutils_test.py | 73 +++++++++++++++++++++++-------------------- 3 files changed, 101 insertions(+), 43 deletions(-) diff --git a/test/crypt_utils.py b/test/crypt_utils.py index 09cffd3..2ad93ac 100644 --- a/test/crypt_utils.py +++ b/test/crypt_utils.py @@ -61,8 +61,8 @@ def process_openssl_objects(objs: dict): if k in {"private-key", "public-key", "P"}: # out[k] = bytes.fromhex(v.replace(":", "")) out[k] = v.replace(":", "").encode("ascii") - elif re.match("\d+ \(0x\d+\)", v): # matches '2 (0x2)' - match = re.match("\d+ \(0x(\d+)\)", v) + elif re.match(r"\d+ \(0x\d+\)", v): # matches '2 (0x2)' + match = re.match(r"\d+ \(0x(\d+)\)", v) out[k] = match.group(0).encode("utf-8") out[k] = b"02" # TODO: this is a shim else: @@ -131,7 +131,23 @@ def gen_child_cert(workdir, cadir, regen: bool): der_path = str(workdir / "rsa.x509.der") subprocess.run(["openssl", "req", "-new", "-key", privkey_path, "-out", csr_path, "-subj", "/C=CA/O=example/CN=turkeyutils-leaf"]) - subprocess.run(["openssl", "x509", "-req", "-in", csr_path, "-CA", str(cadir/"rsa.x509.der"), "-CAkey", str(cadir/"rsa.pem"), "-CAcreateserial", "-days", "365", "-outform", "DER", "-out", der_path]) + subprocess.run([ + "openssl", + "x509", + "-req", + "-in", + csr_path, + "-CA", + str(cadir / "rsa.x509.der"), + "-CAkey", str(cadir / "rsa.pem"), + "-CAcreateserial", + "-days", + "365", + "-outform", + "DER", + "-out", + der_path + ]) with open(der_path, mode="rb") as der_file: der = der_file.read() @@ -157,7 +173,7 @@ def rsa_keys(workdir: Path, regen=True): ca_key, ca_cert = gen_rsa(workdir / "ca", regen) unsigned_key, unsigned_cert = gen_rsa(workdir / "unsigned", regen) leaf_key, leaf_cert = gen_child_cert(workdir / "leaf", workdir / "ca", regen) - pkcs8 = rsa_to_pkcs8(workdir/ "unsigned") + pkcs8 = rsa_to_pkcs8(workdir / "unsigned") return { "ca": ca_cert, "unsigned": unsigned_cert, diff --git a/test/key_req.py b/test/key_req.py index 158ce47..f15f22c 100755 --- a/test/key_req.py +++ b/test/key_req.py @@ -1,5 +1,9 @@ #!/tmp/venv/bin/python3 +import ast +import os import sys +import textwrap +from pathlib import Path def report(s): @@ -7,6 +11,7 @@ def report(s): f.write(str(s)) f.write("\n") + def debug(keyid, desc, callout, keyring_id): import keyutils @@ -27,14 +32,46 @@ def debug(keyid, desc, callout, keyring_id): keyutils.instantiate(keyid, f"Debug {callout}".encode("utf-8"), keyring_id) +def install(tmpdir, install): + """Install the keyutils config file""" + if install: + report("install request-key handler") + # Create a launcher script to get around a parser limitation in `/sbin/request-key`. + # request-key will take the executable path as everything up to the last "/", + # which is a problem for having both the python and the script file with absolute paths. + # eg "/path/to/bin/python /path/to/script.py %k %d %c %S" + # is parsed as ["/path/to/bin/python /path/to/script.py", "%k", "%d", "%c", "%S"] + launcher_path = Path(tmpdir) / "key_req.sh" + with open(launcher_path, mode="w", encoding="utf-8") as launcher: + launcher.write(textwrap.dedent(f"""\ + #!/bin/bash + {sys.executable} {Path(__file__)} $@ + """)) + os.chmod(launcher_path, 0o755) + + with open(Path("/etc/request-key.d/turkeyutils.conf"), mode="w", encoding="utf-8") as config: + # add a config for to call into our executable for key requests + config.write(textwrap.dedent(f"""\ + #OP TYPE DESCRIPTION CALLOUT INFO PROGRAM ARG1 ARG2 ARG3 ... + #====== ======= =============== =============== =============================== + create user turkeyutils:* * {launcher_path} %k %d %c %S + """)) + else: + report("uninstall request-key handler") + Path("/etc/request-key.d/turkeyutils.conf").unlink(missing_ok=True) + + if __name__ == "__main__": report(sys.argv) - if len(sys.argv) != 5: - raise ValueError("need to pass 4 arguments") - - _, keyid, desc, callout, keyring_id = sys.argv try: - debug(int(keyid), desc, callout, int(keyring_id)) + if len(sys.argv) == 5: + _, keyid, desc, callout, keyring_id = sys.argv + debug(int(keyid), desc, callout, int(keyring_id)) + elif len(sys.argv) == 3: + _, tmpdir, install_raw = sys.argv + install(tmpdir, bool(ast.literal_eval(install_raw))) + else: + raise ValueError("wrong number of args passed") except Exception as e: report(e) sys.exit(1) diff --git a/test/keyutils_test.py b/test/keyutils_test.py index 25628bf..08c2a94 100644 --- a/test/keyutils_test.py +++ b/test/keyutils_test.py @@ -18,7 +18,6 @@ import os import subprocess import sys -import textwrap import time import unittest from pathlib import Path @@ -29,6 +28,17 @@ from test import crypt_utils +def has_sudo(): + """ + Test that we have root capabilities which we need for privileged key operations + """ + proc = subprocess.run(["sudo", "-v", "-n"], capture_output=True, text=True) + return proc.returncode == 0 + + +needs_sudo = pytest.mark.skipif(not has_sudo(), reason="requires sudo permissions") + + @pytest.fixture(scope="function") def ring(request): return keyutils.add_ring(request.function.__name__.encode("utf-8"), keyutils.KEY_SPEC_THREAD_KEYRING) @@ -41,7 +51,6 @@ def rings(parent: int, n: int = 2): return rings - class BasicTest(unittest.TestCase): def testSet(self): keyDesc = b"test:key:01" @@ -123,6 +132,9 @@ def testLink(self): keyutils.link(child, parent) self.assertEqual(keyutils.search(parent, desc), keyId) + keyutils.unlink(child, parent) + self.assertEqual(keyutils.search(parent, desc), None) + def testTimeout(self): desc = b"dummyKey" value = b"dummyValue" @@ -238,44 +250,30 @@ def test_move_exclusive(self, ring): def test_capabilities(self): caps = keyutils.capabilities() assert caps - assert not caps.startswith(b'\x00') # the first byte will contain the results, and it should contain _something_ + assert not caps.startswith(b'\x00') # the first byte will contain the results, and it should contain _something_ + def test_get_keyring_id(): keyring = keyutils.get_keyring_id(keyutils.KEY_SPEC_THREAD_KEYRING, False) assert keyring is not None and keyring != 0 -@pytest.mark.skip -class TestNeedsSudo: - def test_keyring_chown(self): - key_id = keyutils.add_key(b"chown_n", b"chown_v", keyutils.KEY_SPEC_THREAD_KEYRING) +@pytest.mark.xfail(reason="Not implemented") +@needs_sudo +def test_keyring_chown(self): + # TODO: implement this + keyutils.add_key(b"chown_n", b"chown_v", keyutils.KEY_SPEC_THREAD_KEYRING) + raise NotImplementedError() @pytest.fixture def request_key(tmpdir): - # Create a launcher script to get around a parser limitation in `/sbin/request-key`. - # request-key will take the executable path as everything up to the last "/", - # which is a problem for having both the python and the script file with absolute paths. - # eg "/path/to/bin/python /path/to/script.py %k %d %c %S" - # is parsed as ["/path/to/bin/python /path/to/script.py", "%k", "%d", "%c", "%S"] - launcher_path = Path(tmpdir) / "key_req.sh" - with open(launcher_path, mode="w", encoding="utf-8") as launcher: - launcher.write(textwrap.dedent(f"""\ - #!/bin/bash - {sys.executable} {Path(__file__).parent / "key_req.py"} $@ - """)) - os.chmod(launcher_path, 0o755) - - with open(Path("/etc/request-key.d/turkeyutils.conf"), mode="w", encoding="utf-8") as config: - # add a config for to call into our executable for key requests - config.write(textwrap.dedent(f"""\ - #OP TYPE DESCRIPTION CALLOUT INFO PROGRAM ARG1 ARG2 ARG3 ... - #====== ======= =============== =============== =============================== - create user turkeyutils:* * {launcher_path} %k %d %c %S - """)) - - -@pytest.mark.skip + subprocess.run(["sudo", "-n", sys.executable, Path(__file__).parent / "key_req.py", tmpdir, "True"]) + yield + subprocess.run(["sudo", "-n", sys.executable, Path(__file__).parent / "key_req.py", tmpdir, "False"]) + + +@needs_sudo class TestInstantiate: def test_instantiate(self, request_key): key = keyutils.request_key(b"turkeyutils:instantiate", keyutils.KEY_SPEC_THREAD_KEYRING, callout_info=b"pytest") @@ -291,9 +289,13 @@ def test_negate(self, request_key): def test_reject(self, request_key): with pytest.raises(keyutils.KeyutilsError) as e: - key = keyutils.request_key(b"turkeyutils:reject", keyutils.KEY_SPEC_THREAD_KEYRING, callout_info=b"reject") + keyutils.request_key(b"turkeyutils:reject", keyutils.KEY_SPEC_THREAD_KEYRING, callout_info=b"reject") assert e.value.args[0] == 128 + def test_instantiation_failed(self, request_key): + key = keyutils.request_key(b"aaaa:aaaa", keyutils.KEY_SPEC_THREAD_KEYRING, callout_info=b"pytest") + assert key is None + @pytest.fixture def dh_keys(tmpdir): @@ -360,11 +362,15 @@ def test_restrict_keyring(self, rsa_keys): def supports_pkcs8(): + """ + We need the pkcs8_key_parser to be loaded in order to handle asymmetric operations + Try loading it with `sudo modprobe pkcs8_key_parser` + """ lsmod = subprocess.run(["lsmod"], capture_output=True, text=True) return "pkcs8_key_parser" in lsmod.stdout -needs_pkcs8 = pytest.mark.skipif(not supports_pkcs8(), reason="requires pkcs8 parser to insert ") +needs_pkcs8 = pytest.mark.skipif(not supports_pkcs8(), reason="requires pkcs8 parser to insert asymmetric keys") class TestPKey: @@ -394,10 +400,9 @@ def test_sign(self, ring, rsa_keys): bad_data = b"some bad data" with pytest.raises(keyutils.KeyutilsError) as e: - bad_verify = keyutils.pkey_verify(pkey, bad_data, sig, info=b'enc=pkcs1 hash=sha256') + keyutils.pkey_verify(pkey, bad_data, sig, info=b'enc=pkcs1 hash=sha256') assert e.value.args[1] == 'Key was rejected by service' - if __name__ == "__main__": sys.exit(unittest.main())