diff --git a/lib/charms/mongodb/v0/mongodb_secrets.py b/lib/charms/mongodb/v0/mongodb_secrets.py new file mode 100644 index 00000000..50696548 --- /dev/null +++ b/lib/charms/mongodb/v0/mongodb_secrets.py @@ -0,0 +1,152 @@ +"""Secrets related helper classes/functions.""" + +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +from typing import Dict, Optional + +from ops import Secret, SecretInfo +from ops.charm import CharmBase +from ops.model import ModelError, SecretNotFoundError + +from config import Config +from exceptions import SecretAlreadyExistsError + +# The unique Charmhub library identifier, never change it + +# The unique Charmhub library identifier, never change it +LIBID = "89cefc863fd747d7ace12cb508e7bec2" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 2 + +APP_SCOPE = Config.Relations.APP_SCOPE +UNIT_SCOPE = Config.Relations.UNIT_SCOPE +Scopes = Config.Relations.Scopes + + +def generate_secret_label(charm: CharmBase, scope: Scopes) -> str: + """Generate unique group_mappings for secrets within a relation context. + + Defined as a standalone function, as the choice on secret labels definition belongs to the + Application Logic. To be kept separate from classes below, which are simply to provide a + (smart) abstraction layer above Juju Secrets. + """ + members = [charm.app.name, scope] + return f"{'.'.join(members)}" + + +# Secret cache + + +class CachedSecret: + """Abstraction layer above direct Juju access with caching. + + The data structure is precisely re-using/simulating Juju Secrets behavior, while + also making sure not to fetch a secret multiple times within the same event scope. 
+ """ + + def __init__(self, charm: CharmBase, label: str, secret_uri: Optional[str] = None): + self._secret_meta = None + self._secret_content = {} + self._secret_uri = secret_uri + self.label = label + self.charm = charm + + def add_secret(self, content: Dict[str, str], scope: Scopes) -> Secret: + """Create a new secret.""" + if self._secret_uri: + raise SecretAlreadyExistsError( + "Secret is already defined with uri %s", self._secret_uri + ) + + if scope == Config.Relations.APP_SCOPE: + secret = self.charm.app.add_secret(content, label=self.label) + else: + secret = self.charm.unit.add_secret(content, label=self.label) + self._secret_uri = secret.id + self._secret_meta = secret + return self._secret_meta + + @property + def meta(self) -> Optional[Secret]: + """Getting cached secret meta-information.""" + if self._secret_meta: + return self._secret_meta + + if not (self._secret_uri or self.label): + return + + try: + self._secret_meta = self.charm.model.get_secret(label=self.label) + except SecretNotFoundError: + if self._secret_uri: + self._secret_meta = self.charm.model.get_secret( + id=self._secret_uri, label=self.label + ) + return self._secret_meta + + def get_content(self) -> Dict[str, str]: + """Getting cached secret content.""" + if not self._secret_content: + if self.meta: + try: + self._secret_content = self.meta.get_content(refresh=True) + except (ValueError, ModelError) as err: + # https://bugs.launchpad.net/juju/+bug/2042596 + # Only triggered when 'refresh' is set + known_model_errors = [ + "ERROR either URI or label should be used for getting an owned secret but not both", + "ERROR secret owner cannot use --refresh", + ] + if isinstance(err, ModelError) and not any( + msg in str(err) for msg in known_model_errors + ): + raise + # Due to: ValueError: Secret owner cannot use refresh=True + self._secret_content = self.meta.get_content() + return self._secret_content + + def set_content(self, content: Dict[str, str]) -> None: + """Setting cached secret content.""" + if self.meta: + self.meta.set_content(content) + self._secret_content = content + + def get_info(self) -> Optional[SecretInfo]: + """Wrapper function for get the corresponding call on the Secret object if any.""" + if self.meta: + return self.meta.get_info() + + +class SecretCache: + """A data structure storing CachedSecret objects.""" + + def __init__(self, charm): + self.charm = charm + self._secrets: Dict[str, CachedSecret] = {} + + def get(self, label: str, uri: Optional[str] = None) -> Optional[CachedSecret]: + """Getting a secret from Juju Secret store or cache.""" + if not self._secrets.get(label): + secret = CachedSecret(self.charm, label, uri) + if secret.meta: + self._secrets[label] = secret + return self._secrets.get(label) + + def add(self, label: str, content: Dict[str, str], scope: Scopes) -> CachedSecret: + """Adding a secret to Juju Secret.""" + if self._secrets.get(label): + raise SecretAlreadyExistsError(f"Secret {label} already exists") + + secret = CachedSecret(self.charm, label) + secret.add_secret(content, scope) + self._secrets[label] = secret + return self._secrets[label] + + +# END: Secret cache diff --git a/lib/charms/mongodb/v0/set_status.py b/lib/charms/mongodb/v0/set_status.py new file mode 100644 index 00000000..22c2f25b --- /dev/null +++ b/lib/charms/mongodb/v0/set_status.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python3 +"""Code for handing statuses in the app and unit.""" +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+import json +import logging +from typing import Tuple + +from charms.mongodb.v1.mongodb import MongoDBConfiguration, MongoDBConnection +from ops.charm import CharmBase +from ops.framework import Object +from ops.model import ActiveStatus, BlockedStatus, StatusBase, WaitingStatus +from pymongo.errors import AutoReconnect, OperationFailure, ServerSelectionTimeoutError + +from config import Config + +# The unique Charmhub library identifier, never change it +LIBID = "9b0b9fac53244229aed5ffc5e62141eb" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 1 + +AUTH_FAILED_CODE = 18 +UNAUTHORISED_CODE = 13 +TLS_CANNOT_FIND_PRIMARY = 133 + + +logger = logging.getLogger(__name__) + + +class MongoDBStatusHandler(Object): + """Verifies versions across multiple integrated applications.""" + + def __init__( + self, + charm: CharmBase, + ) -> None: + """Constructor for CrossAppVersionChecker. + + Args: + charm: charm to inherit from. + """ + super().__init__(charm, None) + self.charm = charm + + # TODO Future PR: handle update_status + + # BEGIN Helpers + + def set_and_share_status(self, status: StatusBase): + """Sets the charm status and shares to app status and config-server if applicable.""" + # TODO Future Feature/Epic: process other statuses, i.e. only set provided status if its + # appropriate. + self.charm.unit.status = status + + self.set_app_status() + + if self.charm.is_role(Config.Role.SHARD): + self.share_status_to_config_server() + + def set_app_status(self): + """TODO Future Feature/Epic: parse statuses and set a status for the entire app.""" + + def is_current_unit_ready(self, ignore_unhealthy_upgrade: bool = False) -> bool: + """Returns True if the current unit status shows that the unit is ready. + + Note: we allow the use of ignore_unhealthy_upgrade, to avoid infinite loops due to this + function returning False and preventing the status from being reset. + """ + if isinstance(self.charm.unit.status, ActiveStatus): + return True + + if ignore_unhealthy_upgrade and self.charm.unit.status == Config.Status.UNHEALTHY_UPGRADE: + return True + + return self.is_status_related_to_mismatched_revision( + type(self.charm.unit.status).__name__.lower() + ) + + def is_status_related_to_mismatched_revision(self, status_type: str) -> bool: + """Returns True if the current status is related to a mimsatch in revision. + + Note: A few functions calling this method receive states differently. One receives them by + "goal state" which processes data differently and the other via the ".status" property. + Hence we have to be flexible to handle each. 
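    For example, the unit-status path passes strings such as "waitingstatus" or
    "blockedstatus" (from `type(status).__name__.lower()`), while goal-state reports
    plain "waiting" or "blocked"; the substring checks below are meant to accept both
    forms.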
+ """ + if not self.charm.get_cluster_mismatched_revision_status(): + return False + + if "waiting" in status_type and self.charm.is_role(Config.Role.CONFIG_SERVER): + return True + + if "blocked" in status_type and self.charm.is_role(Config.Role.SHARD): + return True + + return False + + def are_all_units_ready_for_upgrade(self, unit_to_ignore: str = "") -> bool: + """Returns True if all charm units status's show that they are ready for upgrade.""" + goal_state = self.charm.model._backend._run( + "goal-state", return_output=True, use_json=True + ) + for unit_name, unit_state in goal_state["units"].items(): + if unit_name == unit_to_ignore: + continue + if unit_state["status"] == "active": + continue + if not self.is_status_related_to_mismatched_revision(unit_state["status"]): + return False + + return True + + def are_shards_status_ready_for_upgrade(self) -> bool: + """Returns True if all integrated shards status's show that they are ready for upgrade. + + A shard is ready for upgrade if it is either in the waiting for upgrade status or active + status. + """ + if not self.charm.is_role(Config.Role.CONFIG_SERVER): + return False + + for sharding_relation in self.charm.config_server.get_all_sharding_relations(): + for unit in sharding_relation.units: + unit_data = sharding_relation.data[unit] + status_ready_for_upgrade = json.loads( + unit_data.get(Config.Status.STATUS_READY_FOR_UPGRADE, None) + ) + if not status_ready_for_upgrade: + return False + + return True + + def share_status_to_config_server(self): + """Shares this shards status info to the config server.""" + if not self.charm.is_role(Config.Role.SHARD): + return + + if not (config_relation := self.charm.shard.get_config_server_relation()): + return + + config_relation.data[self.charm.unit][Config.Status.STATUS_READY_FOR_UPGRADE] = json.dumps( + self.is_unit_status_ready_for_upgrade() + ) + + def is_unit_status_ready_for_upgrade(self) -> bool: + """Returns True if the status of the current unit reflects that it is ready for upgrade.""" + current_status = self.charm.unit.status + status_message = current_status.message + if isinstance(current_status, ActiveStatus): + return True + + if not isinstance(current_status, WaitingStatus): + return False + + if status_message and "is not up-to date with config-server" in status_message: + return True + + return False + + def process_statuses(self) -> StatusBase: + """Retrieves statuses from processes inside charm and returns the highest priority status. + + When a non-fatal error occurs while processing statuses, the error is processed and + returned as a statuses. + + TODO: add more status handling here for other cases: i.e. 
TLS, or resetting a status that + should not be reset + """ + # retrieve statuses of different services running on Charmed MongoDB + deployment_mode = ( + "replica set" if self.charm.is_role(Config.Role.REPLICATION) else "cluster" + ) + waiting_status = None + try: + statuses = self.get_statuses() + except OperationFailure as e: + if e.code in [UNAUTHORISED_CODE, AUTH_FAILED_CODE]: + waiting_status = f"Waiting to sync passwords across the {deployment_mode}" + elif e.code == TLS_CANNOT_FIND_PRIMARY: + waiting_status = ( + f"Waiting to sync internal membership across the {deployment_mode}" + ) + else: + raise + except ServerSelectionTimeoutError: + waiting_status = f"Waiting to sync internal membership across the {deployment_mode}" + + if waiting_status: + return WaitingStatus(waiting_status) + + return self.prioritize_statuses(statuses) + + def get_statuses(self) -> Tuple: + """Retrieves statuses for the different processes running inside the unit.""" + mongodb_status = build_unit_status( + self.charm.mongodb_config, self.charm.unit_host(self.charm.unit) + ) + shard_status = self.charm.shard.get_shard_status() + config_server_status = self.charm.config_server.get_config_server_status() + pbm_status = self.charm.backups.get_pbm_status() + return (mongodb_status, shard_status, config_server_status, pbm_status) + + def prioritize_statuses(self, statuses: Tuple) -> StatusBase: + """Returns the status with the highest priority from backups, sharding, and mongod.""" + mongodb_status, shard_status, config_server_status, pbm_status = statuses + # failure in mongodb takes precedence over sharding and config server + if not isinstance(mongodb_status, ActiveStatus): + return mongodb_status + + if shard_status and not isinstance(shard_status, ActiveStatus): + return shard_status + + if config_server_status and not isinstance(config_server_status, ActiveStatus): + return config_server_status + + if pbm_status and not isinstance(pbm_status, ActiveStatus): + return pbm_status + + # if all statuses are active report mongodb status over sharding status + return mongodb_status + + +def build_unit_status(mongodb_config: MongoDBConfiguration, unit_host: str) -> StatusBase: + """Generates the status of a unit based on its status reported by mongod.""" + try: + with MongoDBConnection(mongodb_config) as mongo: + replset_status = mongo.get_replset_status() + + if unit_host not in replset_status: + return WaitingStatus("Member being added..") + + replica_status = replset_status[unit_host] + + match replica_status: + case "PRIMARY": + return ActiveStatus("Primary") + case "SECONDARY": + return ActiveStatus("") + case "STARTUP" | "STARTUP2" | "ROLLBACK" | "RECOVERING": + return WaitingStatus("Member is syncing...") + case "REMOVED": + return WaitingStatus("Member is removing...") + case _: + return BlockedStatus(replica_status) + except ServerSelectionTimeoutError as e: + # ServerSelectionTimeoutError is commonly due to ReplicaSetNoPrimary + logger.debug("Got error: %s, while checking replica set status", str(e)) + return WaitingStatus("Waiting for primary re-election..") + except AutoReconnect as e: + # AutoReconnect is raised when a connection to the database is lost and an attempt to + # auto-reconnect will be made by pymongo. 
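+        # Returning WaitingStatus here (rather than raising) lets a later hook
+        # re-evaluate the unit status once pymongo has re-established the connection.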
+ logger.debug("Got error: %s, while checking replica set status", str(e)) + return WaitingStatus("Waiting to reconnect to unit..") + + # END: Helpers diff --git a/lib/charms/mongodb/v1/mongodb.py b/lib/charms/mongodb/v1/mongodb.py new file mode 100644 index 00000000..8c80cd31 --- /dev/null +++ b/lib/charms/mongodb/v1/mongodb.py @@ -0,0 +1,514 @@ +"""Code for interactions with MongoDB.""" + +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +import logging +import re +from dataclasses import dataclass +from typing import Dict, List, Optional, Set +from urllib.parse import quote_plus + +from bson.json_util import dumps +from pymongo import MongoClient +from pymongo.errors import OperationFailure, PyMongoError +from tenacity import ( + RetryError, + Retrying, + before_log, + retry, + stop_after_attempt, + stop_after_delay, + wait_fixed, +) + +from config import Config + +# The unique Charmhub library identifier, never change it +LIBID = "49c69d9977574dd7942eb7b54f43355b" + +# Increment this major API version when introducing breaking changes +LIBAPI = 1 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 1 + +# path to store mongodb ketFile +logger = logging.getLogger(__name__) + + +class FailedToMovePrimaryError(Exception): + """Raised when attempt to move a primary fails.""" + + +@dataclass +class MongoDBConfiguration: + """Class for MongoDB configuration. + + — replset: name of replica set, needed for connection URI. + — database: database name. + — username: username. + — password: password. + — hosts: full list of hosts to connect to, needed for the URI. + - tls_external: indicator for use of internal TLS connection. + - tls_internal: indicator for use of external TLS connection. + """ + + replset: str + database: Optional[str] + username: str + password: str + hosts: Set[str] + roles: Set[str] + tls_external: bool + tls_internal: bool + standalone: bool = False + + @property + def uri(self): + """Return URI concatenated from fields.""" + hosts = ",".join(self.hosts) + # Auth DB should be specified while user connects to application DB. + auth_source = "" + if self.database != "admin": + auth_source = "&authSource=admin" + + if self.standalone: + return ( + f"mongodb://{quote_plus(self.username)}:" + f"{quote_plus(self.password)}@" + f"localhost:{Config.MONGODB_PORT}/?authSource=admin" + ) + + return ( + f"mongodb://{quote_plus(self.username)}:" + f"{quote_plus(self.password)}@" + f"{hosts}/{quote_plus(self.database)}?" + f"replicaSet={quote_plus(self.replset)}" + f"{auth_source}" + ) + + +class NotReadyError(PyMongoError): + """Raised when not all replica set members healthy or finished initial sync.""" + + +class MongoDBConnection: + """In this class we create connection object to MongoDB. + + Real connection is created on the first call to MongoDB. + Delayed connectivity allows to firstly check database readiness + and reuse the same connection for an actual query later in the code. + + Connection is automatically closed when object destroyed. + Automatic close allows to have more clean code. + + Note that connection when used may lead to the following pymongo errors: ConfigurationError, + ConfigurationError, OperationFailure. It is suggested that the following pattern be adopted + when using MongoDBConnection: + + with MongoDBConnection(self._mongodb_config) as mongo: + try: + mongo. 
+ except ConfigurationError, ConfigurationError, OperationFailure: + + """ + + def __init__(self, config: MongoDBConfiguration, uri=None, direct=False): + """A MongoDB client interface. + + Args: + config: MongoDB Configuration object. + uri: allow using custom MongoDB URI, needed for replSet init. + direct: force a direct connection to a specific host, avoiding + reading replica set configuration and reconnection. + """ + self.mongodb_config = config + + if uri is None: + uri = config.uri + + self.client = MongoClient( + uri, + directConnection=direct, + connect=False, + serverSelectionTimeoutMS=1000, + connectTimeoutMS=2000, + ) + return + + def __enter__(self): + """Return a reference to the new connection.""" + return self + + def __exit__(self, object_type, value, traceback): + """Disconnect from MongoDB client.""" + self.client.close() + self.client = None + + @property + def is_ready(self) -> bool: + """Is the MongoDB server ready for services requests. + + Returns: + True if services is ready False otherwise. Retries over a period of 60 seconds times to + allow server time to start up. + + Raises: + ConfigurationError, ConfigurationError, OperationFailure + """ + try: + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + # The ping command is cheap and does not require auth. + self.client.admin.command("ping") + except RetryError: + return False + + return True + + @retry( + stop=stop_after_attempt(3), + wait=wait_fixed(5), + reraise=True, + before=before_log(logger, logging.DEBUG), + ) + def init_replset(self) -> None: + """Create replica set config the first time. + + Raises: + ConfigurationError, ConfigurationError, OperationFailure + """ + config = { + "_id": self.mongodb_config.replset, + "members": [{"_id": i, "host": h} for i, h in enumerate(self.mongodb_config.hosts)], + } + try: + self.client.admin.command("replSetInitiate", config) + except OperationFailure as e: + if e.code not in (13, 23): # Unauthorized, AlreadyInitialized + # Unauthorized error can be raised only if initial user were + # created the step after this. + # AlreadyInitialized error can be raised only if this step + # finished. + logger.error("Cannot initialize replica set. error=%r", e) + raise e + + def get_replset_status(self) -> Dict: + """Get a replica set status as a dict. + + Returns: + A set of the replica set members along with their status. + + Raises: + ConfigurationError, ConfigurationError, OperationFailure + """ + rs_status = self.client.admin.command("replSetGetStatus") + rs_status_parsed = {} + for member in rs_status["members"]: + member_name = self._hostname_from_hostport(member["name"]) + rs_status_parsed[member_name] = member["stateStr"] + + return rs_status_parsed + + def get_replset_members(self) -> Set[str]: + """Get a replica set members. + + Returns: + A set of the replica set members as reported by mongod. + + Raises: + ConfigurationError, ConfigurationError, OperationFailure + """ + rs_status = self.client.admin.command("replSetGetStatus") + curr_members = [ + self._hostname_from_hostport(member["name"]) for member in rs_status["members"] + ] + return set(curr_members) + + def add_replset_member(self, hostname: str) -> None: + """Add a new member to replica set config inside MongoDB. 
+ + Raises: + ConfigurationError, ConfigurationError, OperationFailure, NotReadyError + """ + rs_config = self.client.admin.command("replSetGetConfig") + rs_status = self.client.admin.command("replSetGetStatus") + + # When we add a new member, MongoDB transfer data from existing member to new. + # Such operation reduce performance of the cluster. To avoid huge performance + # degradation, before adding new members, it is needed to check that all other + # members finished init sync. + if self.is_any_sync(rs_status): + # it can take a while, we should defer + raise NotReadyError + + # Avoid reusing IDs, according to the doc + # https://www.mongodb.com/docs/manual/reference/replica-configuration/ + max_id = max([int(member["_id"]) for member in rs_config["config"]["members"]]) + new_member = {"_id": int(max_id + 1), "host": hostname} + + rs_config["config"]["version"] += 1 + rs_config["config"]["members"].extend([new_member]) + logger.debug("rs_config: %r", rs_config["config"]) + self.client.admin.command("replSetReconfig", rs_config["config"]) + + @retry( + stop=stop_after_attempt(20), + wait=wait_fixed(3), + reraise=True, + before=before_log(logger, logging.DEBUG), + ) + def remove_replset_member(self, hostname: str) -> None: + """Remove member from replica set config inside MongoDB. + + Raises: + ConfigurationError, ConfigurationError, OperationFailure, NotReadyError + """ + rs_config = self.client.admin.command("replSetGetConfig") + rs_status = self.client.admin.command("replSetGetStatus") + + # When we remove member, to avoid issues when majority members is removed, we need to + # remove next member only when MongoDB forget the previous removed member. + if self._is_any_removing(rs_status): + # removing from replicaset is fast operation, lets @retry(3 times with a 5sec timeout) + # before giving up. + raise NotReadyError + + # avoid downtime we need to reelect new primary if removable member is the primary. + logger.debug("primary: %r", self._is_primary(rs_status, hostname)) + if self._is_primary(rs_status, hostname): + self.client.admin.command("replSetStepDown", {"stepDownSecs": "60"}) + + rs_config["config"]["version"] += 1 + rs_config["config"]["members"][:] = [ + member + for member in rs_config["config"]["members"] + if hostname != self._hostname_from_hostport(member["host"]) + ] + logger.debug("rs_config: %r", dumps(rs_config["config"])) + self.client.admin.command("replSetReconfig", rs_config["config"]) + + def step_down_primary(self) -> None: + """Steps down the current primary, forcing a re-election.""" + self.client.admin.command("replSetStepDown", {"stepDownSecs": "60"}) + + def move_primary(self, new_primary_ip: str) -> None: + """Forcibly moves the primary to the new primary provided. + + Args: + new_primary_ip: ip address of the unit chosen to be the new primary. + """ + # Do not move a priary unless the cluster is in sync + rs_status = self.client.admin.command("replSetGetStatus") + if self.is_any_sync(rs_status): + # it can take a while, we should defer + raise NotReadyError + + is_move_successful = True + self.set_replicaset_election_priority(priority=0.5, ignore_member=new_primary_ip) + try: + for attempt in Retrying(stop=stop_after_delay(180), wait=wait_fixed(3)): + with attempt: + self.step_down_primary() + if self.primary() != new_primary_ip: + raise FailedToMovePrimaryError + except RetryError: + # catch all possible exceptions when failing to step down primary. We do this in order + # to ensure that we reset the replica set election priority. 
+ is_move_successful = False + + # reset all replicas to the same priority + self.set_replicaset_election_priority(priority=1) + + if not is_move_successful: + raise FailedToMovePrimaryError + + def set_replicaset_election_priority(self, priority: int, ignore_member: str = None) -> None: + """Set the election priority for the entire replica set.""" + rs_config = self.client.admin.command("replSetGetConfig") + rs_config = rs_config["config"] + rs_config["version"] += 1 + + # keep track of the original configuration before setting the priority, reconfiguring the + # replica set can result in primary re-election, which would would like to avoid when + # possible. + original_rs_config = rs_config + + for member in rs_config["members"]: + if member["host"] == ignore_member: + continue + + member["priority"] = priority + + if original_rs_config == rs_config: + return + + logger.debug("rs_config: %r", rs_config) + self.client.admin.command("replSetReconfig", rs_config) + + def create_user(self, config: MongoDBConfiguration): + """Create user. + + Grant read and write privileges for specified database. + """ + self.client.admin.command( + "createUser", + config.username, + pwd=config.password, + roles=self._get_roles(config), + mechanisms=["SCRAM-SHA-256"], + ) + + def update_user(self, config: MongoDBConfiguration): + """Update grants on database.""" + self.client.admin.command( + "updateUser", + config.username, + roles=self._get_roles(config), + ) + + def set_user_password(self, username, password: str): + """Update the password.""" + self.client.admin.command( + "updateUser", + username, + pwd=password, + ) + + def create_role(self, role_name: str, privileges: dict, roles: dict = []): + """Creates a new role. + + Args: + role_name: name of the role to be added. + privileges: privileges to be associated with the role. + roles: List of roles from which this role inherits privileges. + """ + try: + self.client.admin.command( + "createRole", role_name, privileges=[privileges], roles=roles + ) + except OperationFailure as e: + if not e.code == 51002: # Role already exists + logger.error("Cannot add role. 
error=%r", e) + raise e + + @staticmethod + def _get_roles(config: MongoDBConfiguration) -> List[dict]: + """Generate roles List.""" + supported_roles = { + "admin": [ + {"role": "userAdminAnyDatabase", "db": "admin"}, + {"role": "readWriteAnyDatabase", "db": "admin"}, + {"role": "userAdmin", "db": "admin"}, + ], + "monitor": [ + {"role": "explainRole", "db": "admin"}, + {"role": "clusterMonitor", "db": "admin"}, + {"role": "read", "db": "local"}, + ], + "backup": [ + {"db": "admin", "role": "readWrite", "collection": ""}, + {"db": "admin", "role": "backup"}, + {"db": "admin", "role": "clusterMonitor"}, + {"db": "admin", "role": "restore"}, + {"db": "admin", "role": "pbmAnyAction"}, + ], + "default": [ + {"role": "readWrite", "db": config.database}, + ], + } + return [role_dict for role in config.roles for role_dict in supported_roles[role]] + + def drop_user(self, username: str): + """Drop user.""" + self.client.admin.command("dropUser", username) + + def get_users(self) -> Set[str]: + """Add a new member to replica set config inside MongoDB.""" + users_info = self.client.admin.command("usersInfo") + return set( + [ + user_obj["user"] + for user_obj in users_info["users"] + if re.match(r"^relation-\d+$", user_obj["user"]) + ] + ) + + def get_databases(self) -> Set[str]: + """Return list of all non-default databases.""" + system_dbs = ("admin", "local", "config") + databases = self.client.list_database_names() + return set([db for db in databases if db not in system_dbs]) + + def drop_database(self, database: str): + """Drop a non-default database.""" + system_dbs = ("admin", "local", "config") + if database in system_dbs: + return + self.client.drop_database(database) + + def _is_primary(self, rs_status: Dict, hostname: str) -> bool: + """Returns True if passed host is the replica set primary. + + Args: + hostname: host of interest. + rs_status: current state of replica set as reported by mongod. + """ + return any( + hostname == self._hostname_from_hostport(member["name"]) + and member["stateStr"] == "PRIMARY" + for member in rs_status["members"] + ) + + def primary(self) -> str: + """Returns primary replica host.""" + status = self.client.admin.command("replSetGetStatus") + + primary = None + # loop through all members in the replica set + for member in status["members"]: + # check replica's current state + if member["stateStr"] == "PRIMARY": + primary = self._hostname_from_hostport(member["name"]) + + return primary + + @staticmethod + def is_any_sync(rs_status: Dict) -> bool: + """Returns true if any replica set members are syncing data. + + Checks if any members in replica set are syncing data. Note it is recommended to run only + one sync in the cluster to not have huge performance degradation. + + Args: + rs_status: current state of replica set as reported by mongod. + """ + return any( + member["stateStr"] in ["STARTUP", "STARTUP2", "ROLLBACK", "RECOVERING"] + for member in rs_status["members"] + ) + + @staticmethod + def _is_any_removing(rs_status: Dict) -> bool: + """Returns true if any replica set members are removing now. + + Checks if any members in replica set are getting removed. It is recommended to run only one + removal in the cluster at a time as to not have huge performance degradation. + + Args: + rs_status: current state of replica set as reported by mongod. + """ + return any(member["stateStr"] == "REMOVED" for member in rs_status["members"]) + + @staticmethod + def _hostname_from_hostport(hostname: str) -> str: + """Return hostname part from MongoDB returned. 
+ + MongoDB typically returns a value that contains both, hostname and port. + e.g. input: mongodb-1:27015 + Return hostname without changes if the port is not passed. + e.g. input: mongodb-1 + """ + return hostname.split(":")[0] diff --git a/lib/charms/mongodb/v1/mongos.py b/lib/charms/mongodb/v1/mongos.py new file mode 100644 index 00000000..8d216e07 --- /dev/null +++ b/lib/charms/mongodb/v1/mongos.py @@ -0,0 +1,486 @@ +"""Code for interactions with MongoDB.""" + +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +import logging +from dataclasses import dataclass +from typing import List, Optional, Set, Tuple +from urllib.parse import quote_plus + +from charms.mongodb.v1.mongodb import NotReadyError +from pymongo import MongoClient, collection +from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed + +from config import Config + +# The unique Charmhub library identifier, never change it +LIBID = "e20d5b19670d4c55a4934a21d3f3b29a" + +# Increment this major API version when introducing breaking changes +LIBAPI = 1 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 5 + +# path to store mongodb ketFile +logger = logging.getLogger(__name__) + +SHARD_AWARE_STATE = 1 + + +@dataclass +class MongosConfiguration: + """Class for mongos configuration. + + — database: database name. + — username: username. + — password: password. + — hosts: full list of hosts to connect to, needed for the URI. + - port: integer for the port to connect to connect to mongodb. + - tls_external: indicator for use of internal TLS connection. + - tls_internal: indicator for use of external TLS connection. + """ + + database: Optional[str] + username: str + password: str + hosts: Set[str] + port: int + roles: Set[str] + tls_external: bool + tls_internal: bool + + @property + def uri(self): + """Return URI concatenated from fields.""" + self.complete_hosts = self.hosts + + # mongos using Unix Domain Socket to communicate do not use port + if self.port: + self.complete_hosts = [f"{host}:{self.port}" for host in self.hosts] + + complete_hosts = ",".join(self.complete_hosts) + + # Auth DB should be specified while user connects to application DB. + auth_source = "" + if self.database != "admin": + auth_source = "authSource=admin" + return ( + f"mongodb://{quote_plus(self.username)}:" + f"{quote_plus(self.password)}@" + f"{complete_hosts}/{quote_plus(self.database)}?" + f"{auth_source}" + ) + + +class NotEnoughSpaceError(Exception): + """Raised when there isn't enough space to movePrimary.""" + + +class ShardNotInClusterError(Exception): + """Raised when shard is not present in cluster, but it is expected to be.""" + + +class ShardNotPlannedForRemovalError(Exception): + """Raised when it is expected that a shard is planned for removal, but it is not.""" + + +class NotDrainedError(Exception): + """Raised when a shard is still being drained.""" + + +class BalancerNotEnabledError(Exception): + """Raised when balancer process is not enabled.""" + + +class MongosConnection: + """In this class we create connection object to Mongos. + + Real connection is created on the first call to Mongos. + Delayed connectivity allows to firstly check database readiness + and reuse the same connection for an actual query later in the code. + + Connection is automatically closed when object destroyed. + Automatic close allows to have more clean code. 
+ + Note that connection when used may lead to the following pymongo errors: ConfigurationError, + ConfigurationError, OperationFailure. It is suggested that the following pattern be adopted + when using MongoDBConnection: + + with MongoMongos(self._mongos_config) as mongo: + try: + mongo. + except ConfigurationError, OperationFailure: + + """ + + def __init__(self, config: MongosConfiguration, uri=None, direct=False): + """A MongoDB client interface. + + Args: + config: MongoDB Configuration object. + uri: allow using custom MongoDB URI, needed for replSet init. + direct: force a direct connection to a specific host, avoiding + reading replica set configuration and reconnection. + """ + if uri is None: + uri = config.uri + + self.client = MongoClient( + uri, + directConnection=direct, + connect=False, + serverSelectionTimeoutMS=1000, + connectTimeoutMS=2000, + ) + return + + def __enter__(self): + """Return a reference to the new connection.""" + return self + + def __exit__(self, object_type, value, traceback): + """Disconnect from MongoDB client.""" + self.client.close() + self.client = None + + def get_shard_members(self) -> Set[str]: + """Gets shard members. + + Returns: + A set of the shard members as reported by mongos. + + Raises: + ConfigurationError, OperationFailure + """ + shard_list = self.client.admin.command("listShards") + curr_members = [ + self._hostname_from_hostport(member["host"]) for member in shard_list["shards"] + ] + return set(curr_members) + + def add_shard(self, shard_name, shard_hosts, shard_port=Config.MONGODB_PORT): + """Adds shard to the cluster. + + Raises: + ConfigurationError, OperationFailure + """ + shard_hosts = [f"{host}:{shard_port}" for host in shard_hosts] + shard_hosts = ",".join(shard_hosts) + shard_url = f"{shard_name}/{shard_hosts}" + if shard_name in self.get_shard_members(): + logger.info("Skipping adding shard %s, shard is already in cluster", shard_name) + return + + logger.info("Adding shard %s", shard_name) + self.client.admin.command("addShard", shard_url) + + def pre_remove_checks(self, shard_name): + """Performs a series of checks for removing a shard from the cluster. + + Raises + ConfigurationError, OperationFailure, NotReadyError, ShardNotInClusterError, + BalencerNotEnabledError + """ + if shard_name not in self.get_shard_members(): + logger.info("Shard to remove is not in cluster.") + raise ShardNotInClusterError(f"Shard {shard_name} could not be removed") + + # It is necessary to call removeShard multiple times on a shard to guarantee removal. + # Allow re-removal of shards that are currently draining. + if self.is_any_draining(ignore_shard=shard_name): + cannot_remove_shard = ( + f"cannot remove shard {shard_name} from cluster, another shard is draining" + ) + logger.error(cannot_remove_shard) + raise NotReadyError(cannot_remove_shard) + + # check if enabled sh.getBalancerState() + balancer_state = self.client.admin.command("balancerStatus") + if balancer_state["mode"] != "off": + logger.info("Balancer is enabled, ready to remove shard.") + return + + # starting the balancer doesn't guarantee that is is running, wait until it starts up. + logger.info("Balancer process is not running, enabling it.") + self.start_and_wait_for_balancer() + + def start_and_wait_for_balancer(self) -> None: + """Turns on the balancer and waits for it to be running. + + Starting the balancer doesn't guarantee that is is running, wait until it starts up. 
+ + Raises: + BalancerNotEnabledError + """ + self.client.admin.command("balancerStart") + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3), reraise=True): + with attempt: + balancer_state = self.client.admin.command("balancerStatus") + if balancer_state["mode"] == "off": + raise BalancerNotEnabledError("balancer is not enabled.") + + def remove_shard(self, shard_name: str) -> None: + """Removes shard from the cluster. + + Raises: + ConfigurationError, OperationFailure, NotReadyError, NotEnoughSpaceError, + ShardNotInClusterError, BalencerNotEnabledError + """ + self.pre_remove_checks(shard_name) + + # remove shard, process removal status, & check if fully removed + logger.info("Attempting to remove shard %s", shard_name) + removal_info = self.client.admin.command("removeShard", shard_name) + self._log_removal_info(removal_info, shard_name) + remaining_chunks = self._retrieve_remaining_chunks(removal_info) + if remaining_chunks: + logger.info("Waiting for all chunks to be drained from %s.", shard_name) + raise NotDrainedError() + + # MongoDB docs says to movePrimary only after all chunks have been drained from the shard. + logger.info("All chunks drained from shard: %s", shard_name) + databases_using_shard_as_primary = self.get_databases_for_shard(shard_name) + if databases_using_shard_as_primary: + logger.info( + "These databases: %s use Shard %s is a primary shard, moving primary.", + ", ".join(databases_using_shard_as_primary), + shard_name, + ) + self._move_primary(databases_using_shard_as_primary, old_primary=shard_name) + + # MongoDB docs says to re-run removeShard after running movePrimary + logger.info("removing shard: %s, after moving primary", shard_name) + removal_info = self.client.admin.command("removeShard", shard_name) + self._log_removal_info(removal_info, shard_name) + + if shard_name in self.get_shard_members(): + logger.info("Shard %s is still present in sharded cluster.", shard_name) + raise NotDrainedError() + + def _is_shard_draining(self, shard_name: str) -> bool: + """Reports if a given shard is currently in the draining state. + + Raises: + ConfigurationError, OperationFailure, ShardNotInClusterError, + ShardNotPlannedForRemovalError + """ + sc_status = self.client.admin.command("listShards") + for shard in sc_status["shards"]: + if shard["_id"] == shard_name: + if "draining" not in shard: + raise ShardNotPlannedForRemovalError( + f"Shard {shard_name} has not been marked for removal", + ) + return shard["draining"] + + raise ShardNotInClusterError( + f"Shard {shard_name} not in cluster, could not retrieve draining status" + ) + + def get_databases_for_shard(self, primary_shard) -> Optional[List[str]]: + """Returns a list of databases using the given shard as a primary shard. + + In Sharded MongoDB clusters, mongos selects the primary shard when creating a new database + by picking the shard in the cluster that has the least amount of data. This means that: + 1. There can be multiple primary shards in a cluster. + 2. Until there is data written to the cluster there is effectively no primary shard. + """ + databases_collection = self._get_databases_collection() + if databases_collection is None: + return + + return databases_collection.distinct("_id", {"primary": primary_shard}) + + def _get_databases_collection(self) -> collection.Collection: + """Returns the databases collection if present. + + The collection `databases` only gets created once data is written to the sharded cluster. 
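        Each document in that collection stores the database name as `_id` and the name
        of its primary shard under `primary`, which is what `get_databases_for_shard`
        above relies on.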
+ """ + config_db = self.client["config"] + if "databases" not in config_db.list_collection_names(): + logger.info("No data written to sharded cluster yet.") + return None + + return config_db["databases"] + + def is_any_draining(self, ignore_shard: str = "") -> bool: + """Returns true if any shard members is draining. + + Checks if any members in sharded cluster are draining data. + + Args: + sc_status: current state of shard cluster status as reported by mongos. + ignore_shard: shard to ignore + """ + sc_status = self.client.admin.command("listShards") + return any( + # check draining status of all shards except the one to be ignored. + shard.get("draining", False) if shard["_id"] != ignore_shard else False + for shard in sc_status["shards"] + ) + + @staticmethod + def _hostname_from_hostport(hostname: str) -> str: + """Return hostname part from MongoDB returned. + + mongos typically returns a value that contains both, hostname, hosts, and ports. + e.g. input: shard03/host7:27018,host8:27018,host9:27018 + Return shard name + e.g. output: shard03 + """ + return hostname.split("/")[0] + + def _log_removal_info(self, removal_info, shard_name): + """Logs removal information for a shard removal.""" + remaining_chunks = self._retrieve_remaining_chunks(removal_info) + dbs_to_move = ( + removal_info["dbsToMove"] + if "dbsToMove" in removal_info and removal_info["dbsToMove"] != [] + else ["None"] + ) + logger.info( + "Shard %s is draining status is: %s. Remaining chunks: %s. DBs to move: %s.", + shard_name, + removal_info["state"], + str(remaining_chunks), + ",".join(dbs_to_move), + ) + + @property + def is_ready(self) -> bool: + """Is mongos ready for services requests. + + Returns: + True if services is ready False otherwise. Retries over a period of 60 seconds times to + allow server time to start up. + + Raises: + ConfigurationError, ConfigurationError, OperationFailure + """ + try: + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + # The ping command is cheap and does not require auth. + self.client.admin.command("ping") + except RetryError: + return False + + return True + + def are_all_shards_aware(self) -> bool: + """Returns True if all shards are shard aware.""" + sc_status = self.client.admin.command("listShards") + for shard in sc_status["shards"]: + if shard["state"] != SHARD_AWARE_STATE: + return False + + return True + + def is_shard_aware(self, shard_name: str) -> bool: + """Returns True if provided shard is shard aware.""" + sc_status = self.client.admin.command("listShards") + for shard in sc_status["shards"]: + if shard["_id"] == shard_name: + return shard["state"] == SHARD_AWARE_STATE + + return False + + def _retrieve_remaining_chunks(self, removal_info) -> int: + """Parses the remaining chunks to remove from removeShard command.""" + # when chunks have finished draining, remaining chunks is still in the removal info, but + # marked as 0. If "remaining" is not present, in removal_info then the shard is not yet + # draining + if "remaining" not in removal_info: + raise NotDrainedError() + + return removal_info["remaining"]["chunks"] if "remaining" in removal_info else 0 + + def _move_primary(self, databases_to_move: List[str], old_primary: str) -> None: + """Moves all the provided databases to a new primary. 
+ + Raises: + NotEnoughSpaceError, ConfigurationError, OperationFailure + """ + for database_name in databases_to_move: + db_size = self.get_db_size(database_name, old_primary) + new_shard, avail_space = self.get_shard_with_most_available_space( + shard_to_ignore=old_primary + ) + if db_size > avail_space: + no_space_on_new_primary = ( + f"Cannot move primary for database: {database_name}, new shard: {new_shard}", + f"does not have enough space. {db_size} > {avail_space}", + ) + logger.error(no_space_on_new_primary) + raise NotEnoughSpaceError(no_space_on_new_primary) + + # From MongoDB Docs: After starting movePrimary, do not perform any read or write + # operations against any unsharded collection in that database until the command + # completes. + logger.info( + "Moving primary on %s database to new primary: %s. Do NOT write to %s database.", + database_name, + new_shard, + database_name, + ) + # This command does not return until MongoDB completes moving all data. This can take + # a long time. + self.client.admin.command("movePrimary", database_name, to=new_shard) + logger.info( + "Successfully moved primary on %s database to new primary: %s", + database_name, + new_shard, + ) + + def get_db_size(self, database_name, primary_shard) -> int: + """Returns the size of a DB on a given shard in bytes.""" + database = self.client[database_name] + db_stats = database.command("dbStats") + + # sharded databases are spread across multiple shards, find the amount of storage used on + # the primary shard + for shard_name, shard_storage_info in db_stats["raw"].items(): + # shard names are of the format `shard-one/10.61.64.212:27017` + shard_name = shard_name.split("/")[0] + if shard_name != primary_shard: + continue + + return shard_storage_info["storageSize"] + + return 0 + + def get_shard_with_most_available_space(self, shard_to_ignore) -> Tuple[str, int]: + """Returns the shard in the cluster with the most available space and the space in bytes. + + Algorithm used was similar to that used in mongo in `selectShardForNewDatabase`: + https://github.com/mongodb/mongo/blob/6/0/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp#L68-L91 + """ + candidate_shard = None + candidate_free_space = -1 + available_storage = self.client.admin.command("dbStats", freeStorage=1) + + for shard_name, shard_storage_info in available_storage["raw"].items(): + # shard names are of the format `shard-one/10.61.64.212:27017` + shard_name = shard_name.split("/")[0] + if shard_name == shard_to_ignore: + continue + + current_free_space = shard_storage_info["freeStorageSize"] + if current_free_space > candidate_free_space: + candidate_shard = shard_name + candidate_free_space = current_free_space + + return (candidate_shard, candidate_free_space) + + def get_draining_shards(self) -> List[str]: + """Returns a list of the shards currently draining.""" + sc_status = self.client.admin.command("listShards") + draining_shards = [] + for shard in sc_status["shards"]: + if shard.get("draining", False): + draining_shards.append(shard["_id"]) + + return draining_shards diff --git a/lib/charms/mongodb/v1/users.py b/lib/charms/mongodb/v1/users.py new file mode 100644 index 00000000..2b5a059a --- /dev/null +++ b/lib/charms/mongodb/v1/users.py @@ -0,0 +1,118 @@ +"""Users configuration for MongoDB.""" + +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. 
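A note on the shard-removal flow defined above: `remove_shard` intentionally raises `NotDrainedError` while chunks are still draining, so callers are expected to retry on a later hook. A rough sketch of that calling pattern (hypothetical charm context; the shard name is illustrative):

    with MongosConnection(self.mongos_config) as mongos:
        try:
            mongos.remove_shard("shard-one")
        except NotDrainedError:
            # chunks are still draining; defer and try again on the next hook
            event.defer()
        except ShardNotInClusterError:
            logger.info("Shard already removed from the cluster.")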
+from typing import Set + +# The unique Charmhub library identifier, never change it +LIBID = "b74007eda21c453a89e4dcc6382aa2b3" + +# Increment this major API version when introducing breaking changes +LIBAPI = 1 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 1 + + +class MongoDBUser: + """Base class for MongoDB users.""" + + _username = "" + _password_key_name = "" + _database_name = "" + _roles = [] + _privileges = {} + _mongodb_role = "" + _hosts = [] + + def get_username(self) -> str: + """Returns the username of the user.""" + return self._username + + def get_password_key_name(self) -> str: + """Returns the key name for the password of the user.""" + return self._password_key_name + + def get_database_name(self) -> str: + """Returns the database of the user.""" + return self._database_name + + def get_roles(self) -> Set[str]: + """Returns the role of the user.""" + return self._roles + + def get_mongodb_role(self) -> str: + """Returns the MongoDB role of the user.""" + return self._mongodb_role + + def get_privileges(self) -> dict: + """Returns the privileges of the user.""" + return self._privileges + + def get_hosts(self) -> list: + """Returns the hosts of the user.""" + return self._hosts + + @staticmethod + def get_password_key_name_for_user(username: str) -> str: + """Returns the key name for the password of the user.""" + if username == OperatorUser.get_username(): + return OperatorUser.get_password_key_name() + elif username == MonitorUser.get_username(): + return MonitorUser.get_password_key_name() + elif username == BackupUser.get_username(): + return BackupUser.get_password_key_name() + else: + raise ValueError(f"Unknown user: {username}") + + +class _OperatorUser(MongoDBUser): + """Operator user for MongoDB.""" + + _username = "operator" + _password_key_name = f"{_username}-password" + _database_name = "admin" + _roles = ["default"] + _hosts = [] + + +class _MonitorUser(MongoDBUser): + """Monitor user for MongoDB.""" + + _username = "monitor" + _password_key_name = f"{_username}-password" + _database_name = "admin" + _roles = ["monitor"] + _privileges = { + "resource": {"db": "", "collection": ""}, + "actions": ["listIndexes", "listCollections", "dbStats", "dbHash", "collStats", "find"], + } + _mongodb_role = "explainRole" + _hosts = [ + "127.0.0.1" + ] # MongoDB Exporter can only connect to one replica - not the entire set. + + +class _BackupUser(MongoDBUser): + """Backup user for MongoDB.""" + + _username = "backup" + _password_key_name = f"{_username}-password" + _database_name = "" + _roles = ["backup"] + _mongodb_role = "pbmAnyAction" + _privileges = {"resource": {"anyResource": True}, "actions": ["anyAction"]} + _hosts = ["127.0.0.1"] # pbm cannot make a direct connection if multiple hosts are used + + +OperatorUser = _OperatorUser() +MonitorUser = _MonitorUser() +BackupUser = _BackupUser() + +# List of system usernames needed for correct work on the charm. 
+CHARM_USERS = [ + OperatorUser.get_username(), + BackupUser.get_username(), + MonitorUser.get_username(), +] diff --git a/metadata.yaml b/metadata.yaml index fb553f64..b2673797 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -30,21 +30,11 @@ requires: limit: 1 containers: - mongod: + mongos: resource: mongodb-image - mounts: - - storage: mongodb - location: /var/lib/mongodb resources: mongodb-image: type: oci-image description: OCI image for mongodb # TODO: Update sha whenever upstream rock changes upstream-source: ghcr.io/canonical/charmed-mongodb:6.0.6-22.04_edge@sha256:b4b3edb805b20de471da57802643bfadbf979f112d738bc540ab148d145ddcfe -storage: - mongodb: - type: filesystem - location: /var/lib/mongodb - mongodb-logs: - type: filesystem - location: /var/log/mongodb diff --git a/src/charm.py b/src/charm.py index 373b70c3..6e9b35dd 100755 --- a/src/charm.py +++ b/src/charm.py @@ -1,19 +1,499 @@ #!/usr/bin/env python3 -"""Charm code for mongos service on Kubernetes.""" +"""Charm code for `mongos` daemon.""" # Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. -from ops.charm import CharmBase -from ops.main import main +import json +from exceptions import MissingSecretError -class MongosCharm(CharmBase): - """A Juju Charm to deploy mongos on Kubernetes.""" +from ops.pebble import PathError, ProtocolError + + +from typing import Set, Optional, Dict + + +from charms.mongos.v0.set_status import MongosStatusHandler + +from charms.mongodb.v0.mongodb_secrets import SecretCache +from charms.mongodb.v0.mongodb_secrets import generate_secret_label +from charms.mongodb.v1.mongos import MongosConfiguration, MongosConnection +from charms.mongodb.v1.users import ( + MongoDBUser, +) + +from config import Config + +import ops +from ops.model import BlockedStatus, Container, Relation, ActiveStatus, Unit +from ops.charm import StartEvent, RelationDepartedEvent + +import logging + + +logger = logging.getLogger(__name__) + +APP_SCOPE = Config.Relations.APP_SCOPE +UNIT_SCOPE = Config.Relations.UNIT_SCOPE +ROOT_USER_GID = 0 +MONGO_USER = "snap_daemon" +ENV_VAR_PATH = "/etc/environment" +MONGOS_VAR = "MONGOS_ARGS" +CONFIG_ARG = "--configdb" +USER_ROLES_TAG = "extra-user-roles" +DATABASE_TAG = "database" +EXTERNAL_CONNECTIVITY_TAG = "external-connectivity" + + +class MissingConfigServerError(Exception): + """Raised when mongos expects to be connected to a config-server but is not.""" + + +class ExtraDataDirError: + """Raised when there is unexpected data in the data directory.""" + + +class MongosCharm(ops.CharmBase): + """Charm the service.""" def __init__(self, *args): - """Listen to juju events and pair them with their associated function.""" super().__init__(*args) + self.framework.observe( + self.on.mongos_pebble_ready, self._on_mongos_pebble_ready + ) + self.framework.observe(self.on.start, self._on_start) + self.framework.observe(self.on.update_status, self._on_update_status) + + self.role = Config.Role.MONGOS + self.secrets = SecretCache(self) + self.status = MongosStatusHandler(self) + + # BEGIN: hook functions + def _on_mongos_pebble_ready(self, event) -> None: + """Configure MongoDB pebble layer specification.""" + if not self.is_integrated_to_config_server(): + logger.info( + "mongos service not starting. Cannot start until application is integrated to a config-server." 
+ ) + return + + # Get a reference the container attribute + container = self.unit.get_container(Config.CONTAINER_NAME) + if not container.can_connect(): + logger.debug("mongos container is not ready yet.") + event.defer() + return + + try: + # mongod needs keyFile and TLS certificates on filesystem + self._push_keyfile_to_workload(container) + self._pull_licenses(container) + self._set_data_dir_permissions(container) + + except (PathError, ProtocolError, MissingSecretError) as e: + logger.error("Cannot initialize workload: %r", e) + event.defer() + return + + # Add initial Pebble config layer using the Pebble API + container.add_layer(Config.CONTAINER_NAME, self._mongos_layer, combine=True) + + # Restart changed services and start startup-enabled services. + container.replan() + + def _on_start(self, event: StartEvent) -> None: + """Handle the start event.""" + # start hooks are fired before relation hooks and `mongos` requires a config-server in + # order to start. Wait to receive config-server info from the relation event before + # starting `mongos` daemon + self.status.set_and_share_status( + BlockedStatus("Missing relation to config-server.") + ) + + def _on_update_status(self, _): + """Handle the update status event""" + if self.unit.status == Config.Status.UNHEALTHY_UPGRADE: + return + + if not self.is_integrated_to_config_server(): + logger.info( + "Missing integration to config-server. mongos cannot run unless connected to config-server." + ) + self.status.set_and_share_status( + BlockedStatus("Missing relation to config-server.") + ) + return + + self.status.set_and_share_status(ActiveStatus()) + + # END: hook functions + + # BEGIN: helper functions + def is_integrated_to_config_server(self) -> bool: + """Returns True if the mongos application is integrated to a config-server.""" + return self.model.relations[Config.Relations.CLUSTER_RELATIONS_NAME] is not None + + def _get_mongos_config_for_user( + self, user: MongoDBUser, hosts: Set[str] + ) -> MongosConfiguration: + return MongosConfiguration( + database=user.get_database_name(), + username=user.get_username(), + password=self.get_secret(APP_SCOPE, user.get_password_key_name()), + hosts=hosts, + port=Config.MONGOS_PORT, + roles=user.get_roles(), + tls_external=None, # Future PR will support TLS + tls_internal=None, # Future PR will support TLS + ) + + def get_secret(self, scope: str, key: str) -> Optional[str]: + """Get secret from the secret storage.""" + label = generate_secret_label(self, scope) + if not (secret := self.secrets.get(label)): + return + + value = secret.get_content().get(key) + if value != Config.Secrets.SECRET_DELETED_LABEL: + return value + + def set_secret(self, scope: str, key: str, value: Optional[str]) -> Optional[str]: + """Set secret in the secret storage. + + Juju versions > 3.0 use `juju secrets`, this function first checks + which secret store is being used before setting the secret. 
+ """ + if not value: + return self.remove_secret(scope, key) + + label = generate_secret_label(self, scope) + secret = self.secrets.get(label) + if not secret: + self.secrets.add(label, {key: value}, scope) + else: + content = secret.get_content() + content.update({key: value}) + secret.set_content(content) + return label + + def remove_secret(self, scope, key) -> None: + """Removing a secret.""" + label = generate_secret_label(self, scope) + secret = self.secrets.get(label) + + if not secret: + return + + content = secret.get_content() + + if not content.get(key) or content[key] == Config.Secrets.SECRET_DELETED_LABEL: + logger.error( + f"Non-existing secret {scope}:{key} was attempted to be removed." + ) + return + + content[key] = Config.Secrets.SECRET_DELETED_LABEL + secret.set_content(content) + + def restart_charm_services(self): + """Restart mongos service.""" + container = self.unit.get_container(Config.CONTAINER_NAME) + container.stop(Config.SERVICE_NAME) + + container.add_layer(Config.CONTAINER_NAME, self._mongod_layer, combine=True) + container.replan() + + self._connect_mongodb_exporter() + self._connect_pbm_agent() + + def set_database(self, database: str) -> None: + """Updates the database requested for the mongos user.""" + self.app_peer_data[DATABASE_TAG] = database + + if len(self.model.relations[Config.Relations.CLUSTER_RELATIONS_NAME]) == 0: + return + + # a mongos shard can only be related to one config server + config_server_rel = self.model.relations[ + Config.Relations.CLUSTER_RELATIONS_NAME + ][0] + self.cluster.database_requires.update_relation_data( + config_server_rel.id, {DATABASE_TAG: database} + ) + + def check_relation_broken_or_scale_down(self, event: RelationDepartedEvent) -> None: + """Checks relation departed event is the result of removed relation or scale down. + + Relation departed and relation broken events occur during scaling down or during relation + removal, only relation departed events have access to metadata to determine which case. + """ + if self.set_scaling_down(event): + logger.info( + "Scaling down the application, no need to process removed relation in broken hook." + ) + + def is_scaling_down(self, rel_id: int) -> bool: + """Returns True if the application is scaling down.""" + rel_departed_key = self._generate_relation_departed_key(rel_id) + return json.loads(self.unit_peer_data[rel_departed_key]) + + def has_departed_run(self, rel_id: int) -> bool: + """Returns True if the relation departed event has run.""" + rel_departed_key = self._generate_relation_departed_key(rel_id) + return rel_departed_key in self.unit_peer_data + + def set_scaling_down(self, event: RelationDepartedEvent) -> bool: + """Sets whether or not the current unit is scaling down.""" + # check if relation departed is due to current unit being removed. (i.e. scaling down the + # application.) + rel_departed_key = self._generate_relation_departed_key(event.relation.id) + scaling_down = event.departing_unit == self.unit + self.unit_peer_data[rel_departed_key] = json.dumps(scaling_down) + return scaling_down + + def proceed_on_broken_event(self, event) -> bool: + """Returns True if relation broken event should be acted on..""" + # Only relation_deparated events can check if scaling down + departed_relation_id = event.relation.id + if not self.has_departed_run(departed_relation_id): + logger.info( + "Deferring, must wait for relation departed hook to decide if relation should be removed." 
+ )
+ event.defer()
+ return False
+
+ # check if we're scaling down and add a log message
+ if self.is_scaling_down(departed_relation_id):
+ logger.info(
+ "Relation broken event occurring due to scale down, do not proceed to remove users."
+ )
+ return False
+
+ return True
+
+ def get_mongos_host(self) -> str:
+ """Returns the host for mongos as a str.
+
+ The host for mongos can be either the Unix Domain Socket or an IP address depending on how
+ the client wishes to connect to mongos (inside Juju or outside).
+ """
+ return self.unit_host(self.unit)
+
+ @staticmethod
+ def _generate_relation_departed_key(rel_id: int) -> str:
+ """Generates the relation departed key for a specified relation id."""
+ return f"relation_{rel_id}_departed"
+
+ def open_mongos_port(self) -> None:
+ """Opens the mongos port for TCP connections."""
+ self.unit.open_port("tcp", Config.MONGOS_PORT)
+
+ def is_role(self, role_name: str) -> bool:
+ """Checks if the application is running in the provided role."""
+ return self.role == role_name
+
+ def is_db_service_ready(self) -> bool:
+ """Returns True if the underlying database service is ready."""
+ with MongosConnection(self.mongos_config) as mongos:
+ return mongos.is_ready
+
+ def _push_keyfile_to_workload(self, container: Container) -> None:
+ """Upload the keyFile to a workload container."""
+ keyfile = self.get_secret(APP_SCOPE, Config.Secrets.SECRET_KEYFILE_NAME)
+ if not keyfile:
+ raise MissingSecretError(f"No secret defined for {APP_SCOPE}, keyfile")
+ else:
+ self.push_file_to_unit(
+ container=container,
+ parent_dir=Config.MONGOD_CONF_DIR,
+ file_name=Config.TLS.KEY_FILE_NAME,
+ file_contents=keyfile,
+ )
+
+ @staticmethod
+ def _pull_licenses(container: Container) -> None:
+ """Pull licenses from the workload."""
+ licenses = [
+ "snap",
+ "rock",
+ "percona-server",
+ ]
+
+ for license_name in licenses:
+ try:
+ license_file = container.pull(
+ path=Config.get_license_path(license_name)
+ )
+ with open(f"LICENSE_{license_name}", "x") as f:
+ f.write(str(license_file.read()))
+ except FileExistsError:
+ pass
+
+ @staticmethod
+ def _set_data_dir_permissions(container: Container) -> None:
+ """Ensure the data directory for mongodb is writable for the "mongodb" user.
+
+ Until the ability to set fsGroup and fsGroupChangePolicy via Pod securityContext
+ is available, we fix the permissions manually with chown.
+ """
+ for path in [Config.DATA_DIR, Config.LOG_DIR, Config.LogRotate.LOG_STATUS_DIR]:
+ paths = container.list_files(path, itself=True)
+ if len(paths) != 1:
+ raise ExtraDataDirError(
+ "list_files doesn't return only the directory itself"
+ )
+ logger.debug(f"Data directory ownership: {paths[0].user}:{paths[0].group}")
+ if paths[0].user != Config.UNIX_USER or paths[0].group != Config.UNIX_GROUP:
+ container.exec(
+ f"chown {Config.UNIX_USER}:{Config.UNIX_GROUP} -R {path}".split()
+ )
+
+ def push_file_to_unit(
+ self,
+ parent_dir: str,
+ file_name: str,
+ file_contents: str,
+ container: Optional[Container] = None,
+ ) -> None:
+ """Push the file to the container, with the right permissions."""
+ container = container or self.unit.get_container(Config.CONTAINER_NAME)
+ container.push(
+ f"{parent_dir}/{file_name}",
+ file_contents,
+ make_dirs=True,
+ permissions=0o400,
+ user=Config.UNIX_USER,
+ group=Config.UNIX_GROUP,
+ )
+
+ def unit_host(self, unit: Unit) -> str:
+ """Create a DNS name for a mongos unit.
+
+ Args:
+ unit: the juju unit for which to build the hostname, e.g. "mongos/1".
+
+ Returns:
+ A string representing the hostname of the unit.
+ """ + unit_id = unit.name.split("/")[1] + return f"{self.app.name}-{unit_id}.{self.app.name}-endpoints" + + # END: helper functions + + # BEGIN: properties + @property + def mongos_initialised(self) -> bool: + """Check if mongos is initialised.""" + return "mongos_initialised" in self.app_peer_data + + @mongos_initialised.setter + def mongos_initialised(self, value: bool): + """Set the mongos_initialised flag.""" + if value: + self.app_peer_data["mongos_initialised"] = str(value) + elif "mongos_initialised" in self.app_peer_data: + del self.app_peer_data["mongos_initialised"] + + @property + def _unit_ip(self) -> str: + """Returns the ip address of the unit.""" + return str(self.model.get_binding(Config.Relations.PEERS).network.bind_address) + + @property + def is_external_client(self) -> Optional[str]: + """Returns the connectivity mode which mongos should use. + + This is determined by checking the modes requested by the client(s). + + TODO: Future PR. This should be modified to work for many clients. + """ + if EXTERNAL_CONNECTIVITY_TAG not in self.app_peer_data: + return False + + return json.loads(self.app_peer_data.get(EXTERNAL_CONNECTIVITY_TAG)) + + @property + def database(self) -> Optional[str]: + """Returns a mapping of databases requested by integrated clients. + + TODO: Future PR. This should be modified to work for many clients. + """ + if not self._peers: + logger.info("Peer relation not joined yet.") + # TODO future PR implement relation interface between host application mongos and use + # host application name in generation of db name. + return "mongos-database" + + return self.app_peer_data.get(DATABASE_TAG, "mongos-database") + + @property + def extra_user_roles(self) -> Set[str]: + """Returns a mapping of user roles requested by integrated clients. + + TODO: Future PR. This should be modified to work for many clients. + """ + if not self._peers: + logger.info("Peer relation not joined yet.") + return None + + return self.app_peer_data.get(USER_ROLES_TAG, "default") + + @property + def mongos_config(self) -> MongosConfiguration: + """Generates a MongoDBConfiguration object for mongos in the deployment of MongoDB.""" + hosts = [self.get_mongos_host()] + # TODO: Future PR. Ensure that this works for external connections with NodePort + port = Config.MONGOS_PORT if self.is_external_client else None + external_ca, _ = self.tls.get_tls_files(internal=False) + internal_ca, _ = self.tls.get_tls_files(internal=True) + + return MongosConfiguration( + database=self.database, + username=self.get_secret(APP_SCOPE, Config.Secrets.USERNAME), + password=self.get_secret(APP_SCOPE, Config.Secrets.PASSWORD), + hosts=hosts, + port=port, + roles=self.extra_user_roles, + tls_external=external_ca is not None, + tls_internal=internal_ca is not None, + ) + + @property + def _peers(self) -> Relation | None: + """Fetch the peer relation. + + Returns: + An `ops.model.Relation` object representing the peer relation. + """ + return self.model.get_relation(Config.Relations.PEERS) + + @property + def unit_peer_data(self) -> Dict: + """Peer relation data object.""" + if not self._peers: + return {} + + return self._peers.data[self.unit] + + @property + def app_peer_data(self) -> Dict: + """Peer relation data object.""" + if not self._peers: + return {} + + return self._peers.data[self.app] + + @property + def upgrade_in_progress(self) -> bool: + """Returns true if an upgrade is currently in progress. + + TODO implement this function once upgrades are supported. 
+ """ + return False + + # END: properties if __name__ == "__main__": - main(MongosCharm) + ops.main(MongosCharm) diff --git a/src/config.py b/src/config.py new file mode 100644 index 00000000..9ac7aa19 --- /dev/null +++ b/src/config.py @@ -0,0 +1,76 @@ +"""Configuration for Mongos Charm.""" +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +from typing import Literal +from ops.model import BlockedStatus + + +class Config: + """Configuration for MongoDB Charm.""" + + MONGOS_PORT = 27018 + MONGODB_PORT = 27017 + SUBSTRATE = "k8s" + CONTAINER_NAME = "mongos" + + class Relations: + """Relations related config for MongoDB Charm.""" + + APP_SCOPE = "app" + UNIT_SCOPE = "unit" + PEERS = "router-peers" + CLUSTER_RELATIONS_NAME = "cluster" + Scopes = Literal[APP_SCOPE, UNIT_SCOPE] + + class TLS: + """TLS related config for MongoDB Charm.""" + + KEY_FILE_NAME = "keyFile" + TLS_PEER_RELATION = "certificates" + SECRET_KEY_LABEL = "key-secret" + + EXT_PEM_FILE = "external-cert.pem" + EXT_CA_FILE = "external-ca.crt" + INT_PEM_FILE = "internal-cert.pem" + INT_CA_FILE = "internal-ca.crt" + SECRET_CA_LABEL = "ca-secret" + SECRET_CERT_LABEL = "cert-secret" + SECRET_CSR_LABEL = "csr-secret" + SECRET_CHAIN_LABEL = "chain-secret" + + class Secrets: + """Secrets related constants.""" + + SECRET_LABEL = "secret" + SECRET_CACHE_LABEL = "cache" + SECRET_KEYFILE_NAME = "keyfile" + SECRET_INTERNAL_LABEL = "internal-secret" + USERNAME = "username" + PASSWORD = "password" + SECRET_DELETED_LABEL = "None" + MAX_PASSWORD_LENGTH = 4096 + + class Status: + """Status related constants. + + TODO: move all status messages here. + """ + + STATUS_READY_FOR_UPGRADE = "status-shows-ready-for-upgrade" + + # TODO Future PR add more status messages here as constants + UNHEALTHY_UPGRADE = BlockedStatus("Unhealthy after upgrade.") + + class Role: + """Role config names for MongoDB Charm.""" + + CONFIG_SERVER = "config-server" + REPLICATION = "replication" + SHARD = "shard" + MONGOS = "mongos" + + @staticmethod + def get_license_path(license_name: str) -> str: + """Return the path to the license file.""" + return f"{Config.LICENSE_PATH}-{license_name}" diff --git a/src/exceptions.py b/src/exceptions.py new file mode 100644 index 00000000..eed87fd8 --- /dev/null +++ b/src/exceptions.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 +"""Charm code for `mongos` daemon.""" +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + + +class MongoError(Exception): + """Common parent for Mongo errors, allowing to catch them all at once.""" + + +class AdminUserCreationError(MongoError): + """Raised when a commands to create an admin user on MongoDB fail.""" + + +class ApplicationHostNotFoundError(MongoError): + """Raised when a queried host is not in the application peers or the current host.""" + + +class MongoSecretError(MongoError): + """Common parent for all Mongo Secret Exceptions.""" + + +class SecretNotAddedError(MongoSecretError): + """Raised when a Juju 3 secret couldn't be set or re-set.""" + + +class MissingSecretError(MongoSecretError): + """Could be raised when a Juju 3 mandatory secret couldn't be found.""" + + +class SecretAlreadyExistsError(MongoSecretError): + """A secret that we want to create already exists.""" diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py new file mode 100644 index 00000000..f0f215dd --- /dev/null +++ b/tests/integration/helpers.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. 
+# See LICENSE file for licensing details.
+
+import json
+from typing import Any, Dict, List, Optional
+import logging
+
+from dateutil.parser import parse
+from pytest_operator.plugin import OpsTest
+from tenacity import Retrying, stop_after_delay, wait_fixed
+
+logger = logging.getLogger(__name__)
+
+
+MONGOS_APP_NAME = "mongos"
+
+
+class Status:
+ """Model class for status."""
+
+ def __init__(self, value: str, since: str, message: Optional[str] = None):
+ self.value = value
+ self.since = parse(since, ignoretz=True)
+ self.message = message
+
+
+class Unit:
+ """Model class for a Unit, with properties widely used."""
+
+ def __init__(
+ self,
+ id: int,
+ name: str,
+ ip: str,
+ hostname: str,
+ is_leader: bool,
+ workload_status: Status,
+ agent_status: Status,
+ app_status: Status,
+ ):
+ self.id = id
+ self.name = name
+ self.ip = ip
+ self.hostname = hostname
+ self.is_leader = is_leader
+ self.workload_status = workload_status
+ self.agent_status = agent_status
+ self.app_status = app_status
+
+ def dump(self) -> Dict[str, Any]:
+ """Return a dict representation of the unit, suitable for JSON serialization."""
+ result = {}
+ for key, val in vars(self).items():
+ result[key] = vars(val) if isinstance(val, Status) else val
+ return result
+
+
+async def get_application_units(ops_test: OpsTest, app: str) -> List[Unit]:
+ """Get fully detailed units of an application."""
+ # Juju incorrectly reports the IP addresses after the network is restored; this is reported as
+ # a bug here: https://github.com/juju/python-libjuju/issues/738. Once this bug is resolved,
+ # use of `get_unit_ip` should be replaced with `.public_address`
+ raw_app = await get_raw_application(ops_test, app)
+ units = []
+ for u_name, unit in raw_app["units"].items():
+ unit_id = int(u_name.split("/")[-1])
+ if not unit.get("address", False):
+ # unit not ready yet...
+ continue
+
+ unit = Unit(
+ id=unit_id,
+ name=u_name.replace("/", "-"),
+ ip=unit["address"],
+ hostname=await get_unit_hostname(ops_test, unit_id, app),
+ is_leader=unit.get("leader", False),
+ workload_status=Status(
+ value=unit["workload-status"]["current"],
+ since=unit["workload-status"]["since"],
+ message=unit["workload-status"].get("message"),
+ ),
+ agent_status=Status(
+ value=unit["juju-status"]["current"],
+ since=unit["juju-status"]["since"],
+ ),
+ app_status=Status(
+ value=raw_app["application-status"]["current"],
+ since=raw_app["application-status"]["since"],
+ message=raw_app["application-status"].get("message"),
+ ),
+ )
+
+ units.append(unit)
+
+ return units
+
+
+async def check_all_units_blocked_with_status(
+ ops_test: OpsTest, db_app_name: str, status: Optional[str]
+) -> None:
+ """Assert that every unit of the application is blocked, optionally with the given message."""
+ # this is necessary because ops_model.units does not update the unit statuses
+ for unit in await get_application_units(ops_test, db_app_name):
+ assert (
+ unit.workload_status.value == "blocked"
+ ), f"unit {unit.name} not in blocked state, in {unit.workload_status.value}"
+ if status:
+ assert (
+ unit.workload_status.message == status
+ ), f"unit {unit.name} has unexpected status message: {unit.workload_status.message}"
+
+
+async def get_unit_hostname(ops_test: OpsTest, unit_id: int, app: str) -> str:
+ """Get the hostname of a specific unit."""
+ _, hostname, _ = await ops_test.juju("ssh", f"{app}/{unit_id}", "hostname")
+ return hostname.strip()
+
+
+async def get_raw_application(ops_test: OpsTest, app: str) -> Dict[str, Any]:
+ """Get raw application details."""
+ ret_code, stdout, stderr = await ops_test.juju(
+ *f"status --model {ops_test.model.info.name} {app} --format=json".split()
+ )
+ if ret_code != 0:
+ logger.error(f"Invalid return [{ret_code=}]: {stderr=}")
+ raise Exception(f"[{ret_code=}] {stderr=}")
+ return json.loads(stdout)["applications"][app]
+
+
+async def wait_for_mongos_units_blocked(
+ ops_test: OpsTest, db_app_name: str, status: Optional[str] = None, timeout=20
+) -> None:
+ """Waits for mongos units to be in the blocked state.
+
+ This is necessary because the mongos app can report a different status than its units.
+ """ + hook_interval_key = "update-status-hook-interval" + try: + old_interval = (await ops_test.model.get_config())[hook_interval_key] + await ops_test.model.set_config({hook_interval_key: "1m"}) + for attempt in Retrying( + stop=stop_after_delay(timeout), wait=wait_fixed(1), reraise=True + ): + with attempt: + await check_all_units_blocked_with_status(ops_test, db_app_name, status) + finally: + await ops_test.model.set_config({hook_interval_key: old_interval}) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 2108bc2b..2af5c074 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -8,7 +8,11 @@ import yaml from pytest_operator.plugin import OpsTest -MONGOS_APP_NAME = "mongos" +from .helpers import ( + wait_for_mongos_units_blocked, + MONGOS_APP_NAME, +) + METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) @@ -29,3 +33,17 @@ async def test_build_and_deploy(ops_test: OpsTest): application_name=MONGOS_APP_NAME, series="jammy", ) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_waits_for_config_server(ops_test: OpsTest) -> None: + """Verifies that the application and unit are active.""" + + # verify that Charmed Mongos is blocked and reports incorrect credentials + await wait_for_mongos_units_blocked( + ops_test, + MONGOS_APP_NAME, + status="Missing relation to config-server.", + timeout=300, + )