Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Update charm libraries #390

Merged
merged 1 commit into from
Feb 8, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 105 additions & 23 deletions lib/charms/tls_certificates_interface/v4/tls_certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
Pre-requisites:
- Juju >= 3.0
- cryptography >= 43.0.0
- pydantic
- pydantic >= 2.0

Learn more on how-to use the TLS Certificates interface library by reading the documentation:
- https://charmhub.io/tls-certificates-interface/
Expand Down Expand Up @@ -52,9 +52,12 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 7
LIBPATCH = 8

PYDEPS = ["cryptography", "pydantic"]
PYDEPS = [
"cryptography>=43.0.0",
"pydantic>=2.0",
]

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -211,6 +214,27 @@ def from_string(cls, private_key: str) -> "PrivateKey":
"""Create a PrivateKey object from a private key."""
return cls(raw=private_key.strip())

def is_valid(self) -> bool:
"""Validate that the private key is PEM-formatted, RSA, and at least 2048 bits."""
try:
key = serialization.load_pem_private_key(
self.raw.encode(),
password=None,
)

if not isinstance(key, rsa.RSAPrivateKey):
logger.warning("Private key is not an RSA key")
return False

if key.key_size < 2048:
logger.warning("RSA key size is less than 2048 bits")
return False

return True
except ValueError:
logger.warning("Invalid private key format")
return False


@dataclass(frozen=True)
class Certificate:
Expand Down Expand Up @@ -612,12 +636,14 @@ def generate_private_key(
"""Generate a private key with the RSA algorithm.

Args:
key_size (int): Key size in bytes
key_size (int): Key size in bits, must be at least 2048 bits
public_exponent: Public exponent.

Returns:
PrivateKey: Private Key
"""
if key_size < 2048:
raise ValueError("Key size must be at least 2048 bits for RSA security")
private_key = rsa.generate_private_key(
public_exponent=public_exponent,
key_size=key_size,
Expand Down Expand Up @@ -978,6 +1004,7 @@ def __init__(
certificate_requests: List[CertificateRequestAttributes],
mode: Mode = Mode.UNIT,
refresh_events: List[BoundEvent] = [],
private_key: Optional[PrivateKey] = None,
):
"""Create a new instance of the TLSCertificatesRequiresV4 class.

Expand All @@ -989,6 +1016,12 @@ def __init__(
mode (Mode): Whether to use unit or app certificates mode. Default is Mode.UNIT.
refresh_events (List[BoundEvent]): A list of events to trigger a refresh of
the certificates.
private_key (Optional[PrivateKey]): The private key to use for the certificates.
If provided, it will be used instead of generating a new one.
If the key is not valid an exception will be raised.
Using this parameter is discouraged,
having to pass around private keys manually can be a security concern.
Allowing the library to generate and manage the key is the more secure approach.
"""
super().__init__(charm, relationship_name)
if not JujuVersion.from_environ().has_secrets:
Expand All @@ -1002,6 +1035,9 @@ def __init__(
self.relationship_name = relationship_name
self.certificate_requests = certificate_requests
self.mode = mode
if private_key and not private_key.is_valid():
raise TLSCertificatesError("Invalid private key")
self._private_key = private_key
self.framework.observe(charm.on[relationship_name].relation_created, self._configure)
self.framework.observe(charm.on[relationship_name].relation_changed, self._configure)
self.framework.observe(charm.on.secret_expired, self._on_secret_expired)
Expand All @@ -1020,10 +1056,10 @@ def _configure(self, _: EventBase):
if not self._tls_relation_created():
logger.debug("TLS relation not created yet.")
return
self._generate_private_key()
self._ensure_private_key()
self._cleanup_certificate_requests()
self._send_certificate_requests()
self._find_available_certificates()
self._cleanup_certificate_requests()

def _mode_is_valid(self, mode: Mode) -> bool:
return mode in [Mode.UNIT, Mode.APP]
Expand Down Expand Up @@ -1114,45 +1150,91 @@ def _get_app_or_unit(self) -> Union[Application, Unit]:
@property
def private_key(self) -> Optional[PrivateKey]:
"""Return the private key."""
if self._private_key:
return self._private_key
if not self._private_key_generated():
return None
secret = self.charm.model.get_secret(label=self._get_private_key_secret_label())
private_key = secret.get_content(refresh=True)["private-key"]
return PrivateKey.from_string(private_key)

def _generate_private_key(self) -> None:
def _ensure_private_key(self) -> None:
"""Make sure there is a private key to be used.

It will make sure there is a private key passed by the charm using the private_key
parameter or generate a new one otherwise.
"""
# Remove the generated private key
# if one has been passed by the charm using the private_key parameter
if self._private_key:
self._remove_private_key_secret()
return
if self._private_key_generated():
logger.debug("Private key already generated")
return
private_key = generate_private_key()
self.charm.unit.add_secret(
content={"private-key": str(private_key)},
label=self._get_private_key_secret_label(),
)
logger.info("Private key generated")
self._generate_private_key()

def regenerate_private_key(self) -> None:
"""Regenerate the private key.

Generate a new private key, remove old certificate requests and send new ones.

Raises:
TLSCertificatesError: If the private key is passed by the charm using the
private_key parameter.
"""
if self._private_key:
raise TLSCertificatesError(
"Private key is passed by the charm through the private_key parameter, "
"this function can't be used"
)
if not self._private_key_generated():
logger.warning("No private key to regenerate")
return
self._regenerate_private_key()
self._generate_private_key()
self._cleanup_certificate_requests()
self._send_certificate_requests()

def _regenerate_private_key(self) -> None:
secret = self.charm.model.get_secret(label=self._get_private_key_secret_label())
secret.set_content({"private-key": str(generate_private_key())})
secret.get_content(refresh=True)
def _generate_private_key(self) -> None:
"""Generate a new private key and store it in a secret.

This is the case when the private key used is generated by the library.
and not passed by the charm using the private_key parameter.
"""
self._store_private_key_in_secret(generate_private_key())
logger.info("Private key generated")

def _private_key_generated(self) -> bool:
"""Check if a private key is stored in a secret.

This is the case when the private key used is generated by the library.
This should not exist when the private key used
is passed by the charm using the private_key parameter.
"""
try:
self.charm.model.get_secret(label=self._get_private_key_secret_label())
except (SecretNotFoundError, KeyError):
secret = self.charm.model.get_secret(label=self._get_private_key_secret_label())
secret.get_content(refresh=True)
return True
except SecretNotFoundError:
return False
return True

def _store_private_key_in_secret(self, private_key: PrivateKey) -> None:
try:
secret = self.charm.model.get_secret(label=self._get_private_key_secret_label())
secret.set_content({"private-key": str(private_key)})
except SecretNotFoundError:
self.charm.unit.add_secret(
content={"private-key": str(private_key)},
label=self._get_private_key_secret_label(),
)

def _remove_private_key_secret(self) -> None:
"""Remove the private key secret."""
try:
secret = self.charm.model.get_secret(label=self._get_private_key_secret_label())
secret.remove_all_revisions()
except SecretNotFoundError:
logger.warning("Private key secret not found, nothing to remove")

def _csr_matches_certificate_request(
self, certificate_signing_request: CertificateSigningRequest, is_ca: bool
Expand Down Expand Up @@ -1429,9 +1511,9 @@ def _tls_relation_created(self) -> bool:

def _get_private_key_secret_label(self) -> str:
if self.mode == Mode.UNIT:
return f"{LIBID}-private-key-{self._get_unit_number()}"
return f"{LIBID}-private-key-{self._get_unit_number()}-{self.relationship_name}"
elif self.mode == Mode.APP:
return f"{LIBID}-private-key"
return f"{LIBID}-private-key-{self.relationship_name}"
else:
raise TLSCertificatesError("Invalid mode. Must be Mode.UNIT or Mode.APP.")

Expand Down
Loading