From c3913aa8c2c2f059c8053676b67d50d167e4a4c1 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 4 Nov 2024 18:16:31 +0000 Subject: [PATCH 01/15] feat: add extra_listeners config, deprecate certificate_extra_sans --- config.yaml | 4 ++++ src/core/structured_config.py | 1 + src/events/tls.py | 8 ++++++++ src/managers/config.py | 1 + src/managers/tls.py | 2 +- 5 files changed, 15 insertions(+), 1 deletion(-) diff --git a/config.yaml b/config.yaml index 881de916..4a3d778c 100644 --- a/config.yaml +++ b/config.yaml @@ -92,6 +92,10 @@ options: description: Config options to add extra-sans to the ones used when requesting server certificates. The extra-sans are specified by comma-separated names to be added when requesting signed certificates. Use "{unit}" as a placeholder to be filled with the unit number, e.g. "worker-{unit}" will be translated as "worker-0" for unit 0 and "worker-1" for unit 1 when requesting the certificate. type: string default: "" + extra_listeners: + description: Config options to add extra-sans to the ones used when requesting server certificates, and to define custom `advertised.listeners` for clients external to the Juju model. These items are comma-separated. Use "{unit}" as a placeholder to be filled with the unit number, e.g. "worker-{unit}" will be translated as "worker-0" for unit 0 and "worker-1" for unit 1. + type: string + default: "" log_level: description: "Level of logging for the different components operated by the charm. Possible values: ERROR, WARNING, INFO, DEBUG" type: string diff --git a/src/core/structured_config.py b/src/core/structured_config.py index 3a18ec50..206a34bd 100644 --- a/src/core/structured_config.py +++ b/src/core/structured_config.py @@ -74,6 +74,7 @@ class CharmConfig(BaseConfigModel): zookeeper_ssl_cipher_suites: str | None profile: str certificate_extra_sans: str | None + extra_listeners: str | None log_level: str network_bandwidth: int = Field(default=50000, validate_default=False, gt=0) cruisecontrol_balance_threshold: float = Field(default=1.1, validate_default=False, ge=1) diff --git a/src/events/tls.py b/src/events/tls.py index d3d906e5..e0d9459f 100644 --- a/src/events/tls.py +++ b/src/events/tls.py @@ -9,6 +9,7 @@ import logging import os import re +import warnings from typing import TYPE_CHECKING from charms.tls_certificates_interface.v1.tls_certificates import ( @@ -296,6 +297,13 @@ def _request_certificate(self): sans = self.charm.broker.tls_manager.build_sans() + # only warn during certificate creation, not every event if in structured_config + if self.charm.config.certificate_extra_sans: + warnings.warn( + "'certificate_extra_sans' config option is deprecated, use 'extra_listeners' instead", + DeprecationWarning, + ) + csr = generate_csr( private_key=self.charm.state.unit_broker.private_key.encode("utf-8"), subject=self.charm.state.unit_broker.relation_data.get("private-address", ""), diff --git a/src/managers/config.py b/src/managers/config.py index 69152417..e0a33df0 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -76,6 +76,7 @@ "profile", "log_level", "certificate_extra_sans", + "extra_listeners", "roles", "expose_external", ] diff --git a/src/managers/tls.py b/src/managers/tls.py index b6b64efa..61d480cf 100644 --- a/src/managers/tls.py +++ b/src/managers/tls.py @@ -129,7 +129,7 @@ def remove_cert(self, alias: str) -> None: def _build_extra_sans(self) -> list[str]: """Parse the certificate_extra_sans config option.""" - extra_sans = self.config.certificate_extra_sans or "" + extra_sans = self.config.extra_listeners or self.config.certificate_extra_sans or "" parsed_sans = [] if extra_sans == "": From 3d9b6f6589a0bca5d4ae5d4b2e7d6e5f98664f23 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 4 Nov 2024 18:16:53 +0000 Subject: [PATCH 02/15] fix: use ops.main() instead of ops.main.main()? --- src/charm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/charm.py b/src/charm.py index 339126f8..cee9b51d 100755 --- a/src/charm.py +++ b/src/charm.py @@ -7,6 +7,7 @@ import logging import time +import ops from charms.data_platform_libs.v0.data_models import TypedCharmBase from charms.grafana_agent.v0.cos_agent import COSAgentProvider from charms.operator_libs_linux.v0 import sysctl @@ -17,7 +18,6 @@ EventBase, StatusBase, ) -from ops.main import main from core.cluster import ClusterState from core.models import Substrates @@ -188,4 +188,4 @@ def _on_collect_status(self, event: CollectStatusEvent): if __name__ == "__main__": - main(KafkaCharm) + ops.main(KafkaCharm) # # pyright: ignore[reportCallIssue] From ce33ea9bfc9c25bff0b70828c0154df53cfcbd88 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 4 Nov 2024 18:17:21 +0000 Subject: [PATCH 03/15] chore: expose extra_listeners in advertised.listeners properties --- src/literals.py | 3 ++- src/managers/config.py | 53 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/src/literals.py b/src/literals.py index 5a2768a0..13739792 100644 --- a/src/literals.py +++ b/src/literals.py @@ -72,11 +72,12 @@ class Ports: client: int internal: int external: int + extra: int = 0 AuthProtocol = Literal["SASL_PLAINTEXT", "SASL_SSL", "SSL"] AuthMechanism = Literal["SCRAM-SHA-512", "OAUTHBEARER", "SSL"] -Scope = Literal["INTERNAL", "CLIENT", "EXTERNAL"] +Scope = Literal["INTERNAL", "CLIENT", "EXTERNAL", "EXTRA"] AuthMap = NamedTuple("AuthMap", protocol=AuthProtocol, mechanism=AuthMechanism) SECURITY_PROTOCOL_PORTS: dict[AuthMap, Ports] = { diff --git a/src/managers/config.py b/src/managers/config.py index e0a33df0..41f610fc 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -9,6 +9,7 @@ import logging import os import re +import socket import textwrap from abc import abstractmethod from typing import Iterable @@ -93,13 +94,19 @@ class Listener: """ def __init__( - self, auth_map: AuthMap, scope: Scope, host: str = "", node_port: int | None = None + self, + auth_map: AuthMap, + scope: Scope, + host: str = "", + extra_count: int = 0, + node_port: int | None = None, ): self.auth_map = auth_map self.protocol = auth_map.protocol self.mechanism = auth_map.mechanism self.host = host self.scope = scope + self.extra_count = extra_count self.node_port = node_port @property @@ -110,8 +117,8 @@ def scope(self) -> Scope: @scope.setter def scope(self, value): """Internal scope validator.""" - if value not in ["CLIENT", "INTERNAL", "EXTERNAL"]: - raise ValueError("Only CLIENT, INTERNAL and EXTERNAL scopes are accepted") + if value not in ["CLIENT", "INTERNAL", "EXTERNAL", "EXTRA"]: + raise ValueError("Only CLIENT, INTERNAL, EXTERNAL and EXTRA scopes are accepted") self._scope = value @@ -122,12 +129,20 @@ def port(self) -> int: Returns: Integer of port number """ + # results in ports 39092, 39192, 39292 etc, for each extra listener + if self.scope == "EXTRA": + return getattr(SECURITY_PROTOCOL_PORTS[self.auth_map], "client") + ( + 30000 + (self.extra_count * 100) + ) + return getattr(SECURITY_PROTOCOL_PORTS[self.auth_map], self.scope.lower()) @property def name(self) -> str: """Name of the listener.""" - return f"{self.scope}_{self.protocol}_{self.mechanism.replace('-', '_')}" + return f"{self.scope}_{self.protocol}_{self.mechanism.replace('-', '_')}" + ( + f"_{self.extra_count}" if self.extra_count else "" + ) @property def protocol_map(self) -> str: @@ -384,7 +399,7 @@ def scram_properties(self) -> list[str]: f'listener.name.{listener_name}.{listener_mechanism}.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";', f"listener.name.{listener_name}.sasl.enabled.mechanisms={self.internal_listener.mechanism}", ] - for auth in self.client_listeners + self.external_listeners: + for auth in self.client_listeners + self.external_listeners + self.extra_listeners: if not auth.mechanism.startswith("SCRAM"): continue @@ -463,6 +478,27 @@ def controller_listener(self) -> None: """Return the controller listener.""" pass # TODO: No good abstraction in place for the controller use case + @property + def extra_listeners(self) -> list[Listener]: + """Foo.""" + if not self.config.extra_listeners: + return [] + + extra_hosts = [] + for sans in self.config.extra_listeners.split(","): + if "{unit}" not in sans: + continue + + extra_hosts.append(sans.replace("{unit}", str(self.state.unit_broker.unit_id))) + + extra_hosts.append(socket.gethostname()) + + return [ + Listener(host=extra_host, auth_map=auth_map, scope="EXTRA", extra_count=i) + for auth_map in self.state.enabled_auth + for i, extra_host in enumerate(extra_hosts) + ] + @property def client_listeners(self) -> list[Listener]: """Return a list of extra listeners.""" @@ -509,7 +545,12 @@ def external_listeners(self) -> list[Listener]: @property def all_listeners(self) -> list[Listener]: """Return a list with all expected listeners.""" - return [self.internal_listener] + self.client_listeners + self.external_listeners + return ( + [self.internal_listener] + + self.client_listeners + + self.external_listeners + + self.extra_listeners + ) @property def inter_broker_protocol_version(self) -> str: From 0dcd844e5b0d71c6f2243e45846073528dc4e639 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 4 Nov 2024 18:27:48 +0000 Subject: [PATCH 04/15] temp: comment out gethostname extra listener --- src/managers/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/managers/config.py b/src/managers/config.py index 41f610fc..c170c344 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -491,7 +491,7 @@ def extra_listeners(self) -> list[Listener]: extra_hosts.append(sans.replace("{unit}", str(self.state.unit_broker.unit_id))) - extra_hosts.append(socket.gethostname()) + # extra_hosts.append(socket.gethostname()) return [ Listener(host=extra_host, auth_map=auth_map, scope="EXTRA", extra_count=i) From 791ed9208b395cb6c5aef3e4a9df0d913a6a0ce7 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Thu, 7 Nov 2024 11:07:22 +0000 Subject: [PATCH 05/15] temp: stash --- actions.yaml | 3 + config.yaml | 2 +- src/core/structured_config.py | 35 ++++++++- src/events/actions.py | 133 +++++++++++++++++++++++++++++++++ src/events/broker.py | 4 +- src/events/password_actions.py | 10 ++- src/managers/config.py | 55 ++++++++------ src/managers/tls.py | 13 ++-- 8 files changed, 217 insertions(+), 38 deletions(-) create mode 100644 src/events/actions.py diff --git a/actions.yaml b/actions.yaml index 9cf312c1..6cc39eaa 100644 --- a/actions.yaml +++ b/actions.yaml @@ -31,6 +31,9 @@ get-admin-credentials: The returned client_properties can be used for Kafka bin commands using `--bootstrap-server` and `--command-config` for admin level administration This action must be called on the leader unit. +get-listeners: + description: Get all active listeners and their port allocations + pre-upgrade-check: description: Run necessary pre-upgrade checks before executing a charm upgrade. diff --git a/config.yaml b/config.yaml index 4a3d778c..2fa91c65 100644 --- a/config.yaml +++ b/config.yaml @@ -93,7 +93,7 @@ options: type: string default: "" extra_listeners: - description: Config options to add extra-sans to the ones used when requesting server certificates, and to define custom `advertised.listeners` for clients external to the Juju model. These items are comma-separated. Use "{unit}" as a placeholder to be filled with the unit number, e.g. "worker-{unit}" will be translated as "worker-0" for unit 0 and "worker-1" for unit 1. + description: "Config options to add extra SANs to the ones used when requesting server certificates, and to define custom `advertised.listeners` and ports for clients external to the Juju model. These items are comma-separated. Use '{unit}' as a placeholder to be filled with the unit number if necessary. For port allocations, providing the port for a given listener will offset the generated port number by that amount, with an accepted value range of 20001-50000. For example, a provided value of 'worker-{unit}.domain.com:30000' will generate listeners for unit 0 with name 'worker-0.domain.com', and be allocated ports 39092, 39093 etc for each authentication scheme." type: string default: "" log_level: diff --git a/src/core/structured_config.py b/src/core/structured_config.py index 206a34bd..211c0a22 100644 --- a/src/core/structured_config.py +++ b/src/core/structured_config.py @@ -74,7 +74,7 @@ class CharmConfig(BaseConfigModel): zookeeper_ssl_cipher_suites: str | None profile: str certificate_extra_sans: str | None - extra_listeners: str | None + extra_listeners: list[str] log_level: str network_bandwidth: int = Field(default=50000, validate_default=False, gt=0) cruisecontrol_balance_threshold: float = Field(default=1.1, validate_default=False, ge=1) @@ -266,3 +266,36 @@ def roles_values(cls, value: str) -> str: raise ValueError("Unknown role(s):", unknown_roles) return ",".join(sorted(roles)) # this has to be a string as it goes in to properties + + @validator("certificate_extra_sans") + @classmethod + def certificate_extra_sans_values(cls, value: str) -> list[str]: + """Formats certificate_extra_sans values to a list.""" + return value.split(",") if value else [] + + @validator("extra_listeners", pre=True) + @classmethod + def extra_listeners_port_values(cls, value: str) -> list[str]: + """Check extra_listeners port values for each listener, and format values to a list.""" + if not value: + return [] + + listeners = value.split(",") + + ports = [] + for listener in listeners: + if ":" not in listener or not listener.split(":")[1].isdigit(): + raise ValueError("Value for listener does not contain a valid port.") + + port = int(listener.split(":")[1]) + if not 20000 < port < 50000: + raise ValueError( + "Value for port out of accepted values. Accepted values for port are greater than 20000 and less than 50000" + ) + + ports.append(port) + + if len(ports) != len(set(ports)): + raise ValueError("Value for port is not unique for each listener.") + + return listeners diff --git a/src/events/actions.py b/src/events/actions.py new file mode 100644 index 00000000..3b82bc96 --- /dev/null +++ b/src/events/actions.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Event handlers for Juju Actions.""" + +import logging +from typing import TYPE_CHECKING + +from ops.charm import ActionEvent +from ops.framework import Object + +from literals import ADMIN_USER, INTERNAL_USERS + +if TYPE_CHECKING: + from charm import KafkaCharm + from events.broker import BrokerOperator + +logger = logging.getLogger(__name__) + + +class ActionEvents(Object): + """Event handlers for password-related Juju Actions.""" + + def __init__(self, dependent: "BrokerOperator") -> None: + super().__init__(dependent, "action_events") + self.dependent = dependent + self.charm: "KafkaCharm" = dependent.charm + + self.framework.observe( + getattr(self.charm.on, "set_password_action"), self._set_password_action + ) + self.framework.observe( + getattr(self.charm.on, "get_admin_credentials_action"), + self._get_admin_credentials_action, + ) + self.framework.observe( + getattr(self.charm.on, "get_listeners_action"), self._get_listeners_action + ) + + def _get_listeners_action(self, event: ActionEvent) -> None: + """Handler for get-listeners action.""" + listeners = self.dependent.config_manager.all_listeners + + result = {} + for listener in listeners: + key = listener.name.replace("_", "-").lower() + result.update( + { + key: { + "name": listener.name, + "scope": listener.scope, + "port": listener.port, + "protocol": listener.protocol, + "auth-mechanism": listener.mechanism, + "advertised-listener": listener.advertised_listener, + } + } + ) + + event.set_results(result) + + def _set_password_action(self, event: ActionEvent) -> None: + """Handler for set-password action. + + Set the password for a specific user, if no passwords are passed, generate them. + """ + if not self.model.unit.is_leader(): + msg = "Password rotation must be called on leader unit" + logger.error(msg) + event.fail(msg) + return + + if not self.dependent.upgrade.idle: + msg = f"Cannot set password while upgrading (upgrade_stack: {self.dependent.upgrade.upgrade_stack})" + logger.error(msg) + event.fail(msg) + return + + if not self.dependent.healthy: + msg = "Unit is not healthy" + logger.error(msg) + event.fail(msg) + return + + username = event.params["username"] + if username not in INTERNAL_USERS: + msg = f"Can only update internal charm users: {INTERNAL_USERS}, not {username}." + logger.error(msg) + event.fail(msg) + return + + new_password = event.params.get("password", self.dependent.workload.generate_password()) + + if new_password in self.charm.state.cluster.internal_user_credentials.values(): + msg = "Password already exists, please choose a different password." + logger.error(msg) + event.fail(msg) + return + + try: + self.dependent.auth_manager.add_user( + username=username, password=new_password, zk_auth=True + ) + except Exception as e: + logger.error(str(e)) + event.fail(f"unable to set password for {username}") + return + + # Store the password on application databag + self.charm.state.cluster.relation_data.update({f"{username}-password": new_password}) + event.set_results({f"{username}-password": new_password}) + + def _get_admin_credentials_action(self, event: ActionEvent) -> None: + client_properties = self.charm.workload.read(self.charm.workload.paths.client_properties) + + if not client_properties: + msg = "client.properties file not found on target unit." + logger.error(msg) + event.fail(msg) + return + + admin_properties = set(client_properties) - set( + self.dependent.config_manager.tls_properties + ) + + event.set_results( + { + "username": ADMIN_USER, + "password": self.charm.state.cluster.internal_user_credentials[ADMIN_USER], + "client-properties": "\n".join(admin_properties), + } + ) diff --git a/src/events/broker.py b/src/events/broker.py index 1e2c9b3f..fe68b90c 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -24,8 +24,8 @@ UpdateStatusEvent, ) +from events.actions import ActionEvents from events.oauth import OAuthHandler -from events.password_actions import PasswordActionEvents from events.provider import KafkaProvider from events.upgrade import KafkaDependencyModel, KafkaUpgrade from events.zookeeper import ZooKeeperHandler @@ -88,7 +88,7 @@ def __init__(self, charm) -> None: **DEPENDENCIES # pyright: ignore[reportArgumentType] ), ) - self.password_action_events = PasswordActionEvents(self) + self.password_action_events = ActionEvents(self) if not self.charm.state.runs_controller: self.zookeeper = ZooKeeperHandler(self) diff --git a/src/events/password_actions.py b/src/events/password_actions.py index 2049d408..1a70e227 100644 --- a/src/events/password_actions.py +++ b/src/events/password_actions.py @@ -2,7 +2,8 @@ # Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. -"""Event handlers for password-related Juju Actions.""" +"""Event handlers for Juju Actions.""" + import logging from typing import TYPE_CHECKING @@ -18,11 +19,11 @@ logger = logging.getLogger(__name__) -class PasswordActionEvents(Object): +class ActionEvents(Object): """Event handlers for password-related Juju Actions.""" def __init__(self, dependent: "BrokerOperator") -> None: - super().__init__(dependent, "password_events") + super().__init__(dependent, "action_events") self.dependent = dependent self.charm: "KafkaCharm" = dependent.charm @@ -33,6 +34,9 @@ def __init__(self, dependent: "BrokerOperator") -> None: getattr(self.charm.on, "get_admin_credentials_action"), self._get_admin_credentials_action, ) + self.framework.observe( + getattr(self.charm.on, "set_password_action"), self._set_password_action + ) def _set_password_action(self, event: ActionEvent) -> None: """Handler for set-password action. diff --git a/src/managers/config.py b/src/managers/config.py index c170c344..6e8306e2 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -9,7 +9,6 @@ import logging import os import re -import socket import textwrap from abc import abstractmethod from typing import Iterable @@ -88,8 +87,9 @@ class Listener: Args: auth_map: AuthMap representing the auth.protocol and auth.mechanism for the listener - scope: scope of the listener, CLIENT, INTERNAL or EXTERNAL + scope: scope of the listener, CLIENT, INTERNAL, EXTERNAL or EXTRA host: string with the host that will be announced + baseport (optional): integer port to offset CLIENT port numbers for EXTRA listeners node_port (optional): the node-port for the listener if scope=EXTERNAL """ @@ -98,7 +98,8 @@ def __init__( auth_map: AuthMap, scope: Scope, host: str = "", - extra_count: int = 0, + baseport: int = 30000, + extra_count: int = -1, node_port: int | None = None, ): self.auth_map = auth_map @@ -106,6 +107,7 @@ def __init__( self.mechanism = auth_map.mechanism self.host = host self.scope = scope + self.baseport = baseport self.extra_count = extra_count self.node_port = node_port @@ -129,10 +131,12 @@ def port(self) -> int: Returns: Integer of port number """ - # results in ports 39092, 39192, 39292 etc, for each extra listener + # generates ports 39092, 39192, 39292 etc for listener auth if baseport=30000 if self.scope == "EXTRA": - return getattr(SECURITY_PROTOCOL_PORTS[self.auth_map], "client") + ( - 30000 + (self.extra_count * 100) + return ( + getattr(SECURITY_PROTOCOL_PORTS[self.auth_map], "client") + + self.baseport + + (self.extra_count * 100) ) return getattr(SECURITY_PROTOCOL_PORTS[self.auth_map], self.scope.lower()) @@ -141,7 +145,7 @@ def port(self) -> int: def name(self) -> str: """Name of the listener.""" return f"{self.scope}_{self.protocol}_{self.mechanism.replace('-', '_')}" + ( - f"_{self.extra_count}" if self.extra_count else "" + f"_{self.extra_count}" if self.extra_count >= 0 else "" ) @property @@ -480,28 +484,33 @@ def controller_listener(self) -> None: @property def extra_listeners(self) -> list[Listener]: - """Foo.""" - if not self.config.extra_listeners: - return [] - - extra_hosts = [] - for sans in self.config.extra_listeners.split(","): - if "{unit}" not in sans: - continue + """Return a list of extra listeners.""" + extra_host_baseports = [ + tuple(listener.split(":")) for listener in self.config.extra_listeners + ] - extra_hosts.append(sans.replace("{unit}", str(self.state.unit_broker.unit_id))) + extra_listeners = [] + extra_count = 0 + for host, baseport in extra_host_baseports: + for auth_map in self.state.enabled_auth: + host = host.replace("{unit}", str(self.state.unit_broker.unit_id)) + extra_listeners.append( + Listener( + host=host, + auth_map=auth_map, + scope="EXTRA", + baseport=int(baseport), + extra_count=extra_count, + ) + ) - # extra_hosts.append(socket.gethostname()) + extra_count += 1 - return [ - Listener(host=extra_host, auth_map=auth_map, scope="EXTRA", extra_count=i) - for auth_map in self.state.enabled_auth - for i, extra_host in enumerate(extra_hosts) - ] + return extra_listeners @property def client_listeners(self) -> list[Listener]: - """Return a list of extra listeners.""" + """Return a list of client listeners.""" return [ Listener( host=self.state.unit_broker.internal_address, auth_map=auth_map, scope="CLIENT" diff --git a/src/managers/tls.py b/src/managers/tls.py index 61d480cf..d6e3fbc5 100644 --- a/src/managers/tls.py +++ b/src/managers/tls.py @@ -129,14 +129,11 @@ def remove_cert(self, alias: str) -> None: def _build_extra_sans(self) -> list[str]: """Parse the certificate_extra_sans config option.""" - extra_sans = self.config.extra_listeners or self.config.certificate_extra_sans or "" - parsed_sans = [] - - if extra_sans == "": - return parsed_sans - - for sans in extra_sans.split(","): - parsed_sans.append(sans.replace("{unit}", str(self.state.unit_broker.unit_id))) + extra_sans = self.config.extra_listeners or self.config.certificate_extra_sans or [] + clean_sans = [san.split(":")[0] for san in extra_sans] + parsed_sans = [ + san.replace("{unit}", str(self.state.unit_broker.unit_id)) for san in clean_sans + ] return parsed_sans From 48eb1335fb339dbb7241978dc881128e4a01fb82 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Thu, 7 Nov 2024 18:49:22 +0000 Subject: [PATCH 06/15] style: fiddle with comment --- src/charm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/charm.py b/src/charm.py index cee9b51d..61d24ccc 100755 --- a/src/charm.py +++ b/src/charm.py @@ -188,4 +188,4 @@ def _on_collect_status(self, event: CollectStatusEvent): if __name__ == "__main__": - ops.main(KafkaCharm) # # pyright: ignore[reportCallIssue] + ops.main(KafkaCharm) # pyright: ignore[reportCallIssue] From 69f4015f159d4152d700ebbedadd7cb671219af4 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Thu, 7 Nov 2024 18:49:33 +0000 Subject: [PATCH 07/15] test: add unit-test for extra_listeners --- tests/unit/test_config.py | 70 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 4976983b..43d92909 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -160,6 +160,76 @@ def test_listeners_in_server_properties(harness: Harness[KafkaCharm]): ) +def test_extra_listeners_in_server_properties(harness: Harness[KafkaCharm]): + """Checks that the extra-listeners are properly set from config.""" + harness._update_config( + {"extra_listeners": "worker-{unit}.foo.com:30000,worker-{unit}.bar.com:40000"} + ) + harness.charm.broker.config_manager.config = harness.charm.config + + peer_relation_id = harness.add_relation(PEER, CHARM_KEY) + harness.add_relation_unit(peer_relation_id, f"{CHARM_KEY}/1") + harness.update_relation_data( + peer_relation_id, f"{CHARM_KEY}/0", {"private-address": "treebeard"} + ) + + # adding client + client_relation_id = harness.add_relation("kafka-client", "app") + harness.update_relation_data(client_relation_id, "app", {"extra-user-roles": "admin,producer"}) + assert ( + len(harness.charm.broker.config_manager.all_listeners) == 4 + ) # 2 extra, 1 internal, 1 client + + # adding SSL + harness.update_relation_data(peer_relation_id, CHARM_KEY, {"tls": "enabled"}) + assert ( + len(harness.charm.broker.config_manager.all_listeners) == 4 + ) # 2 extra, 1 internal, 1 client + + # adding SSL + harness.update_relation_data(peer_relation_id, CHARM_KEY, {"mtls": "enabled"}) + assert ( + len(harness.charm.broker.config_manager.all_listeners) == 7 + ) # 2 extra sasl_ssl, 2 extra ssl, 1 internal, 2 client + + expected_listener_names = { + "INTERNAL_SASL_PLAINTEXT_SCRAM_SHA_512", + "CLIENT_SASL_PLAINTEXT_SCRAM_SHA_512", + "CLIENT_SSL_SSL", + "EXTRA_SASL_PLAINTEXT_SCRAM_SHA_512_0", + "EXTRA_SASL_PLAINTEXT_SCRAM_SHA_512_1", + "EXTRA_SSL_SSL_0", + "EXTRA_SSL_SSL_1", + } + + advertised_listeners_prop = "" + for prop in harness.charm.broker.config_manager.server_properties: + if "advertised.listener" in prop: + advertised_listeners_prop = prop + + # validating every expected listener is present + for name in expected_listener_names: + assert name in advertised_listeners_prop + + # validating their allocated ports are expected + ports = [] + for listener in advertised_listeners_prop.split("=")[1].split(","): + name, _, port = listener.split(":") + + if name.endswith("_0") or name.endswith("_1"): + # verifying allocation uses the baseport + digit = 10**4 + assert int(port) // digit * digit in (30000, 40000) + + # verifying allocation is in steps of 100 + digit = 10**2 + assert int(port) // digit * digit in (39000, 39100, 49000, 49100) + + # verifying all ports are unique + assert port not in ports + ports.append(port) + + def test_oauth_client_listeners_in_server_properties(harness: Harness[KafkaCharm]): """Checks that oauth client listeners are properly set when a relating through oauth.""" harness.add_relation(ZK, CHARM_KEY) From e34267b719975556d9e8253e734da977af203410 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Thu, 7 Nov 2024 18:54:48 +0000 Subject: [PATCH 08/15] test: update test_extra_sans_config to check extra_listeners+certificate_extra_sans --- tests/unit/test_tls.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/unit/test_tls.py b/tests/unit/test_tls.py index 6ea6ade4..62db76d6 100644 --- a/tests/unit/test_tls.py +++ b/tests/unit/test_tls.py @@ -120,6 +120,19 @@ def test_extra_sans_config(harness: Harness[KafkaCharm]): assert "worker0.com" in "".join(manager._build_extra_sans()) assert "0.example" in "".join(manager._build_extra_sans()) + # verifying that sans can be built with both certificate_extra_sans and extra_listeners + harness._update_config( + { + "certificate_extra_sans": "", + "extra_listeners": "worker{unit}.com:30000,{unit}.example:40000,nonunit.domain.com:45000", + } + ) + manager.config = harness.charm.config + assert manager._build_extra_sans + assert "worker0.com" in "".join(manager._build_extra_sans()) + assert "0.example" in "".join(manager._build_extra_sans()) + assert "nonunit.domain.com" in "".join(manager._build_extra_sans()) + def test_sans(harness: Harness[KafkaCharm], patched_node_ip): # Create peer relation From e35b3b3deff272b10c239f98b8eb21250b6fefb7 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Thu, 7 Nov 2024 18:57:20 +0000 Subject: [PATCH 09/15] refactor: move password_events to events.py --- src/events/broker.py | 3 +- src/events/password_actions.py | 111 --------------------------------- tests/unit/test_upgrade.py | 2 +- 3 files changed, 3 insertions(+), 113 deletions(-) delete mode 100644 src/events/password_actions.py diff --git a/src/events/broker.py b/src/events/broker.py index fe68b90c..b0274c9a 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -88,7 +88,8 @@ def __init__(self, charm) -> None: **DEPENDENCIES # pyright: ignore[reportArgumentType] ), ) - self.password_action_events = ActionEvents(self) + self.action_events = ActionEvents(self) + if not self.charm.state.runs_controller: self.zookeeper = ZooKeeperHandler(self) diff --git a/src/events/password_actions.py b/src/events/password_actions.py deleted file mode 100644 index 1a70e227..00000000 --- a/src/events/password_actions.py +++ /dev/null @@ -1,111 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2024 Canonical Ltd. -# See LICENSE file for licensing details. - -"""Event handlers for Juju Actions.""" - -import logging -from typing import TYPE_CHECKING - -from ops.charm import ActionEvent -from ops.framework import Object - -from literals import ADMIN_USER, INTERNAL_USERS - -if TYPE_CHECKING: - from charm import KafkaCharm - from events.broker import BrokerOperator - -logger = logging.getLogger(__name__) - - -class ActionEvents(Object): - """Event handlers for password-related Juju Actions.""" - - def __init__(self, dependent: "BrokerOperator") -> None: - super().__init__(dependent, "action_events") - self.dependent = dependent - self.charm: "KafkaCharm" = dependent.charm - - self.framework.observe( - getattr(self.charm.on, "set_password_action"), self._set_password_action - ) - self.framework.observe( - getattr(self.charm.on, "get_admin_credentials_action"), - self._get_admin_credentials_action, - ) - self.framework.observe( - getattr(self.charm.on, "set_password_action"), self._set_password_action - ) - - def _set_password_action(self, event: ActionEvent) -> None: - """Handler for set-password action. - - Set the password for a specific user, if no passwords are passed, generate them. - """ - if not self.model.unit.is_leader(): - msg = "Password rotation must be called on leader unit" - logger.error(msg) - event.fail(msg) - return - - if not self.dependent.upgrade.idle: - msg = f"Cannot set password while upgrading (upgrade_stack: {self.dependent.upgrade.upgrade_stack})" - logger.error(msg) - event.fail(msg) - return - - if not self.dependent.healthy: - msg = "Unit is not healthy" - logger.error(msg) - event.fail(msg) - return - - username = event.params["username"] - if username not in INTERNAL_USERS: - msg = f"Can only update internal charm users: {INTERNAL_USERS}, not {username}." - logger.error(msg) - event.fail(msg) - return - - new_password = event.params.get("password", self.dependent.workload.generate_password()) - - if new_password in self.charm.state.cluster.internal_user_credentials.values(): - msg = "Password already exists, please choose a different password." - logger.error(msg) - event.fail(msg) - return - - try: - self.dependent.auth_manager.add_user( - username=username, password=new_password, zk_auth=True - ) - except Exception as e: - logger.error(str(e)) - event.fail(f"unable to set password for {username}") - return - - # Store the password on application databag - self.charm.state.cluster.relation_data.update({f"{username}-password": new_password}) - event.set_results({f"{username}-password": new_password}) - - def _get_admin_credentials_action(self, event: ActionEvent) -> None: - client_properties = self.charm.workload.read(self.charm.workload.paths.client_properties) - - if not client_properties: - msg = "client.properties file not found on target unit." - logger.error(msg) - event.fail(msg) - return - - admin_properties = set(client_properties) - set( - self.dependent.config_manager.tls_properties - ) - - event.set_results( - { - "username": ADMIN_USER, - "password": self.charm.state.cluster.internal_user_credentials[ADMIN_USER], - "client-properties": "\n".join(admin_properties), - } - ) diff --git a/tests/unit/test_upgrade.py b/tests/unit/test_upgrade.py index 757d511f..10042f9e 100644 --- a/tests/unit/test_upgrade.py +++ b/tests/unit/test_upgrade.py @@ -111,7 +111,7 @@ def test_run_password_rotation_while_upgrading(harness: Harness[KafkaCharm], upg ), patch("managers.auth.AuthManager.add_user"), ): - harness.charm.broker.password_action_events._set_password_action(mock_event) + harness.charm.broker.action_events._set_password_action(mock_event) if not upgrade_stack: mock_event.set_results.assert_called() From df0a92a470799bf76bbc15969c458b2746da4946 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Fri, 8 Nov 2024 12:35:43 +0000 Subject: [PATCH 10/15] chore: validate extra_listeners ports not too close to eachother --- src/core/structured_config.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/core/structured_config.py b/src/core/structured_config.py index 211c0a22..933c9487 100644 --- a/src/core/structured_config.py +++ b/src/core/structured_config.py @@ -295,6 +295,15 @@ def extra_listeners_port_values(cls, value: str) -> list[str]: ports.append(port) + current_port = 0 + for port in ports: + if not current_port - 100 < int(port) > current_port + 100: + raise ValueError( + "Value for port is too close to other value for port. Accepted values must be at least 100 apart from each other" + ) + + current_port = int(port) + if len(ports) != len(set(ports)): raise ValueError("Value for port is not unique for each listener.") From f25c04bd451dc9ed9e3856b084ff56d962139507 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Fri, 8 Nov 2024 12:36:11 +0000 Subject: [PATCH 11/15] chore: remove offset for extra_listener port generation --- src/managers/config.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/managers/config.py b/src/managers/config.py index 6e8306e2..b60fe197 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -133,11 +133,7 @@ def port(self) -> int: """ # generates ports 39092, 39192, 39292 etc for listener auth if baseport=30000 if self.scope == "EXTRA": - return ( - getattr(SECURITY_PROTOCOL_PORTS[self.auth_map], "client") - + self.baseport - + (self.extra_count * 100) - ) + return getattr(SECURITY_PROTOCOL_PORTS[self.auth_map], "client") + self.baseport return getattr(SECURITY_PROTOCOL_PORTS[self.auth_map], self.scope.lower()) From ed74aa73b867fd0c7413d8a0f43c74673f791999 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Fri, 8 Nov 2024 12:36:25 +0000 Subject: [PATCH 12/15] test: test unhappy path in extra_listeners --- tests/unit/test_config.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 43d92909..66cf7a6b 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -10,6 +10,7 @@ import pytest import yaml from ops.testing import Harness +from pydantic import ValidationError from charm import KafkaCharm from literals import ( @@ -162,6 +163,20 @@ def test_listeners_in_server_properties(harness: Harness[KafkaCharm]): def test_extra_listeners_in_server_properties(harness: Harness[KafkaCharm]): """Checks that the extra-listeners are properly set from config.""" + # verifying structured config validators + for value in [ + "missing.port", + "low.port:15000", + "high.port:60000", + "non.unique:30000,other.non.unique:30000", + "close.port:30000,other.close.port:30001", + ]: + with pytest.raises(ValidationError): + harness._update_config( + {"extra_listeners": value} + ) + harness.charm.broker.config_manager.config = harness.charm.config + harness._update_config( {"extra_listeners": "worker-{unit}.foo.com:30000,worker-{unit}.bar.com:40000"} ) From 58076bfee0070c959d6e9857d4c2e6aadf439220 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Fri, 8 Nov 2024 12:39:52 +0000 Subject: [PATCH 13/15] docs: update Actions event handler docstring --- src/events/actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/events/actions.py b/src/events/actions.py index 3b82bc96..9e360a0a 100644 --- a/src/events/actions.py +++ b/src/events/actions.py @@ -20,7 +20,7 @@ class ActionEvents(Object): - """Event handlers for password-related Juju Actions.""" + """Event handlers for Juju Actions.""" def __init__(self, dependent: "BrokerOperator") -> None: super().__init__(dependent, "action_events") From 69884e8c26b5a462b211a203be09f89877ae316a Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Fri, 8 Nov 2024 12:41:04 +0000 Subject: [PATCH 14/15] style: fmt --- tests/unit/test_config.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 66cf7a6b..f9ef283f 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -172,9 +172,7 @@ def test_extra_listeners_in_server_properties(harness: Harness[KafkaCharm]): "close.port:30000,other.close.port:30001", ]: with pytest.raises(ValidationError): - harness._update_config( - {"extra_listeners": value} - ) + harness._update_config({"extra_listeners": value}) harness.charm.broker.config_manager.config = harness.charm.config harness._update_config( From 80fd059e8f86a5fcf70d1f865208adabd6bcf97f Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Fri, 8 Nov 2024 19:19:17 +0000 Subject: [PATCH 15/15] chore: only add 1 unit during storage re-use test --- tests/integration/test_charm.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index da50edbc..924abb9b 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -331,10 +331,10 @@ async def test_observability_integration(ops_test: OpsTest): @pytest.mark.abort_on_fail async def test_deploy_with_existing_storage(ops_test: OpsTest): - unit_to_remove, *_ = await ops_test.model.applications[APP_NAME].add_units(count=3) - await ops_test.model.block_until(lambda: len(ops_test.model.applications[APP_NAME].units) == 4) + unit_to_remove, *_ = await ops_test.model.applications[APP_NAME].add_units(count=1) + await ops_test.model.block_until(lambda: len(ops_test.model.applications[APP_NAME].units) == 2) await ops_test.model.wait_for_idle( - apps=[APP_NAME], status="active", timeout=1000, idle_period=30 + apps=[APP_NAME], status="active", timeout=2000, idle_period=30 ) _, stdout, _ = await ops_test.juju("storage", "--format", "json") @@ -347,7 +347,7 @@ async def test_deploy_with_existing_storage(ops_test: OpsTest): break await unit_to_remove.remove(destroy_storage=False) - await ops_test.model.block_until(lambda: len(ops_test.model.applications[APP_NAME].units) == 3) + await ops_test.model.block_until(lambda: len(ops_test.model.applications[APP_NAME].units) == 1) add_unit_cmd = f"add-unit {APP_NAME} --model={ops_test.model.info.name} --attach-storage={data_storage_id}".split() await ops_test.juju(*add_unit_cmd)