From e31e3b9b340fc01216838d42860574b0f61842be Mon Sep 17 00:00:00 2001
From: "Endi S. Dewata"
Date: Tue, 18 Jul 2023 22:23:37 -0500
Subject: [PATCH] Clean up keygen.py

The code that generates system cert requests have been moved into PKIDeployer.

---
 .../python/pki/server/deployment/__init__.py | 324 +++++++++++++++++
 .../server/deployment/scriptlets/keygen.py | 331 +-----------------
 2 files changed, 325 insertions(+), 330 deletions(-)

diff --git a/base/server/python/pki/server/deployment/__init__.py b/base/server/python/pki/server/deployment/__init__.py
index 7d11dfdbaf1..da23a4cc95d 100644
--- a/base/server/python/pki/server/deployment/__init__.py
+++ b/base/server/python/pki/server/deployment/__init__.py
@@ -906,6 +906,330 @@ def get_cert_id(self, subsystem, tag): else: return tag + def generate_ca_signing_request(self, subsystem): + + csr_path = self.mdict.get('pki_ca_signing_csr_path') + if not csr_path: + return + + basic_constraints_ext = { + 'ca': True, + 'path_length': None, + 'critical': True + } + + key_usage_ext = { + 'digitalSignature': True, + 'nonRepudiation': True, + 'certSigning': True, + 'crlSigning': True, + 'critical': True + } + + # if specified, add generic CSR extension + generic_exts = None + + if 'preop.cert.signing.ext.oid' in subsystem.config and \ + 'preop.cert.signing.ext.data' in subsystem.config: + + data = subsystem.config['preop.cert.signing.ext.data'] + critical = subsystem.config['preop.cert.signing.ext.critical'] + + generic_ext = { + 'oid': subsystem.config['preop.cert.signing.ext.oid'], + 'data': binascii.unhexlify(data), + 'critical': config.str2bool(critical) + } + + generic_exts = [generic_ext] + + tag = 'signing' + cert = subsystem.get_subsystem_cert(tag) + token = pki.nssdb.normalize_token(cert['token']) + + if not token: + token = self.mdict['pki_token_name'] + + nssdb = subsystem.instance.open_nssdb( + token=token, + user=self.mdict.get('pki_user'), + group=self.mdict.get('pki_group'), + ) + + try: + self.generate_csr( + nssdb, + subsystem, + tag, + csr_path, + basic_constraints_ext=basic_constraints_ext, + key_usage_ext=key_usage_ext, + generic_exts=generic_exts, + subject_key_id=self.configuration_file.req_ski, + ) + + finally: + nssdb.close() + + def generate_kra_storage_request(self, subsystem): + + csr_path = self.mdict.get('pki_storage_csr_path') + if not csr_path: + return + + key_usage_ext = { + 'digitalSignature': True, + 'nonRepudiation': True, + 'keyEncipherment': True, + 'dataEncipherment': True, + 'critical': True + } + + extended_key_usage_ext = { + 'clientAuth': True + } + + tag = 'storage' + cert = subsystem.get_subsystem_cert(tag) + token = pki.nssdb.normalize_token(cert['token']) + + if not token: + token 
= self.mdict['pki_token_name'] + + nssdb = subsystem.instance.open_nssdb(token) + + try: + self.generate_csr( + nssdb, + subsystem, + tag, + csr_path, + key_usage_ext=key_usage_ext, + extended_key_usage_ext=extended_key_usage_ext + ) + + finally: + nssdb.close() + + def generate_kra_transport_request(self, subsystem): + + csr_path = self.mdict.get('pki_transport_csr_path') + if not csr_path: + return + + key_usage_ext = { + 'digitalSignature': True, + 'nonRepudiation': True, + 'keyEncipherment': True, + 'dataEncipherment': True, + 'critical': True + } + + extended_key_usage_ext = { + 'clientAuth': True + } + + tag = 'transport' + cert = subsystem.get_subsystem_cert(tag) + token = pki.nssdb.normalize_token(cert['token']) + + if not token: + token = self.mdict['pki_token_name'] + + nssdb = subsystem.instance.open_nssdb(token) + + try: + self.generate_csr( + nssdb, + subsystem, + tag, + csr_path, + key_usage_ext=key_usage_ext, + extended_key_usage_ext=extended_key_usage_ext + ) + + finally: + nssdb.close() + + def generate_ocsp_signing_request(self, subsystem): + + csr_path = self.mdict.get('pki_ocsp_signing_csr_path') + if not csr_path: + return + + tag = 'signing' + cert = subsystem.get_subsystem_cert(tag) + token = pki.nssdb.normalize_token(cert['token']) + + if not token: + token = self.mdict['pki_token_name'] + + nssdb = subsystem.instance.open_nssdb(token) + + try: + self.generate_csr( + nssdb, + subsystem, + tag, + csr_path + ) + + finally: + nssdb.close() + + def generate_sslserver_request(self, subsystem): + + csr_path = self.mdict.get('pki_sslserver_csr_path') + if not csr_path: + return + + key_usage_ext = { + 'digitalSignature': True, + 'nonRepudiation': True, + 'keyEncipherment': True, + 'dataEncipherment': True, + 'critical': True + } + + extended_key_usage_ext = { + 'serverAuth': True + } + + tag = 'sslserver' + cert = subsystem.get_subsystem_cert(tag) + token = pki.nssdb.normalize_token(cert['token']) + + if not token: + token = 
self.mdict['pki_token_name'] + + nssdb = subsystem.instance.open_nssdb(token) + + try: + self.generate_csr( + nssdb, + subsystem, + tag, + csr_path, + key_usage_ext=key_usage_ext, + extended_key_usage_ext=extended_key_usage_ext + ) + + finally: + nssdb.close() + + def generate_subsystem_request(self, subsystem): + + csr_path = self.mdict.get('pki_subsystem_csr_path') + if not csr_path: + return + + key_usage_ext = { + 'digitalSignature': True, + 'nonRepudiation': True, + 'keyEncipherment': True, + 'dataEncipherment': True, + 'critical': True + } + + extended_key_usage_ext = { + 'serverAuth': True, + 'clientAuth': True + } + + tag = 'subsystem' + cert = subsystem.get_subsystem_cert(tag) + token = pki.nssdb.normalize_token(cert['token']) + + if not token: + token = self.mdict['pki_token_name'] + + nssdb = subsystem.instance.open_nssdb(token) + + try: + self.generate_csr( + nssdb, + subsystem, + tag, + csr_path, + key_usage_ext=key_usage_ext, + extended_key_usage_ext=extended_key_usage_ext + ) + + finally: + nssdb.close() + + def generate_audit_signing_request(self, subsystem): + + csr_path = self.mdict.get('pki_audit_signing_csr_path') + if not csr_path: + return + + key_usage_ext = { + 'digitalSignature': True, + 'nonRepudiation': True, + 'critical': True + } + + tag = 'audit_signing' + cert = subsystem.get_subsystem_cert(tag) + token = pki.nssdb.normalize_token(cert['token']) + + if not token: + token = self.mdict['pki_token_name'] + + nssdb = subsystem.instance.open_nssdb(token) + + try: + self.generate_csr( + nssdb, + subsystem, + tag, + csr_path, + key_usage_ext=key_usage_ext + ) + + finally: + nssdb.close() + + def generate_admin_request(self, subsystem): + + csr_path = self.mdict.get('pki_admin_csr_path') + if not csr_path: + return + + client_nssdb = pki.nssdb.NSSDatabase( + directory=self.mdict['pki_client_database_dir'], + password=self.mdict['pki_client_database_password']) + + try: + self.generate_csr( + client_nssdb, + subsystem, + 'admin', + csr_path + 
) + + finally: + client_nssdb.close() + + def generate_system_cert_requests(self, subsystem): + + if subsystem.name == 'ca': + self.generate_ca_signing_request(subsystem) + + if subsystem.name == 'kra': + self.generate_kra_storage_request(subsystem) + self.generate_kra_transport_request(subsystem) + + if subsystem.name == 'ocsp': + self.generate_ocsp_signing_request(subsystem) + + if subsystem.name in ['kra', 'ocsp', 'tks', 'tps']: + self.generate_sslserver_request(subsystem) + self.generate_subsystem_request(subsystem) + self.generate_audit_signing_request(subsystem) + self.generate_admin_request(subsystem) + def import_system_cert_request(self, subsystem, tag): cert_id = self.get_cert_id(subsystem, tag) diff --git a/base/server/python/pki/server/deployment/scriptlets/keygen.py b/base/server/python/pki/server/deployment/scriptlets/keygen.py index 0728e77cd52..900f3fdbd00 100644 --- a/base/server/python/pki/server/deployment/scriptlets/keygen.py +++ b/base/server/python/pki/server/deployment/scriptlets/keygen.py @@ -19,11 +19,8 @@ # from __future__ import absolute_import -import binascii import logging -import pki.nssdb - from .. import pkiconfig as config from .. import pkiscriptlet 
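Review bot: In generate_ca_signing_request, the generic-extension branch checks only that `preop.cert.signing.ext.oid` and `preop.cert.signing.ext.data` are present but then reads `subsystem.config['preop.cert.signing.ext.critical']` unguarded, so a subsystem config that sets the OID and data without the critical flag raises KeyError instead of producing a CSR; if `subsystem.config` is dict-like, `critical = subsystem.config.get('preop.cert.signing.ext.critical', 'false')` (or guarding on the third key alongside the other two) closes that gap. The same method opens the NSS database with `user=self.mdict.get('pki_user')` and `group=self.mdict.get('pki_group')`, while generate_kra_storage_request and the five methods after it call `subsystem.instance.open_nssdb(token)` without them; if those arguments affect the ownership of files created under the database directory, the seven calls should agree rather than differ only for the CA signing key. The diffstat shows no import hunk for __init__.py, so `binascii`, `pki.nssdb` and the `config` alias this hunk uses — the very imports the keygen.py hunk removes — must already be in scope in this module, which is worth confirming before merging. None of the eight new methods carries a docstring; a one-liner such as `"""Generate the CA signing CSR at pki_ca_signing_csr_path, if configured."""` on each would document the mdict key that gates it. The order in generate_system_cert_requests also differs from the removed keygen.py version (KRA storage/transport and OCSP signing now run before the sslserver/subsystem/audit/admin requests), which is presumably harmless but is not mentioned in the commit message.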
@@ -32,330 +29,6 @@ class PkiScriptlet(pkiscriptlet.AbstractBasePkiScriptlet): - def generate_ca_signing_csr(self, deployer, subsystem): - - csr_path = deployer.mdict.get('pki_ca_signing_csr_path') - if not csr_path: - return - - basic_constraints_ext = { - 'ca': True, - 'path_length': None, - 'critical': True - } - - key_usage_ext = { - 'digitalSignature': True, - 'nonRepudiation': True, - 'certSigning': True, - 'crlSigning': True, - 'critical': True - } - - # if specified, add generic CSR extension - generic_exts = None - - if 'preop.cert.signing.ext.oid' in subsystem.config and \ - 'preop.cert.signing.ext.data' in subsystem.config: - - data = subsystem.config['preop.cert.signing.ext.data'] - critical = subsystem.config['preop.cert.signing.ext.critical'] - - generic_ext = { - 'oid': subsystem.config['preop.cert.signing.ext.oid'], - 'data': binascii.unhexlify(data), - 'critical': config.str2bool(critical) - } - - generic_exts = [generic_ext] - - tag = 'signing' - cert = subsystem.get_subsystem_cert(tag) - token = pki.nssdb.normalize_token(cert['token']) - - if not token: - token = deployer.mdict['pki_token_name'] - - nssdb = subsystem.instance.open_nssdb( - token=token, - user=deployer.mdict.get('pki_user'), - group=deployer.mdict.get('pki_group'), - ) - - try: - deployer.generate_csr( - nssdb, - subsystem, - tag, - csr_path, - basic_constraints_ext=basic_constraints_ext, - key_usage_ext=key_usage_ext, - generic_exts=generic_exts, - subject_key_id=deployer.configuration_file.req_ski, - ) - - finally: - nssdb.close() - - def generate_sslserver_csr(self, deployer, subsystem): - - csr_path = deployer.mdict.get('pki_sslserver_csr_path') - if not csr_path: - return - - key_usage_ext = { - 'digitalSignature': True, - 'nonRepudiation': True, - 'keyEncipherment': True, - 'dataEncipherment': True, - 'critical': True - } - - extended_key_usage_ext = { - 'serverAuth': True - } - - tag = 'sslserver' - cert = subsystem.get_subsystem_cert(tag) - token = 
pki.nssdb.normalize_token(cert['token']) - - if not token: - token = deployer.mdict['pki_token_name'] - - nssdb = subsystem.instance.open_nssdb(token) - - try: - deployer.generate_csr( - nssdb, - subsystem, - tag, - csr_path, - key_usage_ext=key_usage_ext, - extended_key_usage_ext=extended_key_usage_ext - ) - - finally: - nssdb.close() - - def generate_subsystem_csr(self, deployer, subsystem): - - csr_path = deployer.mdict.get('pki_subsystem_csr_path') - if not csr_path: - return - - key_usage_ext = { - 'digitalSignature': True, - 'nonRepudiation': True, - 'keyEncipherment': True, - 'dataEncipherment': True, - 'critical': True - } - - extended_key_usage_ext = { - 'serverAuth': True, - 'clientAuth': True - } - - tag = 'subsystem' - cert = subsystem.get_subsystem_cert(tag) - token = pki.nssdb.normalize_token(cert['token']) - - if not token: - token = deployer.mdict['pki_token_name'] - - nssdb = subsystem.instance.open_nssdb(token) - - try: - deployer.generate_csr( - nssdb, - subsystem, - tag, - csr_path, - key_usage_ext=key_usage_ext, - extended_key_usage_ext=extended_key_usage_ext - ) - - finally: - nssdb.close() - - def generate_audit_signing_csr(self, deployer, subsystem): - - csr_path = deployer.mdict.get('pki_audit_signing_csr_path') - if not csr_path: - return - - key_usage_ext = { - 'digitalSignature': True, - 'nonRepudiation': True, - 'critical': True - } - - tag = 'audit_signing' - cert = subsystem.get_subsystem_cert(tag) - token = pki.nssdb.normalize_token(cert['token']) - - if not token: - token = deployer.mdict['pki_token_name'] - - nssdb = subsystem.instance.open_nssdb(token) - - try: - deployer.generate_csr( - nssdb, - subsystem, - tag, - csr_path, - key_usage_ext=key_usage_ext - ) - - finally: - nssdb.close() - - def generate_admin_csr(self, deployer, subsystem): - - csr_path = deployer.mdict.get('pki_admin_csr_path') - if not csr_path: - return - - client_nssdb = pki.nssdb.NSSDatabase( - directory=deployer.mdict['pki_client_database_dir'], - 
password=deployer.mdict['pki_client_database_password']) - - try: - deployer.generate_csr( - client_nssdb, - subsystem, - 'admin', - csr_path - ) - - finally: - client_nssdb.close() - - def generate_kra_storage_csr(self, deployer, subsystem): - - csr_path = deployer.mdict.get('pki_storage_csr_path') - if not csr_path: - return - - key_usage_ext = { - 'digitalSignature': True, - 'nonRepudiation': True, - 'keyEncipherment': True, - 'dataEncipherment': True, - 'critical': True - } - - extended_key_usage_ext = { - 'clientAuth': True - } - - tag = 'storage' - cert = subsystem.get_subsystem_cert(tag) - token = pki.nssdb.normalize_token(cert['token']) - - if not token: - token = deployer.mdict['pki_token_name'] - - nssdb = subsystem.instance.open_nssdb(token) - - try: - deployer.generate_csr( - nssdb, - subsystem, - tag, - csr_path, - key_usage_ext=key_usage_ext, - extended_key_usage_ext=extended_key_usage_ext - ) - - finally: - nssdb.close() - - def generate_kra_transport_csr(self, deployer, subsystem): - - csr_path = deployer.mdict.get('pki_transport_csr_path') - if not csr_path: - return - - key_usage_ext = { - 'digitalSignature': True, - 'nonRepudiation': True, - 'keyEncipherment': True, - 'dataEncipherment': True, - 'critical': True - } - - extended_key_usage_ext = { - 'clientAuth': True - } - - tag = 'transport' - cert = subsystem.get_subsystem_cert(tag) - token = pki.nssdb.normalize_token(cert['token']) - - if not token: - token = deployer.mdict['pki_token_name'] - - nssdb = subsystem.instance.open_nssdb(token) - - try: - deployer.generate_csr( - nssdb, - subsystem, - tag, - csr_path, - key_usage_ext=key_usage_ext, - extended_key_usage_ext=extended_key_usage_ext - ) - - finally: - nssdb.close() - - def generate_ocsp_signing_csr(self, deployer, subsystem): - - csr_path = deployer.mdict.get('pki_ocsp_signing_csr_path') - if not csr_path: - return - - tag = 'signing' - cert = subsystem.get_subsystem_cert(tag) - token = pki.nssdb.normalize_token(cert['token']) - - if 
not token: - token = deployer.mdict['pki_token_name'] - - nssdb = subsystem.instance.open_nssdb(token) - - try: - deployer.generate_csr( - nssdb, - subsystem, - tag, - csr_path - ) - - finally: - nssdb.close() - - def generate_system_cert_requests(self, deployer, subsystem): - - if subsystem.name == 'ca': - self.generate_ca_signing_csr(deployer, subsystem) - - if subsystem.name in ['kra', 'ocsp', 'tks', 'tps']: - self.generate_sslserver_csr(deployer, subsystem) - self.generate_subsystem_csr(deployer, subsystem) - self.generate_audit_signing_csr(deployer, subsystem) - self.generate_admin_csr(deployer, subsystem) - - if subsystem.name == 'kra': - self.generate_kra_storage_csr(deployer, subsystem) - self.generate_kra_transport_csr(deployer, subsystem) - - if subsystem.name == 'ocsp': - self.generate_ocsp_signing_csr(deployer, subsystem) - def spawn(self, deployer): if config.str2bool(deployer.mdict['pki_skip_installation']): @@ -375,9 +48,7 @@ def spawn(self, deployer): step_one = deployer.configuration_file.external_step_one if (external or standalone) and step_one: - - self.generate_system_cert_requests(deployer, subsystem) - + deployer.generate_system_cert_requests(subsystem) subsystem.save() def destroy(self, deployer):