diff --git a/lib/charms/mongodb/v0/mongos.py b/lib/charms/mongodb/v0/mongos.py new file mode 100644 index 000000000..50d9c8ceb --- /dev/null +++ b/lib/charms/mongodb/v0/mongos.py @@ -0,0 +1,170 @@ +"""Code for interactions with MongoDB.""" +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +import logging +from dataclasses import dataclass +from typing import Optional, Set +from urllib.parse import quote_plus + +from pymongo import MongoClient +from pymongo.errors import PyMongoError + +from config import Config + +# The unique Charmhub library identifier, never change it +LIBID = "e20d5b19670d4c55a4934a21d3f3b29a" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 1 + +# path to store mongodb ketFile +logger = logging.getLogger(__name__) + + +@dataclass +class MongosConfiguration: + """Class for mongos configuration. + + — database: database name. + — username: username. + — password: password. + — hosts: full list of hosts to connect to, needed for the URI. + - port: integer for the port to connect to connect to mongodb. + - tls_external: indicator for use of internal TLS connection. + - tls_internal: indicator for use of external TLS connection. + """ + + database: Optional[str] + username: str + password: str + hosts: Set[str] + port: int + roles: Set[str] + tls_external: bool + tls_internal: bool + + @property + def uri(self): + """Return URI concatenated from fields.""" + hosts = [f"{host}:{self.port}" for host in self.hosts] + hosts = ",".join(hosts) + # Auth DB should be specified while user connects to application DB. + auth_source = "" + if self.database != "admin": + auth_source = "&authSource=admin" + return ( + f"mongodb://{quote_plus(self.username)}:" + f"{quote_plus(self.password)}@" + f"{hosts}/{quote_plus(self.database)}?" + f"{auth_source}" + ) + + +class NotReadyError(PyMongoError): + """Raised when not all replica set members healthy or finished initial sync.""" + + +class MongosConnection: + """In this class we create connection object to Mongos. + + Real connection is created on the first call to Mongos. + Delayed connectivity allows to firstly check database readiness + and reuse the same connection for an actual query later in the code. + + Connection is automatically closed when object destroyed. + Automatic close allows to have more clean code. + + Note that connection when used may lead to the following pymongo errors: ConfigurationError, + ConfigurationError, OperationFailure. It is suggested that the following pattern be adopted + when using MongoDBConnection: + + with MongoMongos(self._mongos_config) as mongo: + try: + mongo. + except ConfigurationError, OperationFailure: + + """ + + def __init__(self, config: MongosConfiguration, uri=None, direct=False): + """A MongoDB client interface. + + Args: + config: MongoDB Configuration object. + uri: allow using custom MongoDB URI, needed for replSet init. + direct: force a direct connection to a specific host, avoiding + reading replica set configuration and reconnection. + """ + self.mongodb_config = config + + if uri is None: + uri = config.uri + + self.client = MongoClient( + uri, + directConnection=direct, + connect=False, + serverSelectionTimeoutMS=1000, + connectTimeoutMS=2000, + ) + return + + def __enter__(self): + """Return a reference to the new connection.""" + return self + + def __exit__(self, object_type, value, traceback): + """Disconnect from MongoDB client.""" + self.client.close() + self.client = None + + def get_shard_members(self) -> Set[str]: + """Gets shard members. + + Returns: + A set of the shard members as reported by mongos. + + Raises: + ConfigurationError, OperationFailure + """ + shard_list = self.client.admin.command("listShards") + curr_members = [ + self._hostname_from_hostport(member["host"]) for member in shard_list["shards"] + ] + return set(curr_members) + + def add_shard(self, shard_name, shard_hosts, shard_port=Config.MONGODB_PORT): + """Adds shard to the cluster. + + Raises: + ConfigurationError, OperationFailure + """ + shard_hosts = [f"{host}:{shard_port}" for host in shard_hosts] + shard_hosts = ",".join(shard_hosts) + shard_url = f"{shard_name}/{shard_hosts}" + # TODO Future PR raise error when number of shards currently adding are higher than the + # number of secondaries on the primary shard. This will be challenging, as there is no + # MongoDB command to retrieve the primary shard. Will likely need to be done via + # mongosh + + if shard_name in self.get_shard_members(): + logger.info("Skipping adding shard %s, shard is already in cluster", shard_name) + return + + logger.info("Adding shard %s", shard_name) + self.client.admin.command("addShard", shard_url) + + @staticmethod + def _hostname_from_hostport(hostname: str) -> str: + """Return hostname part from MongoDB returned. + + mongos typically returns a value that contains both, hostname, hosts, and ports. + e.g. input: shard03/host7:27018,host8:27018,host9:27018 + Return shard name + e.g. output: shard03 + """ + return hostname.split("/")[0] diff --git a/lib/charms/mongodb/v0/shards_interface.py b/lib/charms/mongodb/v0/shards_interface.py index 012716204..58e4a7a29 100644 --- a/lib/charms/mongodb/v0/shards_interface.py +++ b/lib/charms/mongodb/v0/shards_interface.py @@ -7,13 +7,15 @@ shards. """ import logging +from typing import List, Optional from charms.mongodb.v0.helpers import KEY_FILE from charms.mongodb.v0.mongodb import MongoDBConnection, NotReadyError, PyMongoError +from charms.mongodb.v0.mongos import MongosConnection from charms.mongodb.v0.users import MongoDBUser, OperatorUser -from ops.charm import CharmBase +from ops.charm import CharmBase, RelationBrokenEvent from ops.framework import Object -from ops.model import BlockedStatus, MaintenanceStatus, WaitingStatus +from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, WaitingStatus from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed from config import Config @@ -29,7 +31,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 1 +LIBPATCH = 2 KEYFILE_KEY = "key-file" OPERATOR_PASSWORD_KEY = MongoDBUser.get_password_key_name_for_user(OperatorUser.get_username()) @@ -48,7 +50,11 @@ def __init__( self.framework.observe( charm.on[self.relation_name].relation_joined, self._on_relation_joined ) - # TODO Future PR, enable shard drainage by listening for relation departed events + self.framework.observe( + charm.on[self.relation_name].relation_changed, self._on_relation_event + ) + + # TODO Follow up PR, handle rotating passwords def _on_relation_joined(self, event): """Handles providing shards with secrets and adding shards to the config server.""" @@ -84,7 +90,67 @@ def _on_relation_joined(self, event): }, ) - # TODO Future PR, add shard to config server + def _on_relation_event(self, event): + """Handles adding, removing, and updating of shards.""" + if self.charm.is_role(Config.Role.REPLICATION): + self.unit.status = BlockedStatus("role replication does not support sharding") + logger.error("sharding interface not supported with config role=replication") + return + + if not self.charm.is_role(Config.Role.CONFIG_SERVER): + logger.info( + "skipping relation joined event ShardingRequirer is only be executed by config-server" + ) + return + + if not self.charm.unit.is_leader(): + return + + if not self.charm.db_initialised: + event.defer() + + departed_relation_id = None + if type(event) is RelationBrokenEvent: + departed_relation_id = event.relation.id + + try: + logger.info("Adding shards not present in cluster.") + self.add_shards(departed_relation_id) + # TODO Future PR, enable updating shards by listening for relation changed events + # TODO Future PR, enable shard drainage by listening for relation departed events + except PyMongoError as e: + logger.error("Deferring _on_relation_event for shards interface since: error=%r", e) + event.defer() + return + + def add_shards(self, departed_shard_id): + """Adds shards to cluster. + + raises: PyMongoError + """ + with MongosConnection(self.charm.mongos_config) as mongo: + cluster_shards = mongo.get_shard_members() + relation_shards = self._get_shards_from_relations(departed_shard_id) + + # TODO Future PR, limit number of shards add at a time, based on the number of + # replicas in the primary shard + for shard in relation_shards - cluster_shards: + try: + shard_hosts = self._get_shard_hosts(shard) + if not len(shard_hosts): + logger.info("host info for shard %s not yet added, skipping", shard) + continue + + self.charm.unit.status = MaintenanceStatus( + f"Adding shard {shard} to config-server" + ) + logger.info("Adding shard: %s ", shard) + mongo.add_shard(shard, shard_hosts) + except PyMongoError as e: + logger.error("Failed to add shard %s to the config server, error=%r", shard, e) + raise + + self.charm.unit.status = ActiveStatus("") def update_credentials(self, key: str, value: str) -> None: """Sends new credentials, for a key value pair across all shards.""" @@ -107,6 +173,32 @@ def _update_relation_data(self, relation_id: int, data: dict) -> None: if relation: relation.data[self.charm.model.app].update(data) + def _get_shards_from_relations(self, departed_shard_id: Optional[int]): + """Returns a list of the shards related to the config-server.""" + relations = self.model.relations[self.relation_name] + return set( + [ + self._get_shard_name_from_relation(relation) + for relation in relations + if relation.id != departed_shard_id + ] + ) + + def _get_shard_hosts(self, shard_name) -> List[str]: + """Retrieves the hosts for a specified shard.""" + relations = self.model.relations[self.relation_name] + for relation in relations: + if self._get_shard_name_from_relation(relation) == shard_name: + hosts = [] + for unit in relation.units: + hosts.append(relation.data[unit].get("private-address")) + + return hosts + + def _get_shard_name_from_relation(self, relation): + """Returns the name of a shard for a specified relation.""" + return relation.app.name + class ConfigServerRequirer(Object): """Manage relations between the config server and the shard, on the shard's side.""" @@ -170,7 +262,7 @@ def _on_relation_changed(self, event): ) return - # TODO future PR, leader unit verifies shard was added to cluster + # TODO future PR, leader unit verifies shard was added to cluster (update-status hook) def update_operator_password(self, new_password: str) -> None: """Updates the password for the operator user. @@ -237,3 +329,19 @@ def update_keyfile(self, key_file_contents: str) -> None: self.charm.set_secret( Config.Relations.APP_SCOPE, Config.Secrets.SECRET_KEYFILE_NAME, key_file_contents ) + + def _update_relation_data(self, relation_id: int, data: dict) -> None: + """Updates a set of key-value pairs in the relation. + + This function writes in the application data bag, therefore, only the leader unit can call + it. + + Args: + relation_id: the identifier for a particular relation. + data: dict containing the key-value pairs + that should be updated in the relation. + """ + if self.charm.unit.is_leader(): + relation = self.charm.model.get_relation(self.relation_name, relation_id) + if relation: + relation.data[self.charm.model.app].update(data) diff --git a/src/charm.py b/src/charm.py index 27c8c3cad..b8e12d1de 100755 --- a/src/charm.py +++ b/src/charm.py @@ -35,6 +35,7 @@ from charms.mongodb.v0.mongodb_provider import MongoDBProvider from charms.mongodb.v0.mongodb_tls import MongoDBTLS from charms.mongodb.v0.mongodb_vm_legacy_provider import MongoDBLegacyProvider +from charms.mongodb.v0.mongos import MongosConfiguration from charms.mongodb.v0.shards_interface import ConfigServerRequirer, ShardingProvider from charms.mongodb.v0.users import ( CHARM_USERS, @@ -192,6 +193,11 @@ def _replica_set_hosts(self): """ return json.loads(self.app_peer_data.get("replica_set_hosts", "[]")) + @property + def mongos_config(self) -> MongoDBConfiguration: + """Generates a MongoDBConfiguration object for mongos in the deployment of MongoDB.""" + return self._get_mongos_config_for_user(OperatorUser, set(self._unit_ips)) + @property def mongodb_config(self) -> MongoDBConfiguration: """Generates a MongoDBConfiguration object for this deployment of MongoDB.""" @@ -751,6 +757,23 @@ def _is_user_created(self, user: MongoDBUser) -> bool: def _set_user_created(self, user: MongoDBUser) -> None: self.app_peer_data[f"{user.get_username()}-user-created"] = "True" + def _get_mongos_config_for_user( + self, user: MongoDBUser, hosts: Set[str] + ) -> MongosConfiguration: + external_ca, _ = self.tls.get_tls_files(UNIT_SCOPE) + internal_ca, _ = self.tls.get_tls_files(APP_SCOPE) + + return MongosConfiguration( + database=user.get_database_name(), + username=user.get_username(), + password=self.get_secret(APP_SCOPE, user.get_password_key_name()), + hosts=hosts, + port=Config.MONGOS_PORT, + roles=user.get_roles(), + tls_external=external_ca is not None, + tls_internal=internal_ca is not None, + ) + def _get_mongodb_config_for_user( self, user: MongoDBUser, hosts: Set[str] ) -> MongoDBConfiguration: