Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update charm libraries #321

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions lib/charms/observability_libs/v1/cert_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
Since this library uses [Juju Secrets](https://juju.is/docs/juju/secret) it requires Juju >= 3.0.3.
"""
import abc
import hashlib
import ipaddress
import json
import socket
Expand Down Expand Up @@ -67,7 +68,7 @@

LIBID = "b5cd5cd580f3428fa5f59a8876dcbe6a"
LIBAPI = 1
LIBPATCH = 13
LIBPATCH = 14

VAULT_SECRET_LABEL = "cert-handler-private-vault"

Expand Down Expand Up @@ -301,14 +302,11 @@ def __init__(
Must match metadata.yaml.
cert_subject: Custom subject. Name collisions are under the caller's responsibility.
sans: DNS names. If none are given, use FQDN.
refresh_events: an optional list of bound events which
will be observed to replace the current CSR with a new one
if there are changes in the CSR's DNS SANs or IP SANs.
Then, subsequently, replace its corresponding certificate with a new one.
refresh_events: [DEPRECATED].
"""
super().__init__(charm, key)
# use StoredState to store the hash of the CSR
# to potentially trigger a CSR renewal on `refresh_events`
# to potentially trigger a CSR renewal
self._stored.set_default(
csr_hash=None,
)
Expand All @@ -320,8 +318,9 @@ def __init__(

# Use fqdn only if no SANs were given, and drop empty/duplicate SANs
sans = list(set(filter(None, (sans or [socket.getfqdn()]))))
self.sans_ip = list(filter(is_ip_address, sans))
self.sans_dns = list(filterfalse(is_ip_address, sans))
# sort SANS lists to avoid unnecessary csr renewals during reconciliation
self.sans_ip = sorted(filter(is_ip_address, sans))
self.sans_dns = sorted(filterfalse(is_ip_address, sans))

if self._check_juju_supports_secrets():
vault_backend = _SecretVaultBackend(charm, secret_label=VAULT_SECRET_LABEL)
Expand Down Expand Up @@ -367,13 +366,15 @@ def __init__(
)

if refresh_events:
for ev in refresh_events:
self.framework.observe(ev, self._on_refresh_event)
logger.warn(
"DEPRECATION WARNING. `refresh_events` is now deprecated. CertHandler will automatically refresh the CSR when necessary."
)

def _on_refresh_event(self, _):
"""Replace the latest current CSR with a new one if there are any SANs changes."""
if self._stored.csr_hash != self._csr_hash:
self._generate_csr(renew=True)
self._reconcile()

def _reconcile(self):
"""Run all logic that is independent of what event we're processing."""
self._refresh_csr_if_needed()

def _on_upgrade_charm(self, _):
has_privkey = self.vault.get_value("private-key")
Expand All @@ -388,6 +389,11 @@ def _on_upgrade_charm(self, _):
# this will call `self.private_key` which will generate a new privkey.
self._generate_csr(renew=True)

def _refresh_csr_if_needed(self):
"""Refresh the current CSR with a new one if there are any SANs changes."""
if self._stored.csr_hash is not None and self._stored.csr_hash != self._csr_hash:
self._generate_csr(renew=True)

def _migrate_vault(self):
peer_backend = _RelationVaultBackend(self.charm, relation_name="peers")

Expand Down Expand Up @@ -440,13 +446,17 @@ def enabled(self) -> bool:
return True

@property
def _csr_hash(self) -> int:
def _csr_hash(self) -> str:
"""A hash of the config that constructs the CSR.

Only include here the config options that, should they change, should trigger a renewal of
the CSR.
"""
return hash(

def _stable_hash(data):
return hashlib.sha256(str(data).encode()).hexdigest()

return _stable_hash(
(
tuple(self.sans_dns),
tuple(self.sans_ip),
Expand Down
Loading
Loading