diff --git a/lib/charms/data_platform_libs/v0/data_interfaces.py b/lib/charms/data_platform_libs/v0/data_interfaces.py index 86d7521a8..db653089d 100644 --- a/lib/charms/data_platform_libs/v0/data_interfaces.py +++ b/lib/charms/data_platform_libs/v0/data_interfaces.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Library to manage the relation for the data-platform products. +r"""Library to manage the relation for the data-platform products. This library contains the Requires and Provides classes for handling the relation between an application and multiple managed application supported by the data-team: @@ -296,7 +296,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent): from abc import ABC, abstractmethod from collections import namedtuple from datetime import datetime -from typing import List, Optional +from typing import List, Optional, Union from ops.charm import ( CharmBase, @@ -306,7 +306,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent): RelationJoinedEvent, ) from ops.framework import EventSource, Object -from ops.model import Relation +from ops.model import Application, Relation, Unit # The unique Charmhub library identifier, never change it LIBID = "6c3e6b6680d64e9c89e611d1a15f65be" @@ -316,7 +316,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 12 +LIBPATCH = 14 PYDEPS = ["ops>=2.0.0"] @@ -331,7 +331,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent): deleted - key that were deleted""" -def diff(event: RelationChangedEvent, bucket: str) -> Diff: +def diff(event: RelationChangedEvent, bucket: Union[Unit, Application]) -> Diff: """Retrieves the diff of the data in the relation changed databag. Args: @@ -345,9 +345,11 @@ def diff(event: RelationChangedEvent, bucket: str) -> Diff: # Retrieve the old data from the data key in the application relation databag. old_data = json.loads(event.relation.data[bucket].get("data", "{}")) # Retrieve the new data from the event relation databag. - new_data = { - key: value for key, value in event.relation.data[event.app].items() if key != "data" - } + new_data = ( + {key: value for key, value in event.relation.data[event.app].items() if key != "data"} + if event.app + else {} + ) # These are the keys that were added to the databag and triggered this event. added = new_data.keys() - old_data.keys() @@ -409,9 +411,11 @@ def fetch_relation_data(self) -> dict: """ data = {} for relation in self.relations: - data[relation.id] = { - key: value for key, value in relation.data[relation.app].items() if key != "data" - } + data[relation.id] = ( + {key: value for key, value in relation.data[relation.app].items() if key != "data"} + if relation.app + else {} + ) return data def _update_relation_data(self, relation_id: int, data: dict) -> None: @@ -426,8 +430,8 @@ def _update_relation_data(self, relation_id: int, data: dict) -> None: that should be updated in the relation. """ if self.local_unit.is_leader(): - relation = self.charm.model.get_relation(self.relation_name, relation_id) - relation.data[self.local_app].update(data) + if relation := self.charm.model.get_relation(self.relation_name, relation_id): + relation.data[self.local_app].update(data) @property def relations(self) -> List[Relation]: @@ -479,7 +483,7 @@ def __init__( self, charm, relation_name: str, - extra_user_roles: str = None, + extra_user_roles: Optional[str] = None, ): """Manager of base client relations.""" super().__init__(charm, relation_name) @@ -517,9 +521,11 @@ def fetch_relation_data(self) -> dict: """ data = {} for relation in self.relations: - data[relation.id] = { - key: value for key, value in relation.data[relation.app].items() if key != "data" - } + data[relation.id] = ( + {key: value for key, value in relation.data[relation.app].items() if key != "data"} + if relation.app + else {} + ) return data def _update_relation_data(self, relation_id: int, data: dict) -> None: @@ -567,7 +573,10 @@ def _is_relation_active(relation: Relation): return False @staticmethod - def _is_resource_created_for_relation(relation: Relation): + def _is_resource_created_for_relation(relation: Relation) -> bool: + if not relation.app: + return False + return ( "username" in relation.data[relation.app] and "password" in relation.data[relation.app] ) @@ -599,10 +608,7 @@ def is_resource_created(self, relation_id: Optional[int] = None) -> bool: else: return ( all( - [ - self._is_resource_created_for_relation(relation) - for relation in self.relations - ] + self._is_resource_created_for_relation(relation) for relation in self.relations ) if self.relations else False @@ -618,6 +624,9 @@ class ExtraRoleEvent(RelationEvent): @property def extra_user_roles(self) -> Optional[str]: """Returns the extra user roles that were requested.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("extra-user-roles") @@ -627,21 +636,33 @@ class AuthenticationEvent(RelationEvent): @property def username(self) -> Optional[str]: """Returns the created username.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("username") @property def password(self) -> Optional[str]: """Returns the password for the created user.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("password") @property def tls(self) -> Optional[str]: """Returns whether TLS is configured.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("tls") @property def tls_ca(self) -> Optional[str]: """Returns TLS CA.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("tls-ca") @@ -654,6 +675,9 @@ class DatabaseProvidesEvent(RelationEvent): @property def database(self) -> Optional[str]: """Returns the database that was requested.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("database") @@ -676,6 +700,9 @@ class DatabaseRequiresEvent(RelationEvent): @property def database(self) -> Optional[str]: """Returns the database name.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("database") @property @@ -685,6 +712,9 @@ def endpoints(self) -> Optional[str]: In VM charms, this is the primary's address. In kubernetes charms, this is the service to the primary pod. """ + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("endpoints") @property @@ -694,6 +724,9 @@ def read_only_endpoints(self) -> Optional[str]: In VM charms, this is the address of all the secondary instances. In kubernetes charms, this is the service to all replica pod instances. """ + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("read-only-endpoints") @property @@ -702,6 +735,9 @@ def replset(self) -> Optional[str]: MongoDB only. """ + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("replset") @property @@ -710,6 +746,9 @@ def uris(self) -> Optional[str]: MongoDB, Redis, OpenSearch. """ + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("uris") @property @@ -718,6 +757,9 @@ def version(self) -> Optional[str]: Version as informed by the database daemon. """ + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("version") @@ -750,7 +792,7 @@ class DatabaseRequiresEvents(CharmEvents): class DatabaseProvides(DataProvides): """Provider-side of the database relations.""" - on = DatabaseProvidesEvents() + on = DatabaseProvidesEvents() # pyright: ignore [reportGeneralTypeIssues] def __init__(self, charm: CharmBase, relation_name: str) -> None: super().__init__(charm, relation_name) @@ -767,7 +809,9 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: # Emit a database requested event if the setup key (database name and optional # extra user roles) was added to the relation databag by the application. if "database" in diff.added: - self.on.database_requested.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "database_requested").emit( + event.relation, app=event.app, unit=event.unit + ) def set_database(self, relation_id: int, database_name: str) -> None: """Set database name. @@ -844,15 +888,15 @@ def set_version(self, relation_id: int, version: str) -> None: class DatabaseRequires(DataRequires): """Requires-side of the database relation.""" - on = DatabaseRequiresEvents() + on = DatabaseRequiresEvents() # pyright: ignore [reportGeneralTypeIssues] def __init__( self, charm, relation_name: str, database_name: str, - extra_user_roles: str = None, - relations_aliases: List[str] = None, + extra_user_roles: Optional[str] = None, + relations_aliases: Optional[List[str]] = None, ): """Manager of database client relations.""" super().__init__(charm, relation_name, extra_user_roles) @@ -974,7 +1018,9 @@ def is_postgresql_plugin_enabled(self, plugin: str, relation_index: int = 0) -> try: with psycopg.connect(connection_string) as connection: with connection.cursor() as cursor: - cursor.execute(f"SELECT TRUE FROM pg_extension WHERE extname='{plugin}';") + cursor.execute( + "SELECT TRUE FROM pg_extension WHERE extname=%s::text;", (plugin,) + ) return cursor.fetchone() is not None except psycopg.Error as e: logger.exception( @@ -1010,7 +1056,9 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: if "username" in diff.added and "password" in diff.added: # Emit the default event (the one without an alias). logger.info("database created at %s", datetime.now()) - self.on.database_created.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "database_created").emit( + event.relation, app=event.app, unit=event.unit + ) # Emit the aliased event (if any). self._emit_aliased_event(event, "database_created") @@ -1024,7 +1072,9 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: if "endpoints" in diff.added or "endpoints" in diff.changed: # Emit the default event (the one without an alias). logger.info("endpoints changed on %s", datetime.now()) - self.on.endpoints_changed.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "endpoints_changed").emit( + event.relation, app=event.app, unit=event.unit + ) # Emit the aliased event (if any). self._emit_aliased_event(event, "endpoints_changed") @@ -1038,7 +1088,7 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: if "read-only-endpoints" in diff.added or "read-only-endpoints" in diff.changed: # Emit the default event (the one without an alias). logger.info("read-only-endpoints changed on %s", datetime.now()) - self.on.read_only_endpoints_changed.emit( + getattr(self.on, "read_only_endpoints_changed").emit( event.relation, app=event.app, unit=event.unit ) @@ -1055,11 +1105,17 @@ class KafkaProvidesEvent(RelationEvent): @property def topic(self) -> Optional[str]: """Returns the topic that was requested.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("topic") @property def consumer_group_prefix(self) -> Optional[str]: """Returns the consumer-group-prefix that was requested.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("consumer-group-prefix") @@ -1082,21 +1138,33 @@ class KafkaRequiresEvent(RelationEvent): @property def topic(self) -> Optional[str]: """Returns the topic.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("topic") @property def bootstrap_server(self) -> Optional[str]: """Returns a comma-separated list of broker uris.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("endpoints") @property def consumer_group_prefix(self) -> Optional[str]: """Returns the consumer-group-prefix.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("consumer-group-prefix") @property def zookeeper_uris(self) -> Optional[str]: """Returns a comma separated list of Zookeeper uris.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("zookeeper-uris") @@ -1124,7 +1192,7 @@ class KafkaRequiresEvents(CharmEvents): class KafkaProvides(DataProvides): """Provider-side of the Kafka relation.""" - on = KafkaProvidesEvents() + on = KafkaProvidesEvents() # pyright: ignore [reportGeneralTypeIssues] def __init__(self, charm: CharmBase, relation_name: str) -> None: super().__init__(charm, relation_name) @@ -1141,7 +1209,9 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: # Emit a topic requested event if the setup key (topic name and optional # extra user roles) was added to the relation databag by the application. if "topic" in diff.added: - self.on.topic_requested.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "topic_requested").emit( + event.relation, app=event.app, unit=event.unit + ) def set_topic(self, relation_id: int, topic: str) -> None: """Set topic name in the application relation databag. @@ -1183,7 +1253,7 @@ def set_zookeeper_uris(self, relation_id: int, zookeeper_uris: str) -> None: class KafkaRequires(DataRequires): """Requires-side of the Kafka relation.""" - on = KafkaRequiresEvents() + on = KafkaRequiresEvents() # pyright: ignore [reportGeneralTypeIssues] def __init__( self, @@ -1200,6 +1270,18 @@ def __init__( self.topic = topic self.consumer_group_prefix = consumer_group_prefix or "" + @property + def topic(self): + """Topic to use in Kafka.""" + return self._topic + + @topic.setter + def topic(self, value): + # Avoid wildcards + if value == "*": + raise ValueError(f"Error on topic '{value}', cannot be a wildcard.") + self._topic = value + def _on_relation_joined_event(self, event: RelationJoinedEvent) -> None: """Event emitted when the application joins the Kafka relation.""" # Sets topic, extra user roles, and "consumer-group-prefix" in the relation @@ -1220,7 +1302,7 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: if "username" in diff.added and "password" in diff.added: # Emit the default event (the one without an alias). logger.info("topic created at %s", datetime.now()) - self.on.topic_created.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "topic_created").emit(event.relation, app=event.app, unit=event.unit) # To avoid unnecessary application restarts do not trigger # “endpoints_changed“ event if “topic_created“ is triggered. @@ -1231,7 +1313,7 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: if "endpoints" in diff.added or "endpoints" in diff.changed: # Emit the default event (the one without an alias). logger.info("endpoints changed on %s", datetime.now()) - self.on.bootstrap_server_changed.emit( + getattr(self.on, "bootstrap_server_changed").emit( event.relation, app=event.app, unit=event.unit ) # here check if this is the right design return @@ -1246,6 +1328,9 @@ class OpenSearchProvidesEvent(RelationEvent): @property def index(self) -> Optional[str]: """Returns the index that was requested.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("index") @@ -1287,7 +1372,7 @@ class OpenSearchRequiresEvents(CharmEvents): class OpenSearchProvides(DataProvides): """Provider-side of the OpenSearch relation.""" - on = OpenSearchProvidesEvents() + on = OpenSearchProvidesEvents() # pyright: ignore[reportGeneralTypeIssues] def __init__(self, charm: CharmBase, relation_name: str) -> None: super().__init__(charm, relation_name) @@ -1304,7 +1389,9 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: # Emit an index requested event if the setup key (index name and optional extra user roles) # have been added to the relation databag by the application. if "index" in diff.added: - self.on.index_requested.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "index_requested").emit( + event.relation, app=event.app, unit=event.unit + ) def set_index(self, relation_id: int, index: str) -> None: """Set the index in the application relation databag. @@ -1339,7 +1426,7 @@ def set_version(self, relation_id: int, version: str) -> None: class OpenSearchRequires(DataRequires): """Requires-side of the OpenSearch relation.""" - on = OpenSearchRequiresEvents() + on = OpenSearchRequiresEvents() # pyright: ignore[reportGeneralTypeIssues] def __init__( self, charm, relation_name: str, index: str, extra_user_roles: Optional[str] = None @@ -1371,14 +1458,16 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: updates = {"username", "password", "tls", "tls-ca"} if len(set(diff._asdict().keys()) - updates) < len(diff): logger.info("authentication updated at: %s", datetime.now()) - self.on.authentication_updated.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "authentication_updated").emit( + event.relation, app=event.app, unit=event.unit + ) # Check if the index is created # (the OpenSearch charm shares the credentials). if "username" in diff.added and "password" in diff.added: # Emit the default event (the one without an alias). logger.info("index created at: %s", datetime.now()) - self.on.index_created.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "index_created").emit(event.relation, app=event.app, unit=event.unit) # To avoid unnecessary application restarts do not trigger # “endpoints_changed“ event if “index_created“ is triggered. @@ -1389,7 +1478,7 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: if "endpoints" in diff.added or "endpoints" in diff.changed: # Emit the default event (the one without an alias). logger.info("endpoints changed on %s", datetime.now()) - self.on.endpoints_changed.emit( + getattr(self.on, "endpoints_changed").emit( event.relation, app=event.app, unit=event.unit ) # here check if this is the right design return diff --git a/lib/charms/grafana_agent/v0/cos_agent.py b/lib/charms/grafana_agent/v0/cos_agent.py index 0acaed361..b72897df7 100644 --- a/lib/charms/grafana_agent/v0/cos_agent.py +++ b/lib/charms/grafana_agent/v0/cos_agent.py @@ -185,13 +185,13 @@ class _MetricsEndpointDict(TypedDict): port: int except ModuleNotFoundError: - _MetricsEndpointDict = dict + _MetricsEndpointDict = Dict # pyright: ignore LIBID = "dc15fa84cef84ce58155fb84f6c6213a" LIBAPI = 0 -LIBPATCH = 3 +LIBPATCH = 4 -PYDEPS = ["cosl", "pydantic"] +PYDEPS = ["cosl", "pydantic<2"] DEFAULT_RELATION_NAME = "cos-agent" DEFAULT_PEER_RELATION_NAME = "peers" @@ -217,8 +217,12 @@ def _serialize(raw_json: Union[str, bytes]) -> "GrafanaDashboard": return GrafanaDashboard(encoded) def _deserialize(self) -> Dict: - raw = lzma.decompress(base64.b64decode(self.encode("utf-8"))).decode() - return json.loads(raw) + try: + raw = lzma.decompress(base64.b64decode(self.encode("utf-8"))).decode() + return json.loads(raw) + except json.decoder.JSONDecodeError as e: + logger.error("Invalid Dashboard format: %s", e) + return {} def __repr__(self): """Return string representation of self.""" @@ -247,7 +251,7 @@ class CosAgentProviderUnitData(pydantic.BaseModel): class CosAgentPeersUnitData(pydantic.BaseModel): - """Unit databag model for `cluster` cos-agent machine charm peer relation.""" + """Unit databag model for `peers` cos-agent machine charm peer relation.""" # We need the principal unit name and relation metadata to be able to render identifiers # (e.g. topology) on the leader side, after all the data moves into peer data (the grafana @@ -307,12 +311,11 @@ def __init__( refresh_events: List of events on which to refresh relation data. """ super().__init__(charm, relation_name) - metrics_endpoints = metrics_endpoints or [DEFAULT_METRICS_ENDPOINT] dashboard_dirs = dashboard_dirs or ["./src/grafana_dashboards"] self._charm = charm self._relation_name = relation_name - self._metrics_endpoints = metrics_endpoints + self._metrics_endpoints = metrics_endpoints or [DEFAULT_METRICS_ENDPOINT] self._metrics_rules = metrics_rules_dir self._logs_rules = logs_rules_dir self._recursive = recurse_rules_dirs @@ -339,14 +342,20 @@ def _on_refresh(self, event): # Add a guard to make sure it doesn't happen. if relation.data and self._charm.unit in relation.data: # Subordinate relations can communicate only over unit data. - data = CosAgentProviderUnitData( - metrics_alert_rules=self._metrics_alert_rules, - log_alert_rules=self._log_alert_rules, - dashboards=self._dashboards, - metrics_scrape_jobs=self._scrape_jobs, - log_slots=self._log_slots, - ) - relation.data[self._charm.unit][data.KEY] = data.json() + try: + data = CosAgentProviderUnitData( + metrics_alert_rules=self._metrics_alert_rules, + log_alert_rules=self._log_alert_rules, + dashboards=self._dashboards, + metrics_scrape_jobs=self._scrape_jobs, + log_slots=self._log_slots, + ) + relation.data[self._charm.unit][data.KEY] = data.json() + except ( + pydantic.ValidationError, + json.decoder.JSONDecodeError, + ) as e: + logger.error("Invalid relation data provided: %s", e) @property def _scrape_jobs(self) -> List[Dict]: @@ -387,16 +396,33 @@ class COSAgentDataChanged(EventBase): """Event emitted by `COSAgentRequirer` when relation data changes.""" +class COSAgentValidationError(EventBase): + """Event emitted by `COSAgentRequirer` when there is an error in the relation data.""" + + def __init__(self, handle, message: str = ""): + super().__init__(handle) + self.message = message + + def snapshot(self) -> Dict: + """Save COSAgentValidationError source information.""" + return {"message": self.message} + + def restore(self, snapshot): + """Restore COSAgentValidationError source information.""" + self.message = snapshot["message"] + + class COSAgentRequirerEvents(ObjectEvents): """`COSAgentRequirer` events.""" data_changed = EventSource(COSAgentDataChanged) + validation_error = EventSource(COSAgentValidationError) class COSAgentRequirer(Object): """Integration endpoint wrapper for the Requirer side of the cos_agent interface.""" - on = COSAgentRequirerEvents() + on = COSAgentRequirerEvents() # pyright: ignore def __init__( self, @@ -426,7 +452,7 @@ def __init__( ) # TODO: do we need this? self.framework.observe(events.relation_changed, self._on_relation_data_changed) for event in self._refresh_events: - self.framework.observe(event, self.trigger_refresh) + self.framework.observe(event, self.trigger_refresh) # pyright: ignore # Peer relation events # A peer relation is needed as it is the only mechanism for exchanging data across @@ -450,7 +476,7 @@ def _on_peer_relation_changed(self, _): # Peer data is used for forwarding data from principal units to the grafana agent # subordinate leader, for updating the app data of the outgoing o11y relations. if self._charm.unit.is_leader(): - self.on.data_changed.emit() + self.on.data_changed.emit() # pyright: ignore def _on_relation_data_changed(self, event: RelationChangedEvent): # Peer data is the only means of communication between subordinate units. @@ -474,7 +500,9 @@ def _on_relation_data_changed(self, event: RelationChangedEvent): if not (raw := cos_agent_relation.data[principal_unit].get(CosAgentProviderUnitData.KEY)): return - provider_data = CosAgentProviderUnitData(**json.loads(raw)) + + if not (provider_data := self._validated_provider_data(raw)): + return # Copy data from the principal relation to the peer relation, so the leader could # follow up. @@ -492,12 +520,19 @@ def _on_relation_data_changed(self, event: RelationChangedEvent): # We can't easily tell if the data that was changed is limited to only the data # that goes into peer relation (in which case, if this is not a leader unit, we wouldn't # need to emit `on.data_changed`), so we're emitting `on.data_changed` either way. - self.on.data_changed.emit() + self.on.data_changed.emit() # pyright: ignore + + def _validated_provider_data(self, raw) -> Optional[CosAgentProviderUnitData]: + try: + return CosAgentProviderUnitData(**json.loads(raw)) + except (pydantic.ValidationError, json.decoder.JSONDecodeError) as e: + self.on.validation_error.emit(message=str(e)) # pyright: ignore + return None def trigger_refresh(self, _): """Trigger a refresh of relation data.""" # FIXME: Figure out what we should do here - self.on.data_changed.emit() + self.on.data_changed.emit() # pyright: ignore @property def _principal_unit(self) -> Optional[Unit]: @@ -529,17 +564,24 @@ def _principal_unit_data(self) -> Optional[CosAgentProviderUnitData]: Relies on the fact that, for subordinate relations, the only remote unit visible to *this unit* is the principal unit that this unit is attached to. """ - if relations := self._principal_relations: - # Technically it's a list, but for subordinates there can only be one relation - principal_relation = next(iter(relations)) - if units := principal_relation.units: - # Technically it's a list, but for subordinates there can only be one - unit = next(iter(units)) - raw = principal_relation.data[unit].get(CosAgentProviderUnitData.KEY) - if raw: - return CosAgentProviderUnitData(**json.loads(raw)) + if not (relations := self._principal_relations): + return None - return None + # Technically it's a list, but for subordinates there can only be one relation + principal_relation = next(iter(relations)) + + if not (units := principal_relation.units): + return None + + # Technically it's a list, but for subordinates there can only be one + unit = next(iter(units)) + if not (raw := principal_relation.data[unit].get(CosAgentProviderUnitData.KEY)): + return None + + if not (provider_data := self._validated_provider_data(raw)): + return None + + return provider_data def _gather_peer_data(self) -> List[CosAgentPeersUnitData]: """Collect data from the peers. @@ -578,7 +620,7 @@ def metrics_alerts(self) -> Dict[str, Any]: alert_rules = {} seen_apps: List[str] = [] - for data in self._gather_peer_data(): # type: CosAgentPeersUnitData + for data in self._gather_peer_data(): if rules := data.metrics_alert_rules: app_name = data.app_name if app_name in seen_apps: @@ -649,7 +691,7 @@ def logs_alerts(self) -> Dict[str, Any]: alert_rules = {} seen_apps: List[str] = [] - for data in self._gather_peer_data(): # type: CosAgentPeersUnitData + for data in self._gather_peer_data(): if rules := data.log_alert_rules: # This is only used for naming the file, so be as specific as we can be app_name = data.app_name @@ -678,10 +720,10 @@ def dashboards(self) -> List[Dict[str, str]]: Dashboards are assumed not to vary across units of the same primary. """ - dashboards: List[Dict[str, str]] = [] + dashboards: List[Dict[str, Any]] = [] seen_apps: List[str] = [] - for data in self._gather_peer_data(): # type: CosAgentPeersUnitData + for data in self._gather_peer_data(): app_name = data.app_name if app_name in seen_apps: continue # dedup! diff --git a/lib/charms/operator_libs_linux/v2/snap.py b/lib/charms/operator_libs_linux/v2/snap.py index b82024c52..16e0261b0 100644 --- a/lib/charms/operator_libs_linux/v2/snap.py +++ b/lib/charms/operator_libs_linux/v2/snap.py @@ -83,7 +83,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 0 +LIBPATCH = 1 # Regex to locate 7-bit C1 ANSI sequences @@ -273,13 +273,13 @@ def _snap(self, command: str, optargs: Optional[Iterable[str]] = None) -> str: SnapError if there is a problem encountered """ optargs = optargs or [] - _cmd = ["snap", command, self._name, *optargs] + args = ["snap", command, self._name, *optargs] try: - return subprocess.check_output(_cmd, universal_newlines=True) + return subprocess.check_output(args, universal_newlines=True) except CalledProcessError as e: raise SnapError( "Snap: {!r}; command {!r} failed with output = {!r}".format( - self._name, _cmd, e.output + self._name, args, e.output ) ) @@ -303,12 +303,12 @@ def _snap_daemons( else: services = [self._name] - _cmd = ["snap", *command, *services] + args = ["snap", *command, *services] try: - return subprocess.run(_cmd, universal_newlines=True, check=True, capture_output=True) + return subprocess.run(args, universal_newlines=True, check=True, capture_output=True) except CalledProcessError as e: - raise SnapError("Could not {} for snap [{}]: {}".format(_cmd, self._name, e.stderr)) + raise SnapError("Could not {} for snap [{}]: {}".format(args, self._name, e.stderr)) def get(self, key) -> str: """Fetch a snap configuration value. @@ -387,11 +387,11 @@ def connect( elif slot: command = command + [slot] - _cmd = ["snap", *command] + args = ["snap", *command] try: - subprocess.run(_cmd, universal_newlines=True, check=True, capture_output=True) + subprocess.run(args, universal_newlines=True, check=True, capture_output=True) except CalledProcessError as e: - raise SnapError("Could not {} for snap [{}]: {}".format(_cmd, self._name, e.stderr)) + raise SnapError("Could not {} for snap [{}]: {}".format(args, self._name, e.stderr)) def hold(self, duration: Optional[timedelta] = None) -> None: """Add a refresh hold to a snap. @@ -409,6 +409,25 @@ def unhold(self) -> None: """Remove the refresh hold of a snap.""" self._snap("refresh", ["--unhold"]) + def alias(self, application: str, alias: Optional[str] = None) -> None: + """Create an alias for a given application. + + Args: + application: application to get an alias. + alias: (optional) name of the alias; if not provided, the application name is used. + """ + if alias is None: + alias = application + args = ["snap", "alias", f"{self.name}.{application}", alias] + try: + subprocess.check_output(args, universal_newlines=True) + except CalledProcessError as e: + raise SnapError( + "Snap: {!r}; command {!r} failed with output = {!r}".format( + self._name, args, e.output + ) + ) + def restart( self, services: Optional[List[str]] = None, reload: Optional[bool] = False ) -> None: @@ -992,17 +1011,17 @@ def install_local( Raises: SnapError if there is a problem encountered """ - _cmd = [ + args = [ "snap", "install", filename, ] if classic: - _cmd.append("--classic") + args.append("--classic") if dangerous: - _cmd.append("--dangerous") + args.append("--dangerous") try: - result = subprocess.check_output(_cmd, universal_newlines=True).splitlines()[-1] + result = subprocess.check_output(args, universal_newlines=True).splitlines()[-1] snap_name, _ = result.split(" ", 1) snap_name = ansi_filter.sub("", snap_name) @@ -1026,9 +1045,9 @@ def _system_set(config_item: str, value: str) -> None: config_item: name of snap system setting. E.g. 'refresh.hold' value: value to assign """ - _cmd = ["snap", "set", "system", "{}={}".format(config_item, value)] + args = ["snap", "set", "system", "{}={}".format(config_item, value)] try: - subprocess.check_call(_cmd, universal_newlines=True) + subprocess.check_call(args, universal_newlines=True) except CalledProcessError: raise SnapError("Failed setting system config '{}' to '{}'".format(config_item, value)) diff --git a/src/charm.py b/src/charm.py index 21e48fd6b..a9e6479c6 100755 --- a/src/charm.py +++ b/src/charm.py @@ -139,6 +139,19 @@ def _on_install(self, _) -> None: f"/etc/systemd/system/{PGB}-{self.app.name}@.service", rendered, perms=0o644 ) systemd.daemon_reload() + # Render the logrotate config + with open("templates/logrotate.j2", "r") as file: + template = Template(file.read()) + # Logrotate expects the file to be owned by root + with open(f"/etc/logrotate.d/{PGB}-{self.app.name}", "w+") as file: + file.write( + template.render( + log_dir=PGB_LOG_DIR, + app_name=self.app.name, + service_ids=self.service_ids, + prefix=PGB, + ) + ) self.unit.status = WaitingStatus("Waiting to start PgBouncer") @@ -165,6 +178,7 @@ def _on_remove(self, _) -> None: os.remove(f"/etc/systemd/system/{PGB}-{self.app.name}@.service") self.remove_exporter_service() + os.remove(f"/etc/logrotate.d/{PGB}-{self.app.name}") shutil.rmtree(f"{PGB_CONF_DIR}/{self.app.name}") shutil.rmtree(f"{PGB_LOG_DIR}/{self.app.name}") @@ -386,6 +400,7 @@ def render_prometheus_service(self): # Render the template file with the correct values. rendered = template.render( stats_user=self.backend.stats_user, + pgb_service=f"{PGB}-{self.app.name}", stats_password=self.peers.get_secret("app", MONITORING_PASSWORD_KEY), listen_port=self.config["listen_port"], metrics_port=self.config["metrics_port"], diff --git a/templates/logrotate.j2 b/templates/logrotate.j2 new file mode 100644 index 000000000..1b1532f56 --- /dev/null +++ b/templates/logrotate.j2 @@ -0,0 +1,16 @@ +{{ log_dir }}/{{ app_name }}/instance_*/pgbouncer.log { + rotate 10 + missingok + sharedscripts + notifempty + nocompress + daily + create 0600 snap_daemon snap_daemon + dateext + dateformat -%Y%m%d_%H:%M.log + postrotate + {% for id in service_ids %} + systemctl reload {{ prefix }}-{{ app_name }}@{{ id }} + {% endfor %} + endscript +} \ No newline at end of file diff --git a/templates/pgbouncer.service.j2 b/templates/pgbouncer.service.j2 index 151812a78..cf75e9a56 100644 --- a/templates/pgbouncer.service.j2 +++ b/templates/pgbouncer.service.j2 @@ -11,6 +11,7 @@ ExecStartPre=-/usr/bin/install -o snap_daemon -g snap_daemon -m 700 -d \ {{ snap_tmp_dir }}/{{ app_name }}/instance_%i/ ExecStart=/snap/bin/charmed-postgresql.pgbouncer-server -R {{ conf_dir }}/{{ app_name }}/instance_%i/pgbouncer.ini KillSignal=SIGINT +ExecReload=kill -HUP $MAINPID Restart=always RestartSec=5s diff --git a/templates/prometheus-exporter.service.j2 b/templates/prometheus-exporter.service.j2 index fd5b96c49..0dcee1f83 100644 --- a/templates/prometheus-exporter.service.j2 +++ b/templates/prometheus-exporter.service.j2 @@ -1,6 +1,6 @@ [Unit] Description=prometheus exporter for pgbouncer -After=network.target +After=network.target {{ pgb_service }}@.target [Service] Type=simple diff --git a/tests/integration/relations/pgbouncer_provider/application-charm/lib/charms/data_platform_libs/v0/data_interfaces.py b/tests/integration/relations/pgbouncer_provider/application-charm/lib/charms/data_platform_libs/v0/data_interfaces.py index 86d7521a8..10bda6dbe 100644 --- a/tests/integration/relations/pgbouncer_provider/application-charm/lib/charms/data_platform_libs/v0/data_interfaces.py +++ b/tests/integration/relations/pgbouncer_provider/application-charm/lib/charms/data_platform_libs/v0/data_interfaces.py @@ -316,7 +316,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 12 +LIBPATCH = 13 PYDEPS = ["ops>=2.0.0"] @@ -1200,6 +1200,18 @@ def __init__( self.topic = topic self.consumer_group_prefix = consumer_group_prefix or "" + @property + def topic(self): + """Topic to use in Kafka.""" + return self._topic + + @topic.setter + def topic(self, value): + # Avoid wildcards + if value == "*": + raise ValueError(f"Error on topic '{value}', cannot be a wildcard.") + self._topic = value + def _on_relation_joined_event(self, event: RelationJoinedEvent) -> None: """Event emitted when the application joins the Kafka relation.""" # Sets topic, extra user roles, and "consumer-group-prefix" in the relation diff --git a/tests/integration/relations/test_db.py b/tests/integration/relations/test_db.py index d54d420b6..7e194308c 100644 --- a/tests/integration/relations/test_db.py +++ b/tests/integration/relations/test_db.py @@ -128,7 +128,7 @@ async def test_extensions(ops_test: OpsTest, pgb_charm_jammy, application_charm) == EXTENSIONS_BLOCKING_MESSAGE ) - logger.info("Rekatung with enabled extensions") + logger.info("Relating with enabled extensions") await ops_test.model.applications[pgb_jammy].remove_relation( f"{CLIENT_APP_NAME}:db", f"{pgb_jammy}:db" ) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 150b14f54..019332f74 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -9,7 +9,7 @@ from charms.pgbouncer_k8s.v0.pgb import DEFAULT_CONFIG, PgbConfig from pytest_operator.plugin import OpsTest -from constants import BACKEND_RELATION_NAME, PGB_CONF_DIR +from constants import BACKEND_RELATION_NAME, PGB_CONF_DIR, PGB_LOG_DIR from tests.integration.helpers.helpers import ( CLIENT_APP_NAME, FIRST_DATABASE_RELATION_NAME, @@ -116,3 +116,16 @@ async def test_systemd_restarts_exporter_process(ops_test: OpsTest): # verify all processes start again assert await get_running_instances(unit, "pgbouncer_expor") == 1 + + +async def test_logrotate(ops_test: OpsTest): + """Verify that logs will be rotated.""" + unit = ops_test.model.units[f"{PGB}/0"] + await unit.run("logrotate -f /etc/logrotate.conf") + + cmd = f"ssh {PGB}/0 sudo ls {PGB_LOG_DIR}/{PGB}/instance_0" + return_code, output, _ = await ops_test.juju(*cmd.split(" ")) + assert return_code == 0 + logs = output.strip().split() + logs.remove("pgbouncer.log") + assert len(logs), "Log not rotated" diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index ea0dadb1a..59eb8e483 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -39,6 +39,7 @@ def setUp(self): self.charm = self.harness.charm self.unit = self.harness.charm.unit + @patch("builtins.open", unittest.mock.mock_open()) @patch("charm.PgBouncerCharm._install_snap_packages") @patch("charms.operator_libs_linux.v1.systemd.service_stop") @patch("os.makedirs")