From 5501c797f016970b57ae1ebc586963ee62bf0411 Mon Sep 17 00:00:00 2001 From: Telcobot Date: Sat, 8 Feb 2025 00:09:34 +0000 Subject: [PATCH] chore: update charm libraries --- .../v4/tls_certificates.py | 128 ++++++++++++++---- 1 file changed, 105 insertions(+), 23 deletions(-) diff --git a/lib/charms/tls_certificates_interface/v4/tls_certificates.py b/lib/charms/tls_certificates_interface/v4/tls_certificates.py index 1cdf37f..9559b3a 100644 --- a/lib/charms/tls_certificates_interface/v4/tls_certificates.py +++ b/lib/charms/tls_certificates_interface/v4/tls_certificates.py @@ -9,7 +9,7 @@ Pre-requisites: - Juju >= 3.0 - cryptography >= 43.0.0 - - pydantic + - pydantic >= 2.0 Learn more on how-to use the TLS Certificates interface library by reading the documentation: - https://charmhub.io/tls-certificates-interface/ @@ -52,9 +52,12 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 7 +LIBPATCH = 8 -PYDEPS = ["cryptography", "pydantic"] +PYDEPS = [ + "cryptography>=43.0.0", + "pydantic>=2.0", +] logger = logging.getLogger(__name__) @@ -211,6 +214,27 @@ def from_string(cls, private_key: str) -> "PrivateKey": """Create a PrivateKey object from a private key.""" return cls(raw=private_key.strip()) + def is_valid(self) -> bool: + """Validate that the private key is PEM-formatted, RSA, and at least 2048 bits.""" + try: + key = serialization.load_pem_private_key( + self.raw.encode(), + password=None, + ) + + if not isinstance(key, rsa.RSAPrivateKey): + logger.warning("Private key is not an RSA key") + return False + + if key.key_size < 2048: + logger.warning("RSA key size is less than 2048 bits") + return False + + return True + except ValueError: + logger.warning("Invalid private key format") + return False + @dataclass(frozen=True) class Certificate: @@ -612,12 +636,14 @@ def generate_private_key( """Generate a private key with the RSA algorithm. Args: - key_size (int): Key size in bytes + key_size (int): Key size in bits, must be at least 2048 bits public_exponent: Public exponent. Returns: PrivateKey: Private Key """ + if key_size < 2048: + raise ValueError("Key size must be at least 2048 bits for RSA security") private_key = rsa.generate_private_key( public_exponent=public_exponent, key_size=key_size, @@ -978,6 +1004,7 @@ def __init__( certificate_requests: List[CertificateRequestAttributes], mode: Mode = Mode.UNIT, refresh_events: List[BoundEvent] = [], + private_key: Optional[PrivateKey] = None, ): """Create a new instance of the TLSCertificatesRequiresV4 class. @@ -989,6 +1016,12 @@ def __init__( mode (Mode): Whether to use unit or app certificates mode. Default is Mode.UNIT. refresh_events (List[BoundEvent]): A list of events to trigger a refresh of the certificates. + private_key (Optional[PrivateKey]): The private key to use for the certificates. + If provided, it will be used instead of generating a new one. + If the key is not valid an exception will be raised. + Using this parameter is discouraged, + having to pass around private keys manually can be a security concern. + Allowing the library to generate and manage the key is the more secure approach. """ super().__init__(charm, relationship_name) if not JujuVersion.from_environ().has_secrets: @@ -1002,6 +1035,9 @@ def __init__( self.relationship_name = relationship_name self.certificate_requests = certificate_requests self.mode = mode + if private_key and not private_key.is_valid(): + raise TLSCertificatesError("Invalid private key") + self._private_key = private_key self.framework.observe(charm.on[relationship_name].relation_created, self._configure) self.framework.observe(charm.on[relationship_name].relation_changed, self._configure) self.framework.observe(charm.on.secret_expired, self._on_secret_expired) @@ -1020,10 +1056,10 @@ def _configure(self, _: EventBase): if not self._tls_relation_created(): logger.debug("TLS relation not created yet.") return - self._generate_private_key() + self._ensure_private_key() + self._cleanup_certificate_requests() self._send_certificate_requests() self._find_available_certificates() - self._cleanup_certificate_requests() def _mode_is_valid(self, mode: Mode) -> bool: return mode in [Mode.UNIT, Mode.APP] @@ -1114,45 +1150,91 @@ def _get_app_or_unit(self) -> Union[Application, Unit]: @property def private_key(self) -> Optional[PrivateKey]: """Return the private key.""" + if self._private_key: + return self._private_key if not self._private_key_generated(): return None secret = self.charm.model.get_secret(label=self._get_private_key_secret_label()) private_key = secret.get_content(refresh=True)["private-key"] return PrivateKey.from_string(private_key) - def _generate_private_key(self) -> None: + def _ensure_private_key(self) -> None: + """Make sure there is a private key to be used. + + It will make sure there is a private key passed by the charm using the private_key + parameter or generate a new one otherwise. + """ + # Remove the generated private key + # if one has been passed by the charm using the private_key parameter + if self._private_key: + self._remove_private_key_secret() + return if self._private_key_generated(): + logger.debug("Private key already generated") return - private_key = generate_private_key() - self.charm.unit.add_secret( - content={"private-key": str(private_key)}, - label=self._get_private_key_secret_label(), - ) - logger.info("Private key generated") + self._generate_private_key() def regenerate_private_key(self) -> None: """Regenerate the private key. Generate a new private key, remove old certificate requests and send new ones. + + Raises: + TLSCertificatesError: If the private key is passed by the charm using the + private_key parameter. """ + if self._private_key: + raise TLSCertificatesError( + "Private key is passed by the charm through the private_key parameter, " + "this function can't be used" + ) if not self._private_key_generated(): logger.warning("No private key to regenerate") return - self._regenerate_private_key() + self._generate_private_key() self._cleanup_certificate_requests() self._send_certificate_requests() - def _regenerate_private_key(self) -> None: - secret = self.charm.model.get_secret(label=self._get_private_key_secret_label()) - secret.set_content({"private-key": str(generate_private_key())}) - secret.get_content(refresh=True) + def _generate_private_key(self) -> None: + """Generate a new private key and store it in a secret. + + This is the case when the private key used is generated by the library. + and not passed by the charm using the private_key parameter. + """ + self._store_private_key_in_secret(generate_private_key()) + logger.info("Private key generated") def _private_key_generated(self) -> bool: + """Check if a private key is stored in a secret. + + This is the case when the private key used is generated by the library. + This should not exist when the private key used + is passed by the charm using the private_key parameter. + """ try: - self.charm.model.get_secret(label=self._get_private_key_secret_label()) - except (SecretNotFoundError, KeyError): + secret = self.charm.model.get_secret(label=self._get_private_key_secret_label()) + secret.get_content(refresh=True) + return True + except SecretNotFoundError: return False - return True + + def _store_private_key_in_secret(self, private_key: PrivateKey) -> None: + try: + secret = self.charm.model.get_secret(label=self._get_private_key_secret_label()) + secret.set_content({"private-key": str(private_key)}) + except SecretNotFoundError: + self.charm.unit.add_secret( + content={"private-key": str(private_key)}, + label=self._get_private_key_secret_label(), + ) + + def _remove_private_key_secret(self) -> None: + """Remove the private key secret.""" + try: + secret = self.charm.model.get_secret(label=self._get_private_key_secret_label()) + secret.remove_all_revisions() + except SecretNotFoundError: + logger.warning("Private key secret not found, nothing to remove") def _csr_matches_certificate_request( self, certificate_signing_request: CertificateSigningRequest, is_ca: bool @@ -1429,9 +1511,9 @@ def _tls_relation_created(self) -> bool: def _get_private_key_secret_label(self) -> str: if self.mode == Mode.UNIT: - return f"{LIBID}-private-key-{self._get_unit_number()}" + return f"{LIBID}-private-key-{self._get_unit_number()}-{self.relationship_name}" elif self.mode == Mode.APP: - return f"{LIBID}-private-key" + return f"{LIBID}-private-key-{self.relationship_name}" else: raise TLSCertificatesError("Invalid mode. Must be Mode.UNIT or Mode.APP.")