diff --git a/lib/charms/tls_certificates_interface/v3/tls_certificates.py b/lib/charms/tls_certificates_interface/v3/tls_certificates.py index f028c23..8840b4b 100644 --- a/lib/charms/tls_certificates_interface/v3/tls_certificates.py +++ b/lib/charms/tls_certificates_interface/v3/tls_certificates.py @@ -312,7 +312,7 @@ def _on_all_certificates_invalidated(self, event: AllCertificatesInvalidatedEven # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 3 +LIBPATCH = 4 PYDEPS = ["cryptography", "jsonschema"] @@ -469,7 +469,7 @@ def __init__( self.chain = chain def snapshot(self) -> dict: - """Returns snapshot.""" + """Return snapshot.""" return { "certificate": self.certificate, "certificate_signing_request": self.certificate_signing_request, @@ -478,7 +478,7 @@ def snapshot(self) -> dict: } def restore(self, snapshot: dict): - """Restores snapshot.""" + """Restore snapshot.""" self.certificate = snapshot["certificate"] self.certificate_signing_request = snapshot["certificate_signing_request"] self.ca = snapshot["ca"] @@ -502,11 +502,11 @@ def __init__(self, handle, certificate: str, expiry: str): self.expiry = expiry def snapshot(self) -> dict: - """Returns snapshot.""" + """Return snapshot.""" return {"certificate": self.certificate, "expiry": self.expiry} def restore(self, snapshot: dict): - """Restores snapshot.""" + """Restore snapshot.""" self.certificate = snapshot["certificate"] self.expiry = snapshot["expiry"] @@ -531,7 +531,7 @@ def __init__( self.chain = chain def snapshot(self) -> dict: - """Returns snapshot.""" + """Return snapshot.""" return { "reason": self.reason, "certificate_signing_request": self.certificate_signing_request, @@ -541,7 +541,7 @@ def snapshot(self) -> dict: } def restore(self, snapshot: dict): - """Restores snapshot.""" + """Restore snapshot.""" self.reason = snapshot["reason"] self.certificate_signing_request = snapshot["certificate_signing_request"] self.certificate = snapshot["certificate"] @@ -556,11 +556,11 @@ def __init__(self, handle: Handle): super().__init__(handle) def snapshot(self) -> dict: - """Returns snapshot.""" + """Return snapshot.""" return {} def restore(self, snapshot: dict): - """Restores snapshot.""" + """Restore snapshot.""" pass @@ -580,7 +580,7 @@ def __init__( self.is_ca = is_ca def snapshot(self) -> dict: - """Returns snapshot.""" + """Return snapshot.""" return { "certificate_signing_request": self.certificate_signing_request, "relation_id": self.relation_id, @@ -588,7 +588,7 @@ def snapshot(self) -> dict: } def restore(self, snapshot: dict): - """Restores snapshot.""" + """Restore snapshot.""" self.certificate_signing_request = snapshot["certificate_signing_request"] self.relation_id = snapshot["relation_id"] self.is_ca = snapshot["is_ca"] @@ -612,7 +612,7 @@ def __init__( self.chain = chain def snapshot(self) -> dict: - """Returns snapshot.""" + """Return snapshot.""" return { "certificate": self.certificate, "certificate_signing_request": self.certificate_signing_request, @@ -621,7 +621,7 @@ def snapshot(self) -> dict: } def restore(self, snapshot: dict): - """Restores snapshot.""" + """Restore snapshot.""" self.certificate = snapshot["certificate"] self.certificate_signing_request = snapshot["certificate_signing_request"] self.ca = snapshot["ca"] @@ -629,7 +629,7 @@ def restore(self, snapshot: dict): def _load_relation_data(relation_data_content: RelationDataContent) -> dict: - """Loads relation data from the relation data bag. + """Load relation data from the relation data bag. Json loads all data. @@ -639,7 +639,7 @@ def _load_relation_data(relation_data_content: RelationDataContent) -> dict: Returns: dict: Relation data in dict format. """ - certificate_data = dict() + certificate_data = {} try: for key in relation_data_content: try: @@ -692,7 +692,7 @@ def generate_ca( validity: int = 365, country: str = "US", ) -> bytes: - """Generates a CA Certificate. + """Generate a CA Certificate. Args: private_key (bytes): Private key @@ -761,7 +761,7 @@ def get_certificate_extensions( alt_names: Optional[List[str]], is_ca: bool, ) -> List[x509.Extension]: - """Generates a list of certificate extensions from a CSR and other known information. + """Generate a list of certificate extensions from a CSR and other known information. Args: authority_key_identifier (bytes): Authority key identifier @@ -863,7 +863,7 @@ def generate_certificate( alt_names: Optional[List[str]] = None, is_ca: bool = False, ) -> bytes: - """Generates a TLS certificate based on a CSR. + """Generate a TLS certificate based on a CSR. Args: csr (bytes): CSR @@ -918,7 +918,7 @@ def generate_private_key( key_size: int = 2048, public_exponent: int = 65537, ) -> bytes: - """Generates a private key. + """Generate a private key. Args: password (bytes): Password for decrypting the private key @@ -944,7 +944,7 @@ def generate_private_key( return key_bytes -def generate_csr( +def generate_csr( # noqa: C901 private_key: bytes, subject: str, add_unique_id_to_subject_name: bool = True, @@ -958,7 +958,7 @@ def generate_csr( sans_dns: Optional[List[str]] = None, additional_critical_extensions: Optional[List] = None, ) -> bytes: - """Generates a CSR using private key and subject. + """Generate a CSR using private key and subject. Args: private_key (bytes): Private key @@ -1051,11 +1051,12 @@ def csr_matches_certificate(csr: str, cert: str) -> bool: def _relation_data_is_valid( relation: Relation, app_or_unit: Union[Application, Unit], json_schema: dict ) -> bool: - """Checks whether relation data is valid based on json schema. + """Check whether relation data is valid based on json schema. Args: relation (Relation): Relation object app_or_unit (Union[Application, Unit]): Application or unit object + json_schema (dict): Json schema Returns: bool: Whether relation data is valid. @@ -1098,12 +1099,12 @@ def __init__(self, charm: CharmBase, relationship_name: str): self.relationship_name = relationship_name def _load_app_relation_data(self, relation: Relation) -> dict: - """Loads relation data from the application relation data bag. + """Load relation data from the application relation data bag. Json loads all data. Args: - relation_object: Relation data from the application databag + relation: Relation data from the application databag Returns: dict: Relation data in dict format. @@ -1121,7 +1122,7 @@ def _add_certificate( ca: str, chain: List[str], ) -> None: - """Adds certificate to relation data. + """Add certificate to relation data. Args: relation_id (int): Relation id @@ -1162,7 +1163,7 @@ def _remove_certificate( certificate: Optional[str] = None, certificate_signing_request: Optional[str] = None, ) -> None: - """Removes certificate from a given relation based on user provided certificate or csr. + """Remove certificate from a given relation based on user provided certificate or csr. Args: relation_id (int): Relation id @@ -1194,7 +1195,7 @@ def _remove_certificate( relation.data[self.model.app]["certificates"] = json.dumps(certificates) def revoke_all_certificates(self) -> None: - """Revokes all certificates of this provider. + """Revoke all certificates of this provider. This method is meant to be used when the Root CA has changed. """ @@ -1213,7 +1214,7 @@ def set_relation_certificate( chain: List[str], relation_id: int, ) -> None: - """Adds certificates to relation data. + """Add certificates to relation data. Args: certificate (str): Certificate @@ -1245,7 +1246,7 @@ def set_relation_certificate( ) def remove_certificate(self, certificate: str) -> None: - """Removes a given certificate from relation data. + """Remove a given certificate from relation data. Args: certificate (str): TLS Certificate @@ -1262,7 +1263,7 @@ def remove_certificate(self, certificate: str) -> None: def get_issued_certificates( self, relation_id: Optional[int] = None ) -> List[ProviderCertificate]: - """Returns a List of issued (non revoked) certificates. + """Return a List of issued (non revoked) certificates. Returns: List: List of ProviderCertificate objects @@ -1273,7 +1274,7 @@ def get_issued_certificates( def get_provider_certificates( self, relation_id: Optional[int] = None ) -> List[ProviderCertificate]: - """Returns a List of issued certificates. + """Return a List of issued certificates. Returns: List: List of ProviderCertificate objects @@ -1308,7 +1309,7 @@ def get_provider_certificates( return certificates def _on_relation_changed(self, event: RelationChangedEvent) -> None: - """Handler triggered on relation changed event. + """Handle relation changed event. Looks at the relation data and either emits: - certificate request event: If the unit relation data contains a CSR for which @@ -1346,7 +1347,7 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: self._revoke_certificates_for_which_no_csr_exists(relation_id=event.relation.id) def _revoke_certificates_for_which_no_csr_exists(self, relation_id: int) -> None: - """Revokes certificates for which no unit has a CSR. + """Revoke certificates for which no unit has a CSR. Goes through all generated certificates and compare against the list of CSRs for all units. @@ -1369,7 +1370,7 @@ def _revoke_certificates_for_which_no_csr_exists(self, relation_id: int) -> None def get_outstanding_certificate_requests( self, relation_id: Optional[int] = None ) -> List[RequirerCSR]: - """Returns CSR's for which no certificate has been issued. + """Return CSR's for which no certificate has been issued. Args: relation_id (int): Relation id @@ -1389,7 +1390,7 @@ def get_outstanding_certificate_requests( return outstanding_csrs def get_requirer_csrs(self, relation_id: Optional[int] = None) -> List[RequirerCSR]: - """Returns a list of requirers' CSRs. + """Return a list of requirers' CSRs. It returns CSRs from all relations if relation_id is not specified. CSRs are returned per relation id, application name and unit name. @@ -1434,7 +1435,7 @@ def get_requirer_csrs(self, relation_id: Optional[int] = None) -> List[RequirerC def certificate_issued_for_csr( self, app_name: str, csr: str, relation_id: Optional[int] ) -> bool: - """Checks whether a certificate has been issued for a given CSR. + """Check whether a certificate has been issued for a given CSR. Args: app_name (str): Application name that the CSR belongs to. @@ -1462,7 +1463,7 @@ def __init__( relationship_name: str, expiry_notification_time: int = 168, ): - """Generates/use private key and observes relation changed event. + """Generate/use private key and observes relation changed event. Args: charm: Charm object @@ -1483,7 +1484,7 @@ def __init__( self.framework.observe(charm.on.secret_expired, self._on_secret_expired) def get_requirer_csrs(self) -> List[RequirerCSR]: - """Returns list of requirer's CSRs from relation unit data. + """Return list of requirer's CSRs from relation unit data. Returns: list: List of RequirerCSR objects. @@ -1511,7 +1512,7 @@ def get_requirer_csrs(self) -> List[RequirerCSR]: return requirer_csrs def get_provider_certificates(self) -> List[ProviderCertificate]: - """Returns list of certificates from the provider's relation data.""" + """Return list of certificates from the provider's relation data.""" provider_certificates: List[ProviderCertificate] = [] relation = self.model.get_relation(self.relationship_name) if not relation: @@ -1547,7 +1548,7 @@ def get_provider_certificates(self) -> List[ProviderCertificate]: return provider_certificates def _add_requirer_csr_to_relation_data(self, csr: str, is_ca: bool) -> None: - """Adds CSR to relation data. + """Add CSR to relation data. Args: csr (str): Certificate Signing Request @@ -1579,7 +1580,7 @@ def _add_requirer_csr_to_relation_data(self, csr: str, is_ca: bool) -> None: ) def _remove_requirer_csr_from_relation_data(self, csr: str) -> None: - """Removes CSR from relation data. + """Remove CSR from relation data. Args: csr (str): Certificate signing request @@ -1630,7 +1631,7 @@ def request_certificate_creation( logger.info("Certificate request sent to provider") def request_certificate_revocation(self, certificate_signing_request: bytes) -> None: - """Removes CSR from relation data. + """Remove CSR from relation data. The provider of this relation is then expected to remove certificates associated to this CSR from the relation data as well and emit a request_certificate_revocation event for the @@ -1648,7 +1649,7 @@ def request_certificate_revocation(self, certificate_signing_request: bytes) -> def request_certificate_renewal( self, old_certificate_signing_request: bytes, new_certificate_signing_request: bytes ) -> None: - """Renews certificate. + """Renew certificate. Removes old CSR from relation data and adds new one. @@ -1706,7 +1707,7 @@ def get_certificate_signing_requests( fulfilled_only: bool = False, unfulfilled_only: bool = False, ) -> List[RequirerCSR]: - """Gets the list of CSR's that were sent to the provider. + """Get the list of CSR's that were sent to the provider. You can choose to get only the CSR's that have a certificate assigned or only the CSR's that don't. @@ -1728,7 +1729,7 @@ def get_certificate_signing_requests( return csrs def _on_relation_changed(self, event: RelationChangedEvent) -> None: - """Handler triggered on relation changed events. + """Handle relation changed event. Goes through all providers certificates that match a requested CSR. @@ -1810,7 +1811,7 @@ def _get_next_secret_expiry_time(self, certificate: str) -> Optional[datetime]: return _get_closest_future_time(expiry_notification_time, expiry_time) def _on_relation_broken(self, event: RelationBrokenEvent) -> None: - """Handler triggered on relation broken event. + """Handle Relation Broken Event. Emitting `all_certificates_invalidated` from `relation-broken` rather than `relation-departed` since certs are stored in app data. @@ -1824,7 +1825,7 @@ def _on_relation_broken(self, event: RelationBrokenEvent) -> None: self.on.all_certificates_invalidated.emit() def _on_secret_expired(self, event: SecretExpiredEvent) -> None: - """Triggered when a certificate is set to expire. + """Handle Secret Expired Event. Loads the certificate from the secret, and will emit 1 of 2 events. @@ -1876,7 +1877,7 @@ def _on_secret_expired(self, event: SecretExpiredEvent) -> None: event.secret.remove_all_revisions() def _find_certificate_in_relation_data(self, csr: str) -> Optional[ProviderCertificate]: - """Returns the certificate that match the given CSR.""" + """Return the certificate that match the given CSR.""" for provider_certificate in self.get_provider_certificates(): if provider_certificate.csr != csr: continue