diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index cc2496ffb..1cf412ae2 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -100,7 +100,6 @@ jobs:
         with:
           provider: lxd
           juju-channel: 3.1/stable
-          bootstrap-options: "--agent-version 3.1.6"
       - name: Download packed charm(s)
         uses: actions/download-artifact@v3
         with:
diff --git a/lib/charms/mongodb/v0/config_server_interface.py b/lib/charms/mongodb/v0/config_server_interface.py
index 65242d9b3..70df7aa37 100644
--- a/lib/charms/mongodb/v0/config_server_interface.py
+++ b/lib/charms/mongodb/v0/config_server_interface.py
@@ -14,7 +14,7 @@
 )
 from charms.mongodb.v1.helpers import add_args_to_env, get_mongos_args
 from charms.mongodb.v1.mongos import MongosConnection
-from ops.charm import CharmBase, EventBase
+from ops.charm import CharmBase, EventBase, RelationBrokenEvent
 from ops.framework import Object
 from ops.model import ActiveStatus, MaintenanceStatus, WaitingStatus
@@ -35,7 +35,7 @@
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 2
+LIBPATCH = 5
 
 
 class ClusterProvider(Object):
@@ -54,11 +54,21 @@ def __init__(
             charm.on[self.relation_name].relation_changed, self._on_relation_changed
         )
 
-        # TODO Future PRs handle scale down
-        # TODO Future PRs handle changing of units/passwords to be propagated to mongos
+        self.framework.observe(
+            charm.on[self.relation_name].relation_departed,
+            self.charm.check_relation_broken_or_scale_down,
+        )
+        self.framework.observe(
+            charm.on[self.relation_name].relation_broken, self._on_relation_broken
+        )
 
     def pass_hook_checks(self, event: EventBase) -> bool:
         """Runs the pre-hooks checks for ClusterProvider, returns True if all pass."""
+        if not self.charm.db_initialised:
+            logger.info("Deferring %s. db is not initialised.", type(event))
+            event.defer()
+            return False
+
         if not self.charm.is_role(Config.Role.CONFIG_SERVER):
             logger.info(
                 "Skipping %s. ShardingProvider is only be executed by config-server", type(event)
             )
@@ -68,11 +78,6 @@ def pass_hook_checks(self, event: EventBase) -> bool:
         if not self.charm.unit.is_leader():
             return False
 
-        if not self.charm.db_initialised:
-            logger.info("Deferring %s. db is not initialised.", type(event))
-            event.defer()
-            return False
-
         return True
 
     def _on_relation_changed(self, event) -> None:
@@ -97,6 +102,26 @@ def _on_relation_changed(self, event) -> None:
             },
         )
 
+    def _on_relation_broken(self, event) -> None:
+        # Only relation_departed events can check if scaling down
+        departed_relation_id = event.relation.id
+        if not self.charm.has_departed_run(departed_relation_id):
+            logger.info(
+                "Deferring, must wait for relation departed hook to decide if relation should be removed."
+            )
+            event.defer()
+            return
+
+        if not self.pass_hook_checks(event):
+            logger.info("Skipping relation broken event: hook checks did not pass")
+            return
+
+        if not self.charm.proceed_on_broken_event(event):
+            logger.info("Skipping relation broken event, broken event due to scale down")
+            return
+
+        self.charm.client_relations.oversee_users(departed_relation_id, event)
+
     def update_config_server_db(self, event):
         """Provides related mongos applications with new config server db."""
         if not self.pass_hook_checks(event):
@@ -157,7 +182,13 @@ def __init__(
         self.framework.observe(
             charm.on[self.relation_name].relation_changed, self._on_relation_changed
         )
-        # TODO Future PRs handle scale down
+        self.framework.observe(
+            charm.on[self.relation_name].relation_departed,
+            self.charm.check_relation_broken_or_scale_down,
+        )
+        self.framework.observe(
+            charm.on[self.relation_name].relation_broken, self._on_relation_broken
+        )
 
     def _on_database_created(self, event) -> None:
         if not self.charm.unit.is_leader():
@@ -202,6 +233,30 @@ def _on_relation_changed(self, event) -> None:
 
         self.charm.unit.status = ActiveStatus()
 
+    def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
+        # Only relation_departed events can check if scaling down
+        if not self.charm.has_departed_run(event.relation.id):
+            logger.info(
+                "Deferring, must wait for relation departed hook to decide if relation should be removed."
+            )
+            event.defer()
+            return
+
+        if not self.charm.proceed_on_broken_event(event):
+            logger.info("Skipping relation broken event, broken event due to scale down")
+            return
+
+        self.charm.stop_mongos_service()
+        logger.info("Stopped mongos daemon")
+
+        if not self.charm.unit.is_leader():
+            return
+
+        logger.info("Database and user removed for mongos application")
+        self.charm.remove_secret(Config.Relations.APP_SCOPE, Config.Secrets.USERNAME)
+        self.charm.remove_secret(Config.Relations.APP_SCOPE, Config.Secrets.PASSWORD)
+        self.charm.remove_connection_info()
+
     # BEGIN: helper functions
     def is_mongos_running(self) -> bool:
diff --git a/lib/charms/mongodb/v0/mongodb_secrets.py b/lib/charms/mongodb/v0/mongodb_secrets.py
index 6a1589c66..13d7f058b 100644
--- a/lib/charms/mongodb/v0/mongodb_secrets.py
+++ b/lib/charms/mongodb/v0/mongodb_secrets.py
@@ -6,7 +6,7 @@
 from ops import Secret, SecretInfo
 from ops.charm import CharmBase
-from ops.model import SecretNotFoundError
+from ops.model import ModelError, SecretNotFoundError
 
 from config import Config
 from exceptions import SecretAlreadyExistsError
@@ -93,7 +93,21 @@ def get_content(self) -> Dict[str, str]:
         """Getting cached secret content."""
         if not self._secret_content:
             if self.meta:
-                self._secret_content = self.meta.get_content()
+                try:
+                    self._secret_content = self.meta.get_content(refresh=True)
+                except (ValueError, ModelError) as err:
+                    # https://bugs.launchpad.net/juju/+bug/2042596
+                    # Only triggered when 'refresh' is set
+                    known_model_errors = [
+                        "ERROR either URI or label should be used for getting an owned secret but not both",
+                        "ERROR secret owner cannot use --refresh",
+                    ]
+                    if isinstance(err, ModelError) and not any(
+                        msg in str(err) for msg in known_model_errors
+                    ):
+                        raise
+                    # Due to: ValueError: Secret owner cannot use refresh=True
+                    self._secret_content = self.meta.get_content()
         return self._secret_content
 
     def set_content(self, content: Dict[str, str]) -> None:
diff --git a/lib/charms/mongodb/v1/helpers.py b/lib/charms/mongodb/v1/helpers.py
index 7f66faad1..e3668718e 100644
--- a/lib/charms/mongodb/v1/helpers.py
+++
b/lib/charms/mongodb/v1/helpers.py @@ -44,11 +44,22 @@ MONGO_SHELL = "charmed-mongodb.mongosh" DATA_DIR = "/var/lib/mongodb" +LOG_DIR = "/var/log/mongodb" +LOG_TO_SYSLOG = True CONF_DIR = "/etc/mongod" MONGODB_LOG_FILENAME = "mongodb.log" logger = logging.getLogger(__name__) +def _get_logging_options(snap_install: bool) -> str: + # TODO sending logs to syslog until we have a separate mount point for logs + if LOG_TO_SYSLOG: + return "" + # in k8s the default logging options that are used for the vm charm are ignored and logs are + # the output of the container. To enable logging to a file it must be set explicitly + return f"--logpath={LOG_DIR}/{MONGODB_LOG_FILENAME}" if snap_install else "" + + # noinspection GrazieInspection def get_create_user_cmd(config: MongoDBConfiguration, mongo_path=MONGO_SHELL) -> List[str]: """Creates initial admin user for MongoDB. @@ -130,9 +141,7 @@ def get_mongod_args( """ full_data_dir = f"{MONGODB_COMMON_DIR}{DATA_DIR}" if snap_install else DATA_DIR full_conf_dir = f"{MONGODB_SNAP_DATA_DIR}{CONF_DIR}" if snap_install else CONF_DIR - # in k8s the default logging options that are used for the vm charm are ignored and logs are - # the output of the container. To enable logging to a file it must be set explicitly - logging_options = "" if snap_install else f"--logpath={full_data_dir}/{MONGODB_LOG_FILENAME}" + logging_options = _get_logging_options(snap_install) cmd = [ # bind to localhost and external interfaces "--bind_ip_all", @@ -143,6 +152,8 @@ def get_mongod_args( # for simplicity we run the mongod daemon on shards, configsvrs, and replicas on the same # port f"--port={Config.MONGODB_PORT}", + "--auditDestination=syslog", # TODO sending logs to syslog until we have a separate mount point for logs + f"--auditFormat={Config.AuditLog.FORMAT}", logging_options, ] if auth: diff --git a/lib/charms/mongodb/v1/mongodb_provider.py b/lib/charms/mongodb/v1/mongodb_provider.py index 691997a8b..ff9bf688c 100644 --- a/lib/charms/mongodb/v1/mongodb_provider.py +++ b/lib/charms/mongodb/v1/mongodb_provider.py @@ -31,7 +31,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 3 +LIBPATCH = 4 logger = logging.getLogger(__name__) REL_NAME = "database" @@ -87,6 +87,11 @@ def __init__(self, charm: CharmBase, substrate="k8s", relation_name: str = "data def pass_hook_checks(self) -> bool: """Runs the pre-hooks checks for MongoDBProvider, returns True if all pass.""" + # We shouldn't try to create or update users if the database is not + # initialised. We will create users as part of initialisation. + if not self.charm.db_initialised: + return False + if not self.charm.is_relation_feasible(self.relation_name): logger.info("Skipping code for relations.") return False @@ -100,11 +105,6 @@ def pass_hook_checks(self) -> bool: if not self.charm.unit.is_leader(): return False - # We shouldn't try to create or update users if the database is not - # initialised. We will create users as part of initialisation. 
- if not self.charm.db_initialised: - return False - return True def _on_relation_event(self, event): diff --git a/lib/charms/mongodb/v1/shards_interface.py b/lib/charms/mongodb/v1/shards_interface.py index da7cc8320..b9a876aec 100644 --- a/lib/charms/mongodb/v1/shards_interface.py +++ b/lib/charms/mongodb/v1/shards_interface.py @@ -51,7 +51,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 2 +LIBPATCH = 5 KEYFILE_KEY = "key-file" HOSTS_KEY = "host" OPERATOR_PASSWORD_KEY = MongoDBUser.get_password_key_name_for_user(OperatorUser.get_username()) @@ -121,6 +121,11 @@ def _on_relation_joined(self, event): def pass_hook_checks(self, event: EventBase) -> bool: """Runs the pre-hooks checks for ShardingProvider, returns True if all pass.""" + if not self.charm.db_initialised: + logger.info("Deferring %s. db is not initialised.", type(event)) + event.defer() + return False + if not self.charm.is_relation_feasible(self.relation_name): logger.info("Skipping event %s , relation not feasible.", type(event)) return False @@ -134,11 +139,6 @@ def pass_hook_checks(self, event: EventBase) -> bool: if not self.charm.unit.is_leader(): return False - if not self.charm.db_initialised: - logger.info("Deferring %s. db is not initialised.", type(event)) - event.defer() - return False - # adding/removing shards while a backup/restore is in progress can be disastrous pbm_status = self.charm.backups.get_pbm_status() if isinstance(pbm_status, MaintenanceStatus): @@ -146,29 +146,18 @@ def pass_hook_checks(self, event: EventBase) -> bool: event.defer() return False - return True - - def _proceed_on_broken_event(self, event) -> int: - """Returns relation_id if relation broken event occurred due to a removed relation.""" - departed_relation_id = None - - # Only relation_deparated events can check if scaling down - departed_relation_id = event.relation.id - if not self.charm.has_departed_run(departed_relation_id): - logger.info( - "Deferring, must wait for relation departed hook to decide if relation should be removed." - ) - event.defer() - return + if isinstance(event, RelationBrokenEvent): + if not self.charm.has_departed_run(event.relation.id): + logger.info( + "Deferring, must wait for relation departed hook to decide if relation should be removed." + ) + event.defer() + return False - # check if were scaling down and add a log message - if self.charm.is_scaling_down(event.relation.id): - logger.info( - "Relation broken event occurring due to scale down, do not proceed to remove users." - ) - return + if not self.charm.proceed_on_broken_event(event): + return False - return departed_relation_id + return True def _on_relation_event(self, event): """Handles adding and removing of shards. @@ -181,9 +170,7 @@ def _on_relation_event(self, event): departed_relation_id = None if isinstance(event, RelationBrokenEvent): - departed_relation_id = self._proceed_on_broken_event(event) - if not departed_relation_id: - return + departed_relation_id = event.relation.id try: logger.info("Adding/Removing shards not present in cluster.") @@ -520,6 +507,11 @@ def _on_relation_changed(self, event): def pass_hook_checks(self, event): """Runs the pre-hooks checks for ConfigServerRequirer, returns True if all pass.""" + if not self.charm.db_initialised: + logger.info("Deferring %s. 
db is not initialised.", type(event)) + event.defer() + return False + if not self.charm.is_relation_feasible(self.relation_name): logger.info("Skipping event %s , relation not feasible.", type(event)) return False @@ -528,11 +520,6 @@ def pass_hook_checks(self, event): logger.info("skipping %s is only be executed by shards", type(event)) return False - if not self.charm.db_initialised: - logger.info("Deferring %s. db is not initialised.", type(event)) - event.defer() - return False - mongos_hosts = event.relation.data[event.relation.app].get(HOSTS_KEY, None) if isinstance(event, RelationBrokenEvent) and not mongos_hosts: logger.info("Config-server relation never set up, no need to process broken event.") diff --git a/src/charm.py b/src/charm.py index 288f5b953..6c0477577 100755 --- a/src/charm.py +++ b/src/charm.py @@ -1319,10 +1319,30 @@ def set_scaling_down(self, event: RelationDepartedEvent) -> bool: # check if relation departed is due to current unit being removed. (i.e. scaling down the # application.) rel_departed_key = self._generate_relation_departed_key(event.relation.id) - scaling_down = json.dumps(event.departing_unit == self.unit) - self.unit_peer_data[rel_departed_key] = scaling_down + scaling_down = event.departing_unit == self.unit + self.unit_peer_data[rel_departed_key] = json.dumps(scaling_down) return scaling_down + def proceed_on_broken_event(self, event) -> bool: + """Returns relation_id if relation broken event occurred due to a removed relation.""" + # Only relation_deparated events can check if scaling down + departed_relation_id = event.relation.id + if not self.has_departed_run(departed_relation_id): + logger.info( + "Deferring, must wait for relation departed hook to decide if relation should be removed." + ) + event.defer() + return False + + # check if were scaling down and add a log message + if self.is_scaling_down(departed_relation_id): + logger.info( + "Relation broken event occurring due to scale down, do not proceed to remove users." 
+ ) + return False + + return True + @staticmethod def _generate_relation_departed_key(rel_id: int) -> str: """Generates the relation departed key for a specified relation id.""" diff --git a/src/config.py b/src/config.py index 95263189a..f0295efc9 100644 --- a/src/config.py +++ b/src/config.py @@ -16,21 +16,21 @@ class Config: MONGODB_SNAP_DATA_DIR = "/var/snap/charmed-mongodb/current" MONGOD_CONF_DIR = f"{MONGODB_SNAP_DATA_DIR}/etc/mongod" MONGOD_CONF_FILE_PATH = f"{MONGOD_CONF_DIR}/mongod.conf" - SNAP_PACKAGES = [("charmed-mongodb", "6/edge", 87)] - - class Role: - """Role config names for MongoDB Charm.""" - - CONFIG_SERVER = "config-server" - REPLICATION = "replication" - SHARD = "shard" + SNAP_PACKAGES = [("charmed-mongodb", "6/edge", 93)] + # Keep these alphabetically sorted class Actions: """Actions related config for MongoDB Charm.""" PASSWORD_PARAM_NAME = "password" USERNAME_PARAM_NAME = "username" + class AuditLog: + """Audit log related configuration.""" + + FORMAT = "JSON" + FILE_NAME = "audit.log" + class Backup: """Backup related config for MongoDB Charm.""" @@ -80,6 +80,13 @@ class Relations: DB_RELATIONS = [OBSOLETE_RELATIONS_NAME, NAME] Scopes = Literal[APP_SCOPE, UNIT_SCOPE] + class Role: + """Role config names for MongoDB Charm.""" + + CONFIG_SERVER = "config-server" + REPLICATION = "replication" + SHARD = "shard" + class Secrets: """Secrets related constants.""" diff --git a/tests/integration/backup_tests/helpers.py b/tests/integration/backup_tests/helpers.py index a5ff7a633..7ffcc4edd 100644 --- a/tests/integration/backup_tests/helpers.py +++ b/tests/integration/backup_tests/helpers.py @@ -9,6 +9,7 @@ from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed from ..ha_tests import helpers as ha_helpers +from ..helpers import get_app_name S3_APP_NAME = "s3-integrator" TIMEOUT = 10 * 60 @@ -62,30 +63,12 @@ async def create_and_verify_backup(ops_test: OpsTest) -> None: async def get_leader_unit(ops_test: OpsTest, db_app_name=None) -> ops.model.Unit: """Returns the leader unit of the database charm.""" - db_app_name = db_app_name or await app_name(ops_test) + db_app_name = db_app_name or await get_app_name(ops_test) for unit in ops_test.model.applications[db_app_name].units: if await unit.is_leader_from_status(): return unit -async def app_name(ops_test: OpsTest) -> str: - """Returns the name of the cluster running MongoDB. - - This is important since not all deployments of the MongoDB charm have the application name - "mongodb". - - Note: if multiple clusters are running MongoDB this will return the one first found. 
- """ - status = await ops_test.model.get_status() - for app in ops_test.model.applications: - # note that format of the charm field is not exactly "mongodb" but instead takes the form - # of `local:focal/mongodb-6` - if "mongodb" in status["applications"][app]["charm"]: - return app - - return None - - async def count_logical_backups(db_unit: ops.model.Unit) -> int: """Count the number of logical backups.""" action = await db_unit.run_action(action_name="list-backups") @@ -142,11 +125,11 @@ def is_relation_joined(ops_test: OpsTest, endpoint_one: str, endpoint_two: str) async def insert_unwanted_data(ops_test: OpsTest) -> None: """Inserts the data into the MongoDB cluster via primary replica.""" - app = await app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] + app_name = await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] primary = (await ha_helpers.replica_set_primary(ip_addresses, ops_test)).public_address - password = await ha_helpers.get_password(ops_test, app) - client = MongoClient(ha_helpers.unit_uri(primary, password, app), directConnection=True) + password = await ha_helpers.get_password(ops_test, app_name) + client = MongoClient(ha_helpers.unit_uri(primary, password, app_name), directConnection=True) db = client["new-db"] test_collection = db["test_collection"] test_collection.insert_one({"unwanted_data": "bad data 1"}) diff --git a/tests/integration/backup_tests/test_backups.py b/tests/integration/backup_tests/test_backups.py index b7cb64d48..4d73a6040 100644 --- a/tests/integration/backup_tests/test_backups.py +++ b/tests/integration/backup_tests/test_backups.py @@ -2,6 +2,7 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. import asyncio +import logging import secrets import string import time @@ -10,6 +11,8 @@ from pytest_operator.plugin import OpsTest from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed +from tests.integration.helpers import get_app_name + from ..ha_tests import helpers as ha_helpers from . import helpers @@ -18,6 +21,8 @@ ENDPOINT = "s3-credentials" NEW_CLUSTER = "new-mongodb" +logger = logging.getLogger(__name__) + @pytest.fixture() async def continuous_writes_to_db(ops_test: OpsTest): @@ -43,7 +48,7 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: """Build and deploy one unit of MongoDB.""" # it is possible for users to provide their own cluster for testing. Hence check if there # is a pre-existing cluster. 
- if not await helpers.app_name(ops_test): + if not await get_app_name(ops_test): db_charm = await ops_test.build_charm(".") await ops_test.model.deploy(db_charm, num_units=3) @@ -56,7 +61,7 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: @pytest.mark.abort_on_fail async def test_blocked_incorrect_creds(ops_test: OpsTest) -> None: """Verifies that the charm goes into blocked status when s3 creds are incorrect.""" - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) # set incorrect s3 credentials s3_integrator_unit = ops_test.model.applications[S3_APP_NAME].units[0] @@ -73,11 +78,12 @@ async def test_blocked_incorrect_creds(ops_test: OpsTest) -> None: ) # verify that Charmed MongoDB is blocked and reports incorrect credentials - await asyncio.gather( - ops_test.model.wait_for_idle(apps=[S3_APP_NAME], status="active"), - ops_test.model.wait_for_idle(apps=[db_app_name], status="blocked", idle_period=20), - ) - db_unit = ops_test.model.applications[db_app_name].units[0] + async with ops_test.fast_forward(): + await asyncio.gather( + ops_test.model.wait_for_idle(apps=[S3_APP_NAME], status="active"), + ops_test.model.wait_for_idle(apps=[db_app_name], status="blocked", idle_period=20), + ) + db_unit = ops_test.model.applications[db_app_name].units[0] assert db_unit.workload_status_message == "s3 credentials are incorrect." @@ -85,16 +91,18 @@ async def test_blocked_incorrect_creds(ops_test: OpsTest) -> None: @pytest.mark.abort_on_fail async def test_blocked_incorrect_conf(ops_test: OpsTest) -> None: """Verifies that the charm goes into blocked status when s3 config options are incorrect.""" - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) # set correct AWS credentials for s3 storage but incorrect configs await helpers.set_credentials(ops_test, cloud="AWS") # wait for both applications to be idle with the correct statuses - await asyncio.gather( - ops_test.model.wait_for_idle(apps=[S3_APP_NAME], status="active"), - ops_test.model.wait_for_idle(apps=[db_app_name], status="blocked", idle_period=20), - ) + async with ops_test.fast_forward(): + await asyncio.gather( + ops_test.model.wait_for_idle(apps=[S3_APP_NAME], status="active"), + ops_test.model.wait_for_idle(apps=[db_app_name], status="blocked", idle_period=20), + ) + db_unit = ops_test.model.applications[db_app_name].units[0] assert db_unit.workload_status_message == "s3 configurations are incompatible." @@ -102,7 +110,7 @@ async def test_blocked_incorrect_conf(ops_test: OpsTest) -> None: @pytest.mark.abort_on_fail async def test_ready_correct_conf(ops_test: OpsTest) -> None: """Verifies charm goes into active status when s3 config and creds options are correct.""" - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) choices = string.ascii_letters + string.digits unique_path = "".join([secrets.choice(choices) for _ in range(4)]) configuration_parameters = { @@ -124,16 +132,19 @@ async def test_ready_correct_conf(ops_test: OpsTest) -> None: @pytest.mark.abort_on_fail async def test_create_and_list_backups(ops_test: OpsTest) -> None: - db_unit = await helpers.get_leader_unit(ops_test) - + db_app_name = await get_app_name(ops_test) + leader_unit = await helpers.get_leader_unit(ops_test, db_app_name=db_app_name) + await helpers.set_credentials(ops_test, cloud="AWS") # verify backup list works - action = await db_unit.run_action(action_name="list-backups") + logger.error("!!!!! 
test_create_and_list_backups >>> %s", leader_unit) + action = await leader_unit.run_action(action_name="list-backups") list_result = await action.wait() + logger.error("!!!!! test_create_and_list_backups >>> %s", list_result.results) backups = list_result.results["backups"] assert backups, "backups not outputted" # verify backup is started - action = await db_unit.run_action(action_name="create-backup") + action = await leader_unit.run_action(action_name="create-backup") backup_result = await action.wait() assert "backup started" in backup_result.results["backup-status"], "backup didn't start" @@ -144,7 +155,7 @@ async def test_create_and_list_backups(ops_test: OpsTest) -> None: try: for attempt in Retrying(stop=stop_after_delay(20), wait=wait_fixed(3)): with attempt: - backups = await helpers.count_logical_backups(db_unit) + backups = await helpers.count_logical_backups(leader_unit) assert backups == 1 except RetryError: assert backups == 1, "Backup not created." @@ -158,7 +169,7 @@ async def test_multi_backup(ops_test: OpsTest, continuous_writes_to_db) -> None: from AWS to GCP. This test verifies that the first backup in AWS is made, the second backup in GCP is made, and that before the second backup is made that pbm correctly resyncs. """ - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) db_unit = await helpers.get_leader_unit(ops_test) # create first backup once ready @@ -244,7 +255,7 @@ async def test_restore(ops_test: OpsTest, add_writes_to_db) -> None: assert number_writes > 0, "no writes to backup" # create a backup in the AWS bucket - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) db_unit = await helpers.get_leader_unit(ops_test) prev_backups = await helpers.count_logical_backups(db_unit) action = await db_unit.run_action(action_name="create-backup") @@ -293,7 +304,7 @@ async def test_restore(ops_test: OpsTest, add_writes_to_db) -> None: @pytest.mark.parametrize("cloud_provider", ["AWS", "GCP"]) async def test_restore_new_cluster(ops_test: OpsTest, add_writes_to_db, cloud_provider): # configure test for the cloud provider - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) await helpers.set_credentials(ops_test, cloud=cloud_provider) if cloud_provider == "AWS": configuration_parameters = { @@ -380,7 +391,7 @@ async def test_restore_new_cluster(ops_test: OpsTest, add_writes_to_db, cloud_pr @pytest.mark.abort_on_fail async def test_update_backup_password(ops_test: OpsTest) -> None: """Verifies that after changing the backup password the pbm tool is updated and functional.""" - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) db_unit = await helpers.get_leader_unit(ops_test) # wait for charm to be idle before setting password diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index fde1f58cc..becbe9d99 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -2,6 +2,7 @@ # See LICENSE file for licensing details. 
import json +import logging import subprocess import time from datetime import datetime @@ -25,6 +26,9 @@ wait_fixed, ) +from ..helpers import get_app_name, get_unit_ip, instance_ip + +# TODO move these to a separate file for constants \ config METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) PORT = 27017 APP_NAME = METADATA["name"] @@ -37,6 +41,8 @@ EXPORTER_PROC = "/usr/bin/mongodb_exporter" GREP_PROC = "grep" +logger = logging.getLogger(__name__) + class ProcessError(Exception): """Raised when a process fails.""" @@ -46,33 +52,33 @@ class ProcessRunningError(Exception): """Raised when a process is running when it is not expected to be.""" -def replica_set_client(replica_ips: List[str], password: str, app=APP_NAME) -> MongoClient: +def replica_set_client(replica_ips: List[str], password: str, app_name: str) -> MongoClient: """Generates the replica set URI for multiple IP addresses. Args: replica_ips: list of ips hosting the replica set. password: password of database. - app: name of application which hosts the cluster. + app_name: name of application which hosts the cluster. """ hosts = ["{}:{}".format(replica_ip, PORT) for replica_ip in replica_ips] hosts = ",".join(hosts) - replica_set_uri = f"mongodb://operator:" f"{password}@" f"{hosts}/admin?replicaSet={app}" + replica_set_uri = f"mongodb://operator:" f"{password}@" f"{hosts}/admin?replicaSet={app_name}" return MongoClient(replica_set_uri) -async def fetch_replica_set_members(replica_ips: List[str], ops_test: OpsTest): +async def fetch_replica_set_members(replica_ips: List[str], ops_test: OpsTest, app_name: str): """Fetches the IPs listed as replica set members in the MongoDB replica set configuration. Args: replica_ips: list of ips hosting the replica set. ops_test: reference to deployment. - app: name of application which has the cluster. + app_name: name of application which has the cluster. """ # connect to replica set uri - app = await app_name(ops_test) - password = await get_password(ops_test, app) - client = replica_set_client(replica_ips, password, app) + app_name = app_name or await get_app_name(ops_test) + password = await get_password(ops_test, app_name) + client = replica_set_client(replica_ips, password, app_name) # get ips from MongoDB replica set configuration rs_config = client.admin.command("replSetGetConfig") @@ -86,43 +92,44 @@ async def fetch_replica_set_members(replica_ips: List[str], ops_test: OpsTest): return member_ips -def unit_uri(ip_address: str, password, app=APP_NAME) -> str: +def unit_uri(ip_address: str, password, app_name=APP_NAME) -> str: """Generates URI that is used by MongoDB to connect to a single replica. Args: ip_address: ip address of replica/unit password: password of database. - app: name of application which has the cluster. + app_name: name of application which has the cluster. """ - return f"mongodb://operator:" f"{password}@" f"{ip_address}:{PORT}/admin?replicaSet={app}" + return f"mongodb://operator:" f"{password}@" f"{ip_address}:{PORT}/admin?replicaSet={app_name}" -async def get_password(ops_test: OpsTest, app, down_unit=None) -> str: +# TODO remove this duplicate with helpers.py +async def get_password(ops_test: OpsTest, app_name, down_unit=None) -> str: """Use the charm action to retrieve the password from provided unit. Returns: String with the password stored on the peer relation databag. 
""" # some tests disable the network for units, so find a unit that is available - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: if not unit.name == down_unit: unit_id = unit.name.split("/")[1] break - action = await ops_test.model.units.get(f"{app}/{unit_id}").run_action("get-password") + action = await ops_test.model.units.get(f"{app_name}/{unit_id}").run_action("get-password") action = await action.wait() return action.results["password"] async def fetch_primary( - replica_set_hosts: List[str], ops_test: OpsTest, down_unit=None, app=None + replica_set_hosts: List[str], ops_test: OpsTest, down_unit=None, app_name=None ) -> str: """Returns IP address of current replica set primary.""" # connect to MongoDB client - app = app or await app_name(ops_test) + app_name = app_name or await get_app_name(ops_test) - password = await get_password(ops_test, app, down_unit) - client = replica_set_client(replica_set_hosts, password, app) + password = await get_password(ops_test, app_name, down_unit) + client = replica_set_client(replica_set_hosts, password, app_name) # grab the replica set status try: @@ -143,13 +150,16 @@ async def fetch_primary( return primary -async def count_primaries(ops_test: OpsTest) -> int: +# TODO remove duplication with common helpers +async def count_primaries(ops_test: OpsTest, password: str = None, app_name: str = None) -> int: """Returns the number of primaries in a replica set.""" # connect to MongoDB client - app = await app_name(ops_test) - password = await get_password(ops_test, app) - replica_set_hosts = [unit.public_address for unit in ops_test.model.applications[app].units] - client = replica_set_client(replica_set_hosts, password, app) + app_name = app_name or await get_app_name(ops_test) + password = password or await get_password(ops_test, app_name) + replica_set_hosts = [ + unit.public_address for unit in ops_test.model.applications[app_name].units + ] + client = replica_set_client(replica_set_hosts, password, app_name) # grab the replica set status try: @@ -178,29 +188,29 @@ async def replica_set_primary( replica_set_hosts: List[str], ops_test: OpsTest, down_unit=None, - app=None, + app_name=None, ) -> Optional[ops.model.Unit]: """Returns the primary of the replica set. Retrying 5 times to give the replica set time to elect a new primary, also checks against the valid_ips to verify that the primary is not outdated. 
""" - app = app or await app_name(ops_test) - primary_ip = await fetch_primary(replica_set_hosts, ops_test, down_unit, app) + app_name = app_name or await get_app_name(ops_test) + primary_ip = await fetch_primary(replica_set_hosts, ops_test, down_unit, app_name) # return None if primary is no longer in the replica set if primary_ip is not None and primary_ip not in replica_set_hosts: return None - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: if unit.public_address == str(primary_ip): return unit -async def retrieve_entries(ops_test, app, db_name, collection_name, query_field): +async def retrieve_entries(ops_test, app_name, db_name, collection_name, query_field): """Retries entries from a specified collection within a specified database.""" - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - password = await get_password(ops_test, app) - client = replica_set_client(ip_addresses, password, app) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + password = await get_password(ops_test, app_name) + client = replica_set_client(ip_addresses, password, app_name) db = client[db_name] test_collection = db[collection_name] @@ -215,45 +225,27 @@ async def retrieve_entries(ops_test, app, db_name, collection_name, query_field) return cluster_entries -async def find_unit(ops_test: OpsTest, leader: bool) -> ops.model.Unit: +async def find_unit(ops_test: OpsTest, leader: bool, app_name=None) -> ops.model.Unit: """Helper function identifies the a unit, based on need for leader or non-leader.""" ret_unit = None - app = await app_name(ops_test) - for unit in ops_test.model.applications[app].units: + app_name = app_name or await get_app_name(ops_test) + for unit in ops_test.model.applications[app_name].units: if await unit.is_leader_from_status() == leader: ret_unit = unit return ret_unit -async def app_name(ops_test: OpsTest) -> str: - """Returns the name of the cluster running MongoDB. - - This is important since not all deployments of the MongoDB charm have the application name - "mongodb". - - Note: if multiple clusters are running MongoDB this will return the one first found. - """ - status = await ops_test.model.get_status() - for app in ops_test.model.applications: - # note that format of the charm field is not exactly "mongodb" but instead takes the form - # of `local:focal/mongodb-6` - if "mongodb" in status["applications"][app]["charm"]: - return app - - return None - - async def clear_db_writes(ops_test: OpsTest) -> bool: """Stop the DB process and remove any writes to the test collection.""" await stop_continous_writes(ops_test) # remove collection from database - app = await app_name(ops_test) - password = await get_password(ops_test, app) - hosts = [unit.public_address for unit in ops_test.model.applications[app].units] + app_name = await get_app_name(ops_test) + password = await get_password(ops_test, app_name) + hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units] hosts = ",".join(hosts) - connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app}" + connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app_name}" client = MongoClient(connection_string) db = client["new-db"] @@ -274,11 +266,11 @@ async def start_continous_writes(ops_test: OpsTest, starting_number: int) -> Non In the future this should be put in a dummy charm. 
""" - app = await app_name(ops_test) - password = await get_password(ops_test, app) - hosts = [unit.public_address for unit in ops_test.model.applications[app].units] + app_name = await get_app_name(ops_test) + password = await get_password(ops_test, app_name) + hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units] hosts = ",".join(hosts) - connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app}" + connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app_name}" # run continuous writes in the background. subprocess.Popen( @@ -291,7 +283,7 @@ async def start_continous_writes(ops_test: OpsTest, starting_number: int) -> Non ) -async def stop_continous_writes(ops_test: OpsTest, down_unit=None) -> int: +async def stop_continous_writes(ops_test: OpsTest, down_unit=None, app_name=None) -> int: """Stops continuous writes to MongoDB and returns the last written value. In the future this should be put in a dummy charm. @@ -302,11 +294,11 @@ async def stop_continous_writes(ops_test: OpsTest, down_unit=None) -> int: # wait for process to be killed proc.communicate() - app = await app_name(ops_test) - password = await get_password(ops_test, app, down_unit) - hosts = [unit.public_address for unit in ops_test.model.applications[app].units] + app_name = app_name or await get_app_name(ops_test) + password = await get_password(ops_test, app_name, down_unit) + hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units] hosts = ",".join(hosts) - connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app}" + connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app_name}" client = MongoClient(connection_string) db = client["new-db"] @@ -318,13 +310,13 @@ async def stop_continous_writes(ops_test: OpsTest, down_unit=None) -> int: return last_written_value -async def count_writes(ops_test: OpsTest, down_unit=None) -> int: +async def count_writes(ops_test: OpsTest, down_unit=None, app_name=None) -> int: """New versions of pymongo no longer support the count operation, instead find is used.""" - app = await app_name(ops_test) - password = await get_password(ops_test, app, down_unit) - hosts = [unit.public_address for unit in ops_test.model.applications[app].units] + app_name = app_name or await get_app_name(ops_test) + password = await get_password(ops_test, app_name, down_unit) + hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units] hosts = ",".join(hosts) - connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app}" + connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app_name}" client = MongoClient(connection_string) db = client["new-db"] @@ -334,13 +326,13 @@ async def count_writes(ops_test: OpsTest, down_unit=None) -> int: return count -async def secondary_up_to_date(ops_test: OpsTest, unit_ip, expected_writes) -> bool: +async def secondary_up_to_date(ops_test: OpsTest, unit_ip, expected_writes, app_name=None) -> bool: """Checks if secondary is up to date with the cluster. Retries over the period of one minute to give secondary adequate time to copy over data. """ - app = await app_name(ops_test) - password = await get_password(ops_test, app) + app_name = app_name or await get_app_name(ops_test) + password = await get_password(ops_test, app_name) connection_string = f"mongodb://operator:{password}@{unit_ip}:{PORT}/admin?" 
client = MongoClient(connection_string, directConnection=True) @@ -407,29 +399,29 @@ def storage_id(ops_test, unit_name): return line.split()[1] -async def add_unit_with_storage(ops_test, app, storage): +async def add_unit_with_storage(ops_test, app_name, storage): """Adds unit with storage. Note: this function exists as a temporary solution until this issue is resolved: https://github.com/juju/python-libjuju/issues/695 """ - expected_units = len(ops_test.model.applications[app].units) + 1 - prev_units = [unit.name for unit in ops_test.model.applications[app].units] + expected_units = len(ops_test.model.applications[app_name].units) + 1 + prev_units = [unit.name for unit in ops_test.model.applications[app_name].units] model_name = ops_test.model.info.name - add_unit_cmd = f"add-unit {app} --model={model_name} --attach-storage={storage}".split() + add_unit_cmd = f"add-unit {app_name} --model={model_name} --attach-storage={storage}".split() await ops_test.juju(*add_unit_cmd) - await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1000) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) assert ( - len(ops_test.model.applications[app].units) == expected_units + len(ops_test.model.applications[app_name].units) == expected_units ), "New unit not added to model" # verify storage attached - curr_units = [unit.name for unit in ops_test.model.applications[app].units] + curr_units = [unit.name for unit in ops_test.model.applications[app_name].units] new_unit = list(set(curr_units) - set(prev_units))[0] assert storage_id(ops_test, new_unit) == storage, "unit added with incorrect storage" # return a reference to newly added unit - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: if unit.name == new_unit: return unit @@ -467,26 +459,26 @@ async def reused_storage(ops_test: OpsTest, unit_name, removal_time) -> bool: return False -async def insert_focal_to_cluster(ops_test: OpsTest) -> None: +async def insert_focal_to_cluster(ops_test: OpsTest, app_name=None) -> None: """Inserts the Focal Fossa data into the MongoDB cluster via primary replica.""" - app = await app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] + app_name = app_name or await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] primary = (await replica_set_primary(ip_addresses, ops_test)).public_address - password = await get_password(ops_test, app) - client = MongoClient(unit_uri(primary, password, app), directConnection=True) + password = await get_password(ops_test, app_name) + client = MongoClient(unit_uri(primary, password, app_name), directConnection=True) db = client["new-db"] test_collection = db["test_ubuntu_collection"] test_collection.insert_one({"release_name": "Focal Fossa", "version": 20.04, "LTS": True}) client.close() -async def kill_unit_process(ops_test: OpsTest, unit_name: str, kill_code: str): +async def kill_unit_process(ops_test: OpsTest, unit_name: str, kill_code: str, app_name=None): """Kills the DB process on the unit according to the provided kill code.""" # killing the only replica can be disastrous - app = await app_name(ops_test) - if len(ops_test.model.applications[app].units) < 2: - await ops_test.model.applications[app].add_unit(count=1) - await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1000) + app_name = app_name or await get_app_name(ops_test) + if 
len(ops_test.model.applications[app_name].units) < 2: + await ops_test.model.applications[app_name].add_unit(count=1) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) kill_cmd = f"exec --unit {unit_name} -- pkill --signal {kill_code} -f {DB_PROCESS}" return_code, _, _ = await ops_test.juju(*kill_cmd.split()) @@ -496,11 +488,11 @@ async def kill_unit_process(ops_test: OpsTest, unit_name: str, kill_code: str): ) -async def mongod_ready(ops_test, unit_ip) -> bool: +async def mongod_ready(ops_test, unit_ip, app_name=None) -> bool: """Verifies replica is running and available.""" - app = await app_name(ops_test) - password = await get_password(ops_test, app) - client = MongoClient(unit_uri(unit_ip, password, app), directConnection=True) + app_name = app_name or await get_app_name(ops_test) + password = await get_password(ops_test, app_name) + client = MongoClient(unit_uri(unit_ip, password, app_name), directConnection=True) try: for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3)): with attempt: @@ -514,10 +506,10 @@ async def mongod_ready(ops_test, unit_ip) -> bool: return True -async def db_step_down(ops_test: OpsTest, old_primary_unit: str, sigterm_time: int): +async def db_step_down(ops_test: OpsTest, old_primary_unit: str, sigterm_time: int, app_name=None): # loop through all units that aren't the old primary - app = await app_name(ops_test) - for unit in ops_test.model.applications[app].units: + app_name = app_name or await get_app_name(ops_test) + for unit in ops_test.model.applications[app_name].units: # verify log file exists on this machine search_file = f"exec --unit {unit.name} ls {MONGODB_LOG_PATH}" return_code, _, _ = await ops_test.juju(*search_file.split()) @@ -549,14 +541,14 @@ async def db_step_down(ops_test: OpsTest, old_primary_unit: str, sigterm_time: i return False -async def all_db_processes_down(ops_test: OpsTest) -> bool: +async def all_db_processes_down(ops_test: OpsTest, app_name=None) -> bool: """Verifies that all units of the charm do not have the DB process running.""" - app = await app_name(ops_test) + app_name = app_name or await get_app_name(ops_test) try: for attempt in Retrying(stop=stop_after_attempt(60), wait=wait_fixed(3)): with attempt: - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: search_db_process = f"exec --unit {unit.name} pgrep -x mongod" _, processes, _ = await ops_test.juju(*search_db_process.split()) # splitting processes by "\n" results in one or more empty lines, hence we @@ -664,28 +656,29 @@ async def start_mongod(ops_test: OpsTest, unit) -> None: @retry(stop=stop_after_attempt(8), wait=wait_fixed(15)) -async def verify_replica_set_configuration(ops_test: OpsTest) -> None: +async def verify_replica_set_configuration(ops_test: OpsTest, app_name=None) -> None: """Verifies presence of primary, replica set members, and number of primaries.""" - app = await app_name(ops_test) + app_name = app_name or await get_app_name(ops_test) # `get_unit_ip` is used instead of `.public_address` because of a bug in python-libjuju that # incorrectly reports the IP addresses after the network is restored this is reported as a # bug here: https://github.com/juju/python-libjuju/issues/738 . 
Once this bug is resolved use # of `get_unit_ip` should be replaced with `.public_address` ip_addresses = [ - await get_unit_ip(ops_test, unit.name) for unit in ops_test.model.applications[app].units + await get_unit_ip(ops_test, unit.name) + for unit in ops_test.model.applications[app_name].units ] # verify presence of primary - new_primary = await replica_set_primary(ip_addresses, ops_test) + new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) assert new_primary.name, "primary not elected." # verify all units are running under the same replset - member_ips = await fetch_replica_set_members(ip_addresses, ops_test) + member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name) assert set(member_ips) == set(ip_addresses), "all members not running under the same replset" # verify there is only one primary assert ( - await count_primaries(ops_test) == 1 + await count_primaries(ops_test, app_name=app_name) == 1 ), "there are more than one primary in the replica set." @@ -750,37 +743,6 @@ def is_machine_reachable_from(origin_machine: str, target_machine: str) -> bool: return False -async def unit_hostname(ops_test: OpsTest, unit_name: str) -> str: - """Get hostname for a unit. - - Args: - ops_test: The ops test object passed into every test case - unit_name: The name of the unit to be tested - - Returns: - The machine/container hostname - """ - _, raw_hostname, _ = await ops_test.juju("ssh", unit_name, "hostname") - return raw_hostname.strip() - - -def instance_ip(model: str, instance: str) -> str: - """Translate juju instance name to IP. - - Args: - model: The name of the model - instance: The name of the instance - - Returns: - The (str) IP address of the instance - """ - output = subprocess.check_output(f"juju machines --model {model}".split()) - - for line in output.decode("utf8").splitlines(): - if instance in line: - return line.split()[2] - - @retry(stop=stop_after_attempt(60), wait=wait_fixed(15)) def wait_network_restore(model_name: str, hostname: str, old_ip: str) -> None: """Wait until network is restored. @@ -794,18 +756,45 @@ def wait_network_restore(model_name: str, hostname: str, old_ip: str) -> None: raise Exception("Network not restored, IP address has not changed yet.") -async def get_unit_ip(ops_test: OpsTest, unit_name: str) -> str: - """Wrapper for getting unit ip. +async def scale_and_verify(ops_test: OpsTest, count: int, remove_leader: bool = False): + if count == 0: + logger.warning("Skipping scale up/down by 0") + return - Juju incorrectly reports the IP addresses after the network is restored this is reported as a - bug here: https://github.com/juju/python-libjuju/issues/738 . 
Once this bug is resolved use of - `get_unit_ip` should be replaced with `.public_address` + app_name = await get_app_name(ops_test) - Args: - ops_test: The ops test object passed into every test case - unit_name: The name of the unit to be tested + if count > 0: + logger.info(f"Scaling up by {count} units") + await ops_test.model.applications[app_name].add_units(count) + else: + logger.info(f"Scaling down by {abs(count)} units") + # find leader unit + leader_unit = await find_unit(ops_test, leader=True) + units_to_remove = [] + for unit in ops_test.model.applications[app_name].units: + if not remove_leader and unit.name == leader_unit.name: + continue + if len(units_to_remove) < abs(count): + units_to_remove.append(unit.name) + + logger.info(f"Units to remove {units_to_remove}") + await ops_test.model.applications[app_name].destroy_units(*units_to_remove) + logger.info("Waiting for idle") + await ops_test.model.wait_for_idle( + apps=[app_name], + status="active", + timeout=1000, + ) + logger.info("Validating replica set has primary") + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) - Returns: - The (str) ip of the unit - """ - return instance_ip(ops_test.model.info.name, await unit_hostname(ops_test, unit_name)) + assert primary is not None, "Replica set has no primary" + + +async def verify_writes(ops_test: OpsTest, app_name=None): + # verify that no writes to the db were missed + app_name = app_name or await get_app_name(ops_test) + total_expected_writes = await stop_continous_writes(ops_test) + actual_writes = await count_writes(ops_test, app_name) + assert total_expected_writes["number"] == actual_writes, "writes to the db were missed." diff --git a/tests/integration/ha_tests/test_ha.py b/tests/integration/ha_tests/test_ha.py index 36c2d937b..83bdcc690 100644 --- a/tests/integration/ha_tests/test_ha.py +++ b/tests/integration/ha_tests/test_ha.py @@ -3,6 +3,7 @@ # See LICENSE file for licensing details. import asyncio import logging +import os import time import pytest @@ -10,7 +11,50 @@ from pytest_operator.plugin import OpsTest from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed -from . 
import helpers +from ..helpers import ( + check_or_scale_app, + get_app_name, + get_unit_ip, + instance_ip, + unit_hostname, + unit_uri, +) +from .helpers import ( + MONGODB_LOG_PATH, + add_unit_with_storage, + all_db_processes_down, + clear_db_writes, + count_primaries, + count_writes, + cut_network_from_unit, + db_step_down, + fetch_replica_set_members, + find_unit, + get_controller_machine, + get_password, + insert_focal_to_cluster, + is_machine_reachable_from, + kill_unit_process, + mongod_ready, + replica_set_client, + replica_set_primary, + restore_network_for_unit, + retrieve_entries, + reused_storage, + scale_and_verify, + secondary_up_to_date, + start_continous_writes, + start_mongod, + stop_continous_writes, + stop_mongod, + storage_id, + storage_type, + update_restart_delay, + update_service_logging, + verify_replica_set_configuration, + verify_writes, + wait_network_restore, +) ANOTHER_DATABASE_APP_NAME = "another-database-a" MEDIAN_REELECTION_TIME = 12 @@ -19,70 +63,23 @@ logger = logging.getLogger(__name__) -@pytest.fixture() -async def continuous_writes(ops_test: OpsTest): - """Starts continuous write operations to MongoDB for test and clears writes at end of test.""" - await helpers.start_continous_writes(ops_test, 1) - yield - await helpers.clear_db_writes(ops_test) - - -@pytest.fixture() -async def reset_restart_delay(ops_test: OpsTest): - """Resets service file delay on all units.""" - yield - app = await helpers.app_name(ops_test) - for unit in ops_test.model.applications[app].units: - await helpers.update_restart_delay(ops_test, unit, ORIGINAL_RESTART_DELAY) - - -@pytest.fixture() -async def change_logging(ops_test: OpsTest): - """Enables appending logging for a test and resets the logging at the end of the test.""" - app = await helpers.app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - primary = await helpers.replica_set_primary(ip_addresses, ops_test) - - for unit in ops_test.model.applications[app].units: - # tests which use this fixture restart the primary. Therefore the primary should not be - # restarted as to leave the restart testing to the test itself. - if unit.name == primary.name: - continue - - # must restart unit to ensure that changes to logging are made - await helpers.stop_mongod(ops_test, unit) - await helpers.update_service_logging(ops_test, unit, logging=True) - await helpers.start_mongod(ops_test, unit) - - # sleep long enough for the mongod to start up correctly - time.sleep(15) - yield - - app = await helpers.app_name(ops_test) - for unit in ops_test.model.applications[app].units: - # must restart unit to ensure that changes to logging are made - await helpers.stop_mongod(ops_test, unit) - await helpers.update_service_logging(ops_test, unit, logging=False) - await helpers.start_mongod(ops_test, unit) - - # sleep long enough for the mongod to start up correctly - time.sleep(15) - - # remove the log file as to not clog up space on the replicas. - rm_cmd = f"exec --unit {unit.name} rm {helpers.MONGODB_LOG_PATH}" - await ops_test.juju(*rm_cmd.split()) - - +@pytest.mark.skipif( + os.environ.get("PYTEST_SKIP_DEPLOY", False), + reason="skipping deploy, model expected to be provided.", +) @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest) -> None: """Build and deploy one unit of MongoDB.""" # it is possible for users to provide their own cluster for HA testing. Hence check if there # is a pre-existing cluster. 
- if await helpers.app_name(ops_test): + required_units = 3 + user_app_name = await get_app_name(ops_test) + if user_app_name: + await check_or_scale_app(ops_test, user_app_name, required_units) return my_charm = await ops_test.build_charm(".") - await ops_test.model.deploy(my_charm, num_units=3) + await ops_test.model.deploy(my_charm, num_units=required_units) await ops_test.model.wait_for_idle() @@ -93,35 +90,35 @@ async def test_storage_re_use(ops_test, continuous_writes): properly uses the storage that was provided. (ie. doesn't just re-sync everything from primary, but instead computes a diff between current storage and primary storage.) """ - app = await helpers.app_name(ops_test) - if helpers.storage_type(ops_test, app) == "rootfs": + app_name = await get_app_name(ops_test) + if storage_type(ops_test, app_name) == "rootfs": pytest.skip( "reuse of storage can only be used on deployments with persistent storage not on rootfs deployments" ) # removing the only replica can be disastrous - if len(ops_test.model.applications[app].units) < 2: - await ops_test.model.applications[app].add_unit(count=1) - await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1000) + if len(ops_test.model.applications[app_name].units) < 2: + await ops_test.model.applications[app_name].add_unit(count=1) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) # remove a unit and attach it's storage to a new unit - unit = ops_test.model.applications[app].units[0] - unit_storage_id = helpers.storage_id(ops_test, unit.name) - expected_units = len(ops_test.model.applications[app].units) - 1 + unit = ops_test.model.applications[app_name].units[0] + unit_storage_id = storage_id(ops_test, unit.name) + expected_units = len(ops_test.model.applications[app_name].units) - 1 removal_time = time.time() await ops_test.model.destroy_unit(unit.name) await ops_test.model.wait_for_idle( - apps=[app], status="active", timeout=1000, wait_for_exact_units=expected_units + apps=[app_name], status="active", timeout=1000, wait_for_exact_units=expected_units ) - new_unit = await helpers.add_unit_with_storage(ops_test, app, unit_storage_id) + new_unit = await add_unit_with_storage(ops_test, app_name, unit_storage_id) - assert await helpers.reused_storage( + assert await reused_storage( ops_test, new_unit.public_address, removal_time ), "attached storage not properly reused by MongoDB." # verify that the no writes were skipped - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes @@ -133,28 +130,29 @@ async def test_add_units(ops_test: OpsTest, continuous_writes) -> None: MongoDB replica set configuration. 
""" # add units and wait for idle - app = await helpers.app_name(ops_test) - expected_units = len(ops_test.model.applications[app].units) + 2 - await ops_test.model.applications[app].add_unit(count=2) + app_name = await get_app_name(ops_test) + expected_units = len(ops_test.model.applications[app_name].units) + 2 + await ops_test.model.applications[app_name].add_unit(count=2) await ops_test.model.wait_for_idle( - apps=[app], status="active", timeout=1000, wait_for_exact_units=expected_units + apps=[app_name], status="active", timeout=1000, wait_for_exact_units=expected_units ) # grab unit ips - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] # connect to replica set uri and get replica set members - member_ips = await helpers.fetch_replica_set_members(ip_addresses, ops_test) + member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name) # verify that the replica set members have the correct units assert set(member_ips) == set(ip_addresses) # verify that the no writes were skipped - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes +@pytest.mark.abort_on_fail @pytest.mark.abort_on_fail async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> None: """Tests clusters behavior when scaling down a minority and removing a primary replica. @@ -171,12 +169,12 @@ async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> N 5. deleting a non-leader unit is properly handled. """ deleted_unit_ips = [] - app = await helpers.app_name(ops_test) + app_name = await get_app_name(ops_test) units_to_remove = [] - minority_count = int(len(ops_test.model.applications[app].units) / 2) + minority_count = int(len(ops_test.model.applications[app_name].units) / 2) # find leader unit - leader_unit = await helpers.find_unit(ops_test, leader=True) + leader_unit = await find_unit(ops_test, leader=True, app_name=app_name) minority_count -= 1 # verify that we have a leader @@ -186,7 +184,7 @@ async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> N # find non-leader units to remove such that the largest minority possible is removed. 
avail_units = [] - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: if not unit.name == leader_unit.name: avail_units.append(unit) @@ -196,20 +194,20 @@ async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> N units_to_remove.append(unit_to_remove.name) # destroy units simultaneously - expected_units = len(ops_test.model.applications[app].units) - len(units_to_remove) + expected_units = len(ops_test.model.applications[app_name].units) - len(units_to_remove) await ops_test.model.destroy_units(*units_to_remove) # wait for app to be active after removal of units await ops_test.model.wait_for_idle( - apps=[app], status="active", timeout=1000, wait_for_exact_units=expected_units + apps=[app_name], status="active", timeout=1000, wait_for_exact_units=expected_units ) # grab unit ips - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] # check that the replica set with the remaining units has a primary try: - primary = await helpers.replica_set_primary(ip_addresses, ops_test) + primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) except RetryError: primary = None @@ -222,28 +220,28 @@ async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> N ), "replica set primary is not one of the available units" # verify that the configuration of mongodb no longer has the deleted ip - member_ips = await helpers.fetch_replica_set_members(ip_addresses, ops_test) + member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name) assert set(member_ips) == set(ip_addresses), "mongod config contains deleted units" # verify that the no writes were skipped - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes async def test_replication_across_members(ops_test: OpsTest, continuous_writes) -> None: """Check consistency, ie write to primary, read data from secondaries.""" # first find primary, write to primary, then read from each unit - await helpers.insert_focal_to_cluster(ops_test) - app = await helpers.app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - primary = await helpers.replica_set_primary(ip_addresses, ops_test) - password = await helpers.get_password(ops_test, app) + await insert_focal_to_cluster(ops_test) + app_name = await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) + password = await get_password(ops_test, app_name) secondaries = set(ip_addresses) - set([primary.public_address]) for secondary in secondaries: - client = MongoClient(helpers.unit_uri(secondary, password, app), directConnection=True) + client = MongoClient(unit_uri(secondary, password, app_name), directConnection=True) db = client["new-db"] test_collection = db["test_ubuntu_collection"] @@ -253,15 +251,16 @@ async def test_replication_across_members(ops_test: OpsTest, continuous_writes) client.close() # verify that the no writes were skipped - total_expected_writes = await 
helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None: """Verify unique clusters do not share DBs.""" # first find primary, write to primary, - await helpers.insert_focal_to_cluster(ops_test) + app_name = await get_app_name(ops_test) + await insert_focal_to_cluster(ops_test, app_name=app_name) # deploy new cluster my_charm = await ops_test.build_charm(".") @@ -273,24 +272,24 @@ async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None: unit.public_address for unit in ops_test.model.applications[ANOTHER_DATABASE_APP_NAME].units ] - password = await helpers.get_password(ops_test, app=ANOTHER_DATABASE_APP_NAME) - client = helpers.replica_set_client(ip_addresses, password, app=ANOTHER_DATABASE_APP_NAME) + password = await get_password(ops_test, app_name=ANOTHER_DATABASE_APP_NAME) + client = replica_set_client(ip_addresses, password, app_name=ANOTHER_DATABASE_APP_NAME) db = client["new-db"] test_collection = db["test_ubuntu_collection"] test_collection.insert_one({"release_name": "Jammy Jelly", "version": 22.04, "LTS": False}) client.close() - cluster_1_entries = await helpers.retrieve_entries( + cluster_1_entries = await retrieve_entries( ops_test, - app=ANOTHER_DATABASE_APP_NAME, + app_name=ANOTHER_DATABASE_APP_NAME, db_name="new-db", collection_name="test_ubuntu_collection", query_field="release_name", ) - cluster_2_entries = await helpers.retrieve_entries( + cluster_2_entries = await retrieve_entries( ops_test, - app=helpers.APP_NAME, + app_name=app_name, db_name="new-db", collection_name="test_ubuntu_collection", query_field="release_name", @@ -300,8 +299,8 @@ async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None: assert len(common_entries) == 0, "Writes from one cluster are replicated to another cluster." # verify that the no writes were skipped - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes @@ -310,23 +309,25 @@ async def test_replication_member_scaling(ops_test: OpsTest, continuous_writes) Verify newly members have replicated data and newly removed members are gone without data. 
""" - # first find primary, write to primary, - await helpers.insert_focal_to_cluster(ops_test) + app_name = await get_app_name(ops_test) - app = await helpers.app_name(ops_test) + # first find primary, write to primary, + await insert_focal_to_cluster(ops_test, app_name=app_name) original_ip_addresses = [ - unit.public_address for unit in ops_test.model.applications[app].units + unit.public_address for unit in ops_test.model.applications[app_name].units ] - expected_units = len(ops_test.model.applications[app].units) + 1 - await ops_test.model.applications[app].add_unit(count=1) + expected_units = len(ops_test.model.applications[app_name].units) + 1 + await ops_test.model.applications[app_name].add_unit(count=1) await ops_test.model.wait_for_idle( - apps=[app], status="active", timeout=1000, wait_for_exact_units=expected_units + apps=[app_name], status="active", timeout=1000, wait_for_exact_units=expected_units ) - new_ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] + new_ip_addresses = [ + unit.public_address for unit in ops_test.model.applications[app_name].units + ] new_member_ip = list(set(new_ip_addresses) - set(original_ip_addresses))[0] - password = await helpers.get_password(ops_test, app) - client = MongoClient(helpers.unit_uri(new_member_ip, password, app), directConnection=True) + password = await get_password(ops_test, app_name) + client = MongoClient(unit_uri(new_member_ip, password, app_name), directConnection=True) # check for replicated data while retrying to give time for replica to copy over data. try: @@ -343,123 +344,122 @@ async def test_replication_member_scaling(ops_test: OpsTest, continuous_writes) client.close() # verify that the no writes were skipped - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes async def test_kill_db_process(ops_test, continuous_writes): # locate primary unit - app = await helpers.app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - primary = await helpers.replica_set_primary(ip_addresses, ops_test) + app_name = await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) - await helpers.kill_unit_process(ops_test, primary.name, kill_code="SIGKILL") + await kill_unit_process(ops_test, primary.name, kill_code="SIGKILL", app_name=app_name) # verify new writes are continuing by counting the number of writes before and after a 5 second # wait - writes = await helpers.count_writes(ops_test) + writes = await count_writes(ops_test, app_name=app_name) time.sleep(5) - more_writes = await helpers.count_writes(ops_test) + more_writes = await count_writes(ops_test, app_name=app_name) assert more_writes > writes, "writes not continuing to DB" # sleep for twice the median election time time.sleep(MEDIAN_REELECTION_TIME * 2) # verify that db service got restarted and is ready - assert await helpers.mongod_ready(ops_test, primary.public_address) + assert await mongod_ready(ops_test, primary.public_address, app_name=app_name) # verify that a new primary gets elected (ie old primary is secondary) - new_primary = await 
helpers.replica_set_primary(ip_addresses, ops_test) + new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) assert new_primary.name != primary.name # verify that no writes to the db were missed - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes, "writes to the db were missed." # verify that old primary is up to date. - assert await helpers.secondary_up_to_date( - ops_test, primary.public_address, total_expected_writes["number"] + assert await secondary_up_to_date( + ops_test, primary.public_address, total_expected_writes["number"], app_name=app_name ), "secondary not up to date with the cluster after restarting." async def test_freeze_db_process(ops_test, continuous_writes): # locate primary unit - app = await helpers.app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - primary = await helpers.replica_set_primary(ip_addresses, ops_test) - await helpers.kill_unit_process(ops_test, primary.name, kill_code="SIGSTOP") + app_name = await get_app_name(ops_test) + password = await get_password(ops_test, app_name) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) + await kill_unit_process(ops_test, primary.name, kill_code="SIGSTOP", app_name=app_name) # sleep for twice the median election time time.sleep(MEDIAN_REELECTION_TIME * 2) # verify that a new primary gets elected - new_primary = await helpers.replica_set_primary(ip_addresses, ops_test) + new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) assert new_primary.name != primary.name # verify new writes are continuing by counting the number of writes before and after a 5 second # wait - writes = await helpers.count_writes(ops_test) + writes = await count_writes(ops_test, app_name=app_name) time.sleep(5) - more_writes = await helpers.count_writes(ops_test) - + more_writes = await count_writes(ops_test, app_name=app_name) # un-freeze the old primary - await helpers.kill_unit_process(ops_test, primary.name, kill_code="SIGCONT") + await kill_unit_process(ops_test, primary.name, kill_code="SIGCONT", app_name=app_name) # check this after un-freezing the old primary so that if this check fails we still "turned # back on" the mongod process assert more_writes > writes, "writes not continuing to DB" # verify that db service got restarted and is ready - assert await helpers.mongod_ready(ops_test, primary.public_address) + assert await mongod_ready(ops_test, primary.public_address, app_name=app_name) # verify all units are running under the same replset - member_ips = await helpers.fetch_replica_set_members(ip_addresses, ops_test) + member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name) assert set(member_ips) == set(ip_addresses), "all members not running under the same replset" # verify there is only one primary after un-freezing old primary assert ( - await helpers.count_primaries(ops_test) == 1 + await count_primaries(ops_test, password=password, app_name=app_name) == 1 ), "there are more than one primary in the replica set." 
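The freeze test relies on plain POSIX signal semantics rather than any MongoDB feature, which is why the frozen unit stays in the replica-set configuration while a new primary is elected. A small self-contained illustration, using a sleep process as a stand-in and nothing from the charm:

import signal
import subprocess
import time

# SIGSTOP cannot be caught or ignored: the target is frozen in place, keeps
# its sockets and replica-set membership, but stops answering heartbeats, so
# the remaining members elect a new primary. SIGCONT resumes it where it was.
proc = subprocess.Popen(["sleep", "60"])
proc.send_signal(signal.SIGSTOP)  # freeze; process state shows as 'T' (stopped)
time.sleep(2)                     # while frozen, peers treat it as unresponsive
proc.send_signal(signal.SIGCONT)  # resume without a restart
proc.terminate()
proc.wait()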
# verify that the old primary does not "reclaim" primary status after un-freezing old primary - new_primary = await helpers.replica_set_primary(ip_addresses, ops_test) + new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) assert new_primary.name != primary.name, "un-frozen primary should be secondary." # verify that no writes were missed. - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert actual_writes == total_expected_writes["number"], "db writes missing." - # verify that old primary is up to date. - assert await helpers.secondary_up_to_date( - ops_test, primary.public_address, actual_writes + assert await secondary_up_to_date( + ops_test, primary.public_address, actual_writes, app_name=app_name ), "secondary not up to date with the cluster after restarting." async def test_restart_db_process(ops_test, continuous_writes, change_logging): # locate primary unit - app = await helpers.app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - old_primary = await helpers.replica_set_primary(ip_addresses, ops_test) + app_name = await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + old_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) # send SIGTERM, we expect `systemd` to restart the process sig_term_time = time.time() - await helpers.kill_unit_process(ops_test, old_primary.name, kill_code="SIGTERM") + await kill_unit_process(ops_test, old_primary.name, kill_code="SIGTERM", app_name=app_name) # verify new writes are continuing by counting the number of writes before and after a 5 second # wait - writes = await helpers.count_writes(ops_test) + writes = await count_writes(ops_test, app_name=app_name) time.sleep(5) - more_writes = await helpers.count_writes(ops_test) + more_writes = await count_writes(ops_test, app_name=app_name) assert more_writes > writes, "writes not continuing to DB" # verify that db service got restarted and is ready - assert await helpers.mongod_ready(ops_test, old_primary.public_address) + assert await mongod_ready(ops_test, old_primary.public_address, app_name=app_name) # verify that a new primary gets elected (ie old primary is secondary) - new_primary = await helpers.replica_set_primary(ip_addresses, ops_test) + new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) assert new_primary.name != old_primary.name # verify that a stepdown was performed on restart. SIGTERM should send a graceful restart and @@ -467,146 +467,149 @@ async def test_restart_db_process(ops_test, continuous_writes, change_logging): try: for attempt in Retrying(stop=stop_after_delay(30), wait=wait_fixed(3)): with attempt: - assert await helpers.db_step_down( - ops_test, old_primary.name, sig_term_time + assert await db_step_down( + ops_test, old_primary.name, sig_term_time, app_name=app_name ), "old primary departed without stepping down." except RetryError: False, "old primary departed without stepping down." 
# verify that no writes were missed - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes # verify that old primary is up to date. - assert await helpers.secondary_up_to_date( - ops_test, old_primary.public_address, total_expected_writes["number"] + assert await secondary_up_to_date( + ops_test, old_primary.public_address, total_expected_writes["number"], app_name=app_name ), "secondary not up to date with the cluster after restarting." async def test_full_cluster_crash(ops_test: OpsTest, continuous_writes, reset_restart_delay): - app = await helpers.app_name(ops_test) + app_name = await get_app_name(ops_test) # update all units to have a new RESTART_DELAY, Modifying the Restart delay to 3 minutes # should ensure enough time for all replicas to be down at the same time. - for unit in ops_test.model.applications[app].units: - await helpers.update_restart_delay(ops_test, unit, RESTART_DELAY) + for unit in ops_test.model.applications[app_name].units: + await update_restart_delay(ops_test, unit, RESTART_DELAY) # kill all units "simultaneously" await asyncio.gather( *[ - helpers.kill_unit_process(ops_test, unit.name, kill_code="SIGKILL") - for unit in ops_test.model.applications[app].units + kill_unit_process(ops_test, unit.name, kill_code="SIGKILL", app_name=app_name) + for unit in ops_test.model.applications[app_name].units ] ) # This test serves to verify behavior when all replicas are down at the same time that when # they come back online they operate as expected. This check verifies that we meet the criteria # of all replicas being down at the same time. - assert await helpers.all_db_processes_down(ops_test), "Not all units down at the same time." + assert await all_db_processes_down( + ops_test, app_name=app_name + ), "Not all units down at the same time." # sleep for twice the median election time and the restart delay time.sleep(MEDIAN_REELECTION_TIME * 2 + RESTART_DELAY) # verify all units are up and running - for unit in ops_test.model.applications[app].units: - assert await helpers.mongod_ready( - ops_test, unit.public_address + for unit in ops_test.model.applications[app_name].units: + assert await mongod_ready( + ops_test, unit.public_address, app_name=app_name ), f"unit {unit.name} not restarted after cluster crash." # verify new writes are continuing by counting the number of writes before and after a 5 second # wait - writes = await helpers.count_writes(ops_test) + writes = await count_writes(ops_test, app_name=app_name) time.sleep(5) - more_writes = await helpers.count_writes(ops_test) + more_writes = await count_writes(ops_test, app_name=app_name) assert more_writes > writes, "writes not continuing to DB" # verify presence of primary, replica set member configuration, and number of primaries - await helpers.verify_replica_set_configuration(ops_test) + await verify_replica_set_configuration(ops_test, app_name=app_name) # verify that no writes to the db were missed - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) # verify that no writes were missed. 
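A hypothetical sketch of the "all replicas down at the same time" check asserted earlier in this test; the real all_db_processes_down helper lives in the HA test helpers and may differ (it probably retries, since the window between SIGKILL and the systemd restart is narrow). The pgrep-over-juju-exec approach below is an assumption.

async def all_mongod_processes_down(ops_test, app_name: str) -> bool:
    for unit in ops_test.model.applications[app_name].units:
        # pgrep exits 0 when a matching process exists on the unit
        return_code, _, _ = await ops_test.juju(
            "exec", "--unit", unit.name, "--", "pgrep", "-x", "mongod"
        )
        if return_code == 0:
            return False
    return True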
assert actual_writes == total_expected_writes["number"], "db writes missing." async def test_full_cluster_restart(ops_test: OpsTest, continuous_writes, reset_restart_delay): - app = await helpers.app_name(ops_test) + app_name = await get_app_name(ops_test) # update all units to have a new RESTART_DELAY, Modifying the Restart delay to 3 minutes # should ensure enough time for all replicas to be down at the same time. - for unit in ops_test.model.applications[app].units: - await helpers.update_restart_delay(ops_test, unit, RESTART_DELAY) + for unit in ops_test.model.applications[app_name].units: + await update_restart_delay(ops_test, unit, RESTART_DELAY) # kill all units "simultaneously" await asyncio.gather( *[ - helpers.kill_unit_process(ops_test, unit.name, kill_code="SIGTERM") - for unit in ops_test.model.applications[app].units + kill_unit_process(ops_test, unit.name, kill_code="SIGTERM", app_name=app_name) + for unit in ops_test.model.applications[app_name].units ] ) # This test serves to verify behavior when all replicas are down at the same time that when # they come back online they operate as expected. This check verifies that we meet the criteria # of all replicas being down at the same time. - assert await helpers.all_db_processes_down(ops_test), "Not all units down at the same time." + assert await all_db_processes_down( + ops_test, app_name=app_name + ), "Not all units down at the same time." # sleep for twice the median election time and the restart delay time.sleep(MEDIAN_REELECTION_TIME * 2 + RESTART_DELAY) # verify all units are up and running - for unit in ops_test.model.applications[app].units: - assert await helpers.mongod_ready( - ops_test, unit.public_address + for unit in ops_test.model.applications[app_name].units: + assert await mongod_ready( + ops_test, unit.public_address, app_name=app_name ), f"unit {unit.name} not restarted after cluster crash." # verify new writes are continuing by counting the number of writes before and after a 5 second # wait - writes = await helpers.count_writes(ops_test) + writes = await count_writes(ops_test, app_name=app_name) time.sleep(5) - more_writes = await helpers.count_writes(ops_test) + more_writes = await count_writes(ops_test, app_name=app_name) assert more_writes > writes, "writes not continuing to DB" # verify presence of primary, replica set member configuration, and number of primaries - await helpers.verify_replica_set_configuration(ops_test) + await verify_replica_set_configuration(ops_test, app_name=app_name) # verify that no writes to the db were missed - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes, "writes to the db were missed." 
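Both full-cluster tests above first raise the service restart delay so that, when every mongod is killed at once, no replica comes back before the others are down. A hypothetical sketch of that idea using a systemd drop-in; the service name and override path are assumptions (the charm's mongod actually runs from a snap, so the real unit name differs), and the real update_restart_delay helper may edit the service file directly instead.

async def set_restart_delay(ops_test, unit, delay_seconds: int) -> None:
    # Write a drop-in that raises RestartSec, then reload systemd on the unit.
    override = (
        "mkdir -p /etc/systemd/system/mongod.service.d && "
        f"printf '[Service]\\nRestartSec={delay_seconds}\\n' "
        "> /etc/systemd/system/mongod.service.d/override.conf && "
        "systemctl daemon-reload"
    )
    await ops_test.juju("exec", "--unit", unit.name, "--", "bash", "-c", override)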
async def test_network_cut(ops_test, continuous_writes): # locate primary unit - app = await helpers.app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - primary = await helpers.replica_set_primary(ip_addresses, ops_test) - all_units = ops_test.model.applications[app].units + app_name = await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) + all_units = ops_test.model.applications[app_name].units model_name = ops_test.model.info.name - primary_hostname = await helpers.unit_hostname(ops_test, primary.name) - primary_unit_ip = await helpers.get_unit_ip(ops_test, primary.name) + primary_hostname = await unit_hostname(ops_test, primary.name) + primary_unit_ip = await get_unit_ip(ops_test, primary.name) # before cutting network verify that connection is possible - assert await helpers.mongod_ready( - ops_test, - primary.public_address, + assert await mongod_ready( + ops_test, primary.public_address, app_name=app_name ), f"Connection to host {primary.public_address} is not possible" - helpers.cut_network_from_unit(primary_hostname) + cut_network_from_unit(primary_hostname) # verify machine is not reachable from peer units for unit in set(all_units) - {primary}: - hostname = await helpers.unit_hostname(ops_test, unit.name) - assert not helpers.is_machine_reachable_from( + hostname = await unit_hostname(ops_test, unit.name) + assert not is_machine_reachable_from( hostname, primary_hostname ), "unit is reachable from peer" # verify machine is not reachable from controller - controller = await helpers.get_controller_machine(ops_test) - assert not helpers.is_machine_reachable_from( + controller = await get_controller_machine(ops_test) + assert not is_machine_reachable_from( controller, primary_hostname ), "unit is reachable from controller" @@ -615,43 +618,46 @@ async def test_network_cut(ops_test, continuous_writes): # verify new writes are continuing by counting the number of writes before and after a 5 second # wait - writes = await helpers.count_writes(ops_test, down_unit=primary.name) + writes = await count_writes(ops_test, down_unit=primary.name, app_name=app_name) time.sleep(5) - more_writes = await helpers.count_writes(ops_test, down_unit=primary.name) + more_writes = await count_writes(ops_test, down_unit=primary.name, app_name=app_name) assert more_writes > writes, "writes not continuing to DB" # verify that a new primary got elected - new_primary = await helpers.replica_set_primary(ip_addresses, ops_test, down_unit=primary.name) + new_primary = await replica_set_primary( + ip_addresses, ops_test, down_unit=primary.name, app_name=app_name + ) assert new_primary.name != primary.name # verify that no writes to the db were missed - total_expected_writes = await helpers.stop_continous_writes(ops_test, down_unit=primary.name) - actual_writes = await helpers.count_writes(ops_test, down_unit=primary.name) + total_expected_writes = await stop_continous_writes( + ops_test, down_unit=primary.name, app_name=app_name + ) + actual_writes = await count_writes(ops_test, down_unit=primary.name, app_name=app_name) assert total_expected_writes["number"] == actual_writes, "writes to the db were missed." 
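What the down_unit argument threaded through count_writes, replica_set_primary, and stop_continous_writes buys: the helpers can build a client from the reachable members only while the old primary is partitioned. A hypothetical illustration of that URI construction; the helper name is invented, and the operator user and port simply mirror unit_uri.

def uri_excluding_down_unit(units, password: str, app_name: str, down_unit=None) -> str:
    # Keep only members that are expected to be reachable.
    hosts = [
        f"{unit.public_address}:27017"
        for unit in units
        if down_unit is None or unit.name != down_unit
    ]
    return f"mongodb://operator:{password}@{','.join(hosts)}/admin?replicaSet={app_name}"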
# restore network connectivity to old primary - helpers.restore_network_for_unit(primary_hostname) + restore_network_for_unit(primary_hostname) # wait until network is reestablished for the unit - helpers.wait_network_restore(model_name, primary_hostname, primary_unit_ip) + wait_network_restore(model_name, primary_hostname, primary_unit_ip) # self healing is performed with update status hook async with ops_test.fast_forward(): - await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1000) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) # verify we have connection to the old primary - new_ip = helpers.instance_ip(model_name, primary_hostname) - assert await helpers.mongod_ready( - ops_test, - new_ip, + new_ip = instance_ip(model_name, primary_hostname) + assert await mongod_ready( + ops_test, new_ip, app_name=app_name ), f"Connection to host {new_ip} is not possible" # verify presence of primary, replica set member configuration, and number of primaries - await helpers.verify_replica_set_configuration(ops_test) + await verify_replica_set_configuration(ops_test, app_name=app_name) # verify that old primary is up to date. - assert await helpers.secondary_up_to_date( - ops_test, new_ip, total_expected_writes["number"] + assert await secondary_up_to_date( + ops_test, new_ip, total_expected_writes["number"], app_name=app_name ), "secondary not up to date with the cluster after restarting." @@ -662,7 +668,7 @@ async def test_scale_up_down(ops_test: OpsTest, continuous_writes): scales = [3, -3, 4, -4, 5, -5, 6, -6, 7, -7] for count in scales: await scale_and_verify(ops_test, count=count) - await _verify_writes(ops_test) + await verify_writes(ops_test) @pytest.mark.abort_on_fail @@ -672,47 +678,65 @@ async def test_scale_up_down_removing_leader(ops_test: OpsTest, continuous_write scales = [3, -3, 4, -4, 5, -5, 6, -6, 7, -7] for count in scales: await scale_and_verify(ops_test, count=count, remove_leader=True) - await _verify_writes(ops_test) + await verify_writes(ops_test) -async def scale_and_verify(ops_test: OpsTest, count: int, remove_leader: bool = False): - if count == 0: - logger.warning("Skipping scale up/down by 0") - return +# TODO put this into a separate file +# Fixtures start - app = await helpers.app_name(ops_test) - - if count > 0: - logger.info(f"Scaling up by {count} units") - await ops_test.model.applications[app].add_units(count) - else: - logger.info(f"Scaling down by {abs(count)} units") - # find leader unit - leader_unit = await helpers.find_unit(ops_test, leader=True) - units_to_remove = [] - for unit in ops_test.model.applications[app].units: - if not remove_leader and unit.name == leader_unit.name: - continue - if len(units_to_remove) < abs(count): - units_to_remove.append(unit.name) - - logger.info(f"Units to remove {units_to_remove}") - await ops_test.model.applications[app].destroy_units(*units_to_remove) - logger.info("Waiting for idle") - await ops_test.model.wait_for_idle( - apps=[app], - status="active", - timeout=1000, - ) - logger.info("Validating replica set has primary") - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - primary = await helpers.replica_set_primary(ip_addresses, ops_test, app=app) - assert primary is not None, "Replica set has no primary" +@pytest.fixture() +async def continuous_writes(ops_test: OpsTest): + """Starts continuous write operations to MongoDB for test and clears writes at end of test.""" + await start_continous_writes(ops_test, 1) + yield + 
await clear_db_writes(ops_test) -async def _verify_writes(ops_test: OpsTest): - # verify that no writes to the db were missed - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) - assert total_expected_writes["number"] == actual_writes, "writes to the db were missed." +@pytest.fixture() +async def reset_restart_delay(ops_test: OpsTest): + """Resets service file delay on all units.""" + yield + app_name = await get_app_name(ops_test) + for unit in ops_test.model.applications[app_name].units: + await update_restart_delay(ops_test, unit, ORIGINAL_RESTART_DELAY) + + +@pytest.fixture() +async def change_logging(ops_test: OpsTest): + """Enables appending logging for a test and resets the logging at the end of the test.""" + app_name = await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + primary = await replica_set_primary(ip_addresses, ops_test) + + for unit in ops_test.model.applications[app_name].units: + # tests which use this fixture restart the primary. Therefore the primary should not be + # restarted as to leave the restart testing to the test itself. + if unit.name == primary.name: + continue + + # must restart unit to ensure that changes to logging are made + await stop_mongod(ops_test, unit) + await update_service_logging(ops_test, unit, logging=True) + await start_mongod(ops_test, unit) + + # sleep long enough for the mongod to start up correctly + time.sleep(15) + yield + + app_name = await get_app_name(ops_test) + for unit in ops_test.model.applications[app_name].units: + # must restart unit to ensure that changes to logging are made + await stop_mongod(ops_test, unit) + await update_service_logging(ops_test, unit, logging=False) + await start_mongod(ops_test, unit) + + # sleep long enough for the mongod to start up correctly + time.sleep(15) + + # remove the log file as to not clog up space on the replicas. + rm_cmd = f"exec --unit {unit.name} rm {MONGODB_LOG_PATH}" + await ops_test.juju(*rm_cmd.split()) + + +# Fixtures end diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 3ce39568f..ddedaf408 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -2,8 +2,10 @@ # See LICENSE file for licensing details. import json +import logging +import subprocess from pathlib import Path -from typing import Dict, Optional +from typing import Dict, List, Optional import ops import yaml @@ -12,11 +14,15 @@ from tenacity import retry, retry_if_result, stop_after_attempt, wait_exponential METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) -PORT = 27017 APP_NAME = METADATA["name"] +PORT = 27017 UNIT_IDS = [0, 1, 2] SERIES = "jammy" +logger = logging.getLogger(__name__) + +logger = logging.getLogger(__name__) + def unit_uri(ip_address: str, password, app=APP_NAME) -> str: """Generates URI that is used by MongoDB to connect to a single replica. @@ -29,21 +35,28 @@ def unit_uri(ip_address: str, password, app=APP_NAME) -> str: return f"mongodb://operator:" f"{password}@" f"{ip_address}:{PORT}/admin?replicaSet={app}" -async def get_password(ops_test: OpsTest, app=APP_NAME, username="operator") -> str: +async def get_password(ops_test: OpsTest, username="operator", app_name=None) -> str: """Use the charm action to retrieve the password from provided unit. Returns: String with the password stored on the peer relation databag. 
""" + app_name = app_name or await get_app_name(ops_test) + # can retrieve from any unit running unit so we pick the first - unit_name = ops_test.model.applications[app].units[0].name + unit_name = ops_test.model.applications[app_name].units[0].name unit_id = unit_name.split("/")[1] - action = await ops_test.model.units.get(f"{app}/{unit_id}").run_action( + action = await ops_test.model.units.get(f"{app_name}/{unit_id}").run_action( "get-password", **{"username": username} ) action = await action.wait() - return action.results["password"] + try: + password = action.results["password"] + return password + except KeyError: + logger.error("Failed to get password. Action %s. Results %s", action, action.results) + return None @retry( @@ -51,18 +64,21 @@ async def get_password(ops_test: OpsTest, app=APP_NAME, username="operator") -> stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30), ) -def count_primaries(ops_test: OpsTest, password: str) -> int: +async def count_primaries(ops_test: OpsTest, password: str, app_name=None) -> int: """Counts the number of primaries in a replica set. Will retry counting when the number of primaries is 0 at most 5 times. """ + app_name = app_name or await get_app_name(ops_test) number_of_primaries = 0 for unit_id in UNIT_IDS: # get unit - unit = ops_test.model.applications[APP_NAME].units[unit_id] + unit = ops_test.model.applications[app_name].units[unit_id] # connect to mongod - client = MongoClient(unit_uri(unit.public_address, password), directConnection=True) + client = MongoClient( + unit_uri(unit.public_address, password, app_name), directConnection=True + ) # check primary status if client.is_primary: @@ -71,10 +87,11 @@ def count_primaries(ops_test: OpsTest, password: str) -> int: return number_of_primaries -async def find_unit(ops_test: OpsTest, leader: bool, app=APP_NAME) -> ops.model.Unit: +async def find_unit(ops_test: OpsTest, leader: bool, app_name=None) -> ops.model.Unit: """Helper function identifies the a unit, based on need for leader or non-leader.""" + app_name = app_name or await get_app_name(ops_test) ret_unit = None - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: if await unit.is_leader_from_status() == leader: ret_unit = unit @@ -83,7 +100,8 @@ async def find_unit(ops_test: OpsTest, leader: bool, app=APP_NAME) -> ops.model. async def get_leader_id(ops_test: OpsTest) -> int: """Returns the unit number of the juju leader unit.""" - for unit in ops_test.model.applications[APP_NAME].units: + app_name = await get_app_name(ops_test) + for unit in ops_test.model.applications[app_name].units: if await unit.is_leader_from_status(): return int(unit.name.split("/")[1]) return -1 @@ -97,7 +115,8 @@ async def set_password( Returns: String with the password stored on the peer relation databag. """ - action = await ops_test.model.units.get(f"{APP_NAME}/{unit_id}").run_action( + app_name = await get_app_name(ops_test) + action = await ops_test.model.units.get(f"{app_name}/{unit_id}").run_action( "set-password", **{"username": username, "password": password} ) action = await action.wait() @@ -139,7 +158,6 @@ async def get_application_relation_data( # Filter the data based on the relation name. relation_data = [v for v in data[unit_name]["relation-info"] if v["endpoint"] == relation_name] - if relation_id: # Filter the data based on the relation id. 
relation_data = [v for v in relation_data if v["relation-id"] == relation_id] @@ -189,3 +207,102 @@ async def get_secret_content(ops_test, secret_id) -> Dict[str, str]: _, stdout, _ = await ops_test.juju(*complete_command.split()) data = json.loads(stdout) return data[secret_id]["content"]["Data"] + + +async def check_or_scale_app(ops_test: OpsTest, user_app_name: str, required_units: int) -> None: + """A helper function that scales existing cluster if necessary.""" + # check if we need to scale + current_units = len(ops_test.model.applications[user_app_name].units) + + if current_units == required_units: + return + elif current_units > required_units: + for i in range(0, current_units): + unit_to_remove = [ops_test.model.applications[user_app_name].units[i].name] + await ops_test.model.destroy_units(*unit_to_remove) + await ops_test.model.wait_for_idle() + else: + units_to_add = required_units - current_units + await ops_test.model.applications[user_app_name].add_unit(count=units_to_add) + await ops_test.model.wait_for_idle() + + +async def get_app_name(ops_test: OpsTest, test_deployments: List[str] = []) -> str: + """Returns the name of the cluster running MongoDB. + + This is important since not all deployments of the MongoDB charm have the application name + "mongodb". + + Note: if multiple clusters are running MongoDB this will return the one first found. + """ + status = await ops_test.model.get_status() + for app in ops_test.model.applications: + # note that format of the charm field is not exactly "mongodb" but instead takes the form + # of `local:focal/mongodb-6` + if "mongodb" in status["applications"][app]["charm"]: + logger.debug("Found mongodb app named '%s'", app) + + if app in test_deployments: + logger.debug("mongodb app named '%s', was deployed by the test, not by user", app) + continue + + return app + + return None + + +async def unit_hostname(ops_test: OpsTest, unit_name: str) -> str: + """Get hostname for a unit. + + Args: + ops_test: The ops test object passed into every test case + unit_name: The name of the unit to be tested + + Returns: + The machine/container hostname + """ + _, raw_hostname, _ = await ops_test.juju("ssh", unit_name, "hostname") + return raw_hostname.strip() + + +def instance_ip(model: str, instance: str) -> str: + """Translate juju instance name to IP. + + Args: + model: The name of the model + instance: The name of the instance + + Returns: + The (str) IP address of the instance + """ + output = subprocess.check_output(f"juju machines --model {model}".split()) + + for line in output.decode("utf8").splitlines(): + if instance in line: + return line.split()[2] + + +async def get_unit_ip(ops_test: OpsTest, unit_name: str) -> str: + """Wrapper for getting unit ip. + + Juju incorrectly reports the IP addresses after the network is restored this is reported as a + bug here: https://github.com/juju/python-libjuju/issues/738 . 
Once this bug is resolved use of + `get_unit_ip` should be replaced with `.public_address` + + Args: + ops_test: The ops test object passed into every test case + unit_name: The name of the unit to be tested + + Returns: + The (str) ip of the unit + """ + return instance_ip(ops_test.model.info.name, await unit_hostname(ops_test, unit_name)) + + +def audit_log_line_sanity_check(entry) -> bool: + fields = ["atype", "ts", "local", "remote", "users", "roles", "param", "result"] + for field in fields: + if entry.get(field) is None: + logger.error("Field '%s' not found in audit log entry \"%s\"", field, entry) + return False + return True diff --git a/tests/integration/metrics_tests/test_metrics.py b/tests/integration/metrics_tests/test_metrics.py index 090d8b570..d7a9eec5c 100644 --- a/tests/integration/metrics_tests/test_metrics.py +++ b/tests/integration/metrics_tests/test_metrics.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. +import os import time import ops @@ -9,31 +10,30 @@ from pytest_operator.plugin import OpsTest from ..ha_tests import helpers as ha_helpers -from ..helpers import find_unit +from ..helpers import ( + UNIT_IDS, + check_or_scale_app, + find_unit, + get_app_name, + get_unit_ip, + unit_hostname, +) MONGODB_EXPORTER_PORT = 9216 MEDIAN_REELECTION_TIME = 12 -async def verify_endpoints(ops_test: OpsTest, unit: ops.model.Unit) -> str: - """Verifies mongodb endpoint is functional on a given unit.""" - http = urllib3.PoolManager() - - unit_address = await ha_helpers.get_unit_ip(ops_test, unit.name) - mongodb_exporter_url = f"http://{unit_address}:{MONGODB_EXPORTER_PORT}/metrics" - mongo_resp = http.request("GET", mongodb_exporter_url) - - assert mongo_resp.status == 200 - - # if configured correctly there should be more than one mongodb metric present - mongodb_metrics = mongo_resp._body.decode("utf8") - assert mongodb_metrics.count("mongo") > 1 - - +@pytest.mark.skipif( + os.environ.get("PYTEST_SKIP_DEPLOY", False), + reason="skipping deploy, model expected to be provided.", +) @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest) -> None: """Build and deploy one unit of MongoDB.""" - if await ha_helpers.app_name(ops_test): + app_name = await get_app_name(ops_test) + if app_name: + return await check_or_scale_app(ops_test, app_name, len(UNIT_IDS)) + if await get_app_name(ops_test): return my_charm = await ops_test.build_charm(".") @@ -43,8 +43,8 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: async def test_endpoints(ops_test: OpsTest): """Sanity check that endpoints are running.""" - app = await ha_helpers.app_name(ops_test) - application = ops_test.model.applications[app] + app_name = await get_app_name(ops_test) + application = ops_test.model.applications[app_name] for unit in application.units: await verify_endpoints(ops_test, unit) @@ -52,24 +52,24 @@ async def test_endpoints(ops_test: OpsTest): async def test_endpoints_new_password(ops_test: OpsTest): """Verify that endpoints still function correctly after the monitor user password changes.""" - app = await ha_helpers.app_name(ops_test) - application = ops_test.model.applications[app] + app_name = await get_app_name(ops_test) + application = ops_test.model.applications[app_name] leader_unit = await find_unit(ops_test, leader=True) action = await leader_unit.run_action("set-password", **{"username": "monitor"}) action = await action.wait() # wait for non-leader units to receive relation changed event. 
time.sleep(3) - await ops_test.model.wait_for_idle(apps=[app], status="active", idle_period=15) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", idle_period=15) for unit in application.units: await verify_endpoints(ops_test, unit) async def test_endpoints_network_cut(ops_test: OpsTest): """Verify that endpoint still function correctly after a network cut.""" - app = await ha_helpers.app_name(ops_test) - unit = ops_test.model.applications[app].units[0] - hostname = await ha_helpers.unit_hostname(ops_test, unit.name) - unit_ip = await ha_helpers.get_unit_ip(ops_test, unit.name) + app_name = await get_app_name(ops_test) + unit = ops_test.model.applications[app_name].units[0] + hostname = await unit_hostname(ops_test, unit.name) + unit_ip = await get_unit_ip(ops_test, unit.name) ha_helpers.cut_network_from_unit(hostname) # sleep for twice the median election time @@ -79,3 +79,21 @@ async def test_endpoints_network_cut(ops_test: OpsTest): ha_helpers.restore_network_for_unit(hostname) ha_helpers.wait_network_restore(ops_test.model.info.name, hostname, unit_ip) await verify_endpoints(ops_test, unit) + + +# helpers + + +async def verify_endpoints(ops_test: OpsTest, unit: ops.model.Unit) -> str: + """Verifies mongodb endpoint is functional on a given unit.""" + http = urllib3.PoolManager() + + unit_address = await get_unit_ip(ops_test, unit.name) + mongodb_exporter_url = f"http://{unit_address}:{MONGODB_EXPORTER_PORT}/metrics" + mongo_resp = http.request("GET", mongodb_exporter_url) + + assert mongo_resp.status == 200 + + # if configured correctly there should be more than one mongodb metric present + mongodb_metrics = mongo_resp._body.decode("utf8") + assert mongodb_metrics.count("mongo") > 1 diff --git a/tests/integration/relation_tests/new_relations/test_charm_relations.py b/tests/integration/relation_tests/new_relations/test_charm_relations.py index f354b9795..6be5f1aa9 100644 --- a/tests/integration/relation_tests/new_relations/test_charm_relations.py +++ b/tests/integration/relation_tests/new_relations/test_charm_relations.py @@ -13,6 +13,7 @@ from tenacity import RetryError from ...ha_tests.helpers import replica_set_primary +from ...helpers import check_or_scale_app, get_app_name from .helpers import ( get_application_relation_data, get_connection_string, @@ -29,7 +30,7 @@ MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME = "multiple-database-clusters" ALIASED_MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME = "aliased-multiple-database-clusters" ANOTHER_DATABASE_APP_NAME = "another-database" -APP_NAMES = [APPLICATION_APP_NAME, DATABASE_APP_NAME, ANOTHER_DATABASE_APP_NAME] +APP_NAMES = [APPLICATION_APP_NAME, ANOTHER_DATABASE_APP_NAME] @pytest.mark.abort_on_fail @@ -37,31 +38,62 @@ async def test_deploy_charms(ops_test: OpsTest, application_charm, database_char """Deploy both charms (application and database) to use in the tests.""" # Deploy both charms (2 units for each application to test that later they correctly # set data in the relation application databag using only the leader unit). 
- await asyncio.gather( - ops_test.model.deploy( - application_charm, - application_name=APPLICATION_APP_NAME, - num_units=2, - ), - ops_test.model.deploy( - database_charm, - application_name=DATABASE_APP_NAME, - num_units=2, - ), - ops_test.model.deploy( - database_charm, - application_name=ANOTHER_DATABASE_APP_NAME, - ), + required_units = 2 + app_name = await get_app_name(ops_test) + if app_name == ANOTHER_DATABASE_APP_NAME: + assert ( + False + ), f"provided MongoDB application, cannot be named {ANOTHER_DATABASE_APP_NAME}, this name is reserved for this test." + + if app_name: + await asyncio.gather( + ops_test.model.deploy( + application_charm, + application_name=APPLICATION_APP_NAME, + num_units=required_units, + ), + check_or_scale_app(ops_test, app_name, required_units), + ops_test.model.deploy( + database_charm, + application_name=ANOTHER_DATABASE_APP_NAME, + ), + ) + else: + await asyncio.gather( + ops_test.model.deploy( + application_charm, + application_name=APPLICATION_APP_NAME, + num_units=required_units, + ), + ops_test.model.deploy( + database_charm, + application_name=DATABASE_APP_NAME, + num_units=required_units, + ), + ops_test.model.deploy( + database_charm, + application_name=ANOTHER_DATABASE_APP_NAME, + ), + ) + if app_name: + APP_NAMES.append(app_name) + else: + APP_NAMES.append(DATABASE_APP_NAME) + await ops_test.model.wait_for_idle( + apps=APP_NAMES, status="active", wait_for_at_least_units=required_units ) - await ops_test.model.wait_for_idle(apps=APP_NAMES, status="active", wait_for_at_least_units=1) @pytest.mark.abort_on_fail async def test_database_relation_with_charm_libraries(ops_test: OpsTest): """Test basic functionality of database relation interface.""" # Relate the charms and wait for them exchanging some connection data. - await ops_test.model.add_relation( - f"{APPLICATION_APP_NAME}:{FIRST_DATABASE_RELATION_NAME}", DATABASE_APP_NAME + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME + ) + await ops_test.model.integrate( + f"{APPLICATION_APP_NAME}:{FIRST_DATABASE_RELATION_NAME}", db_app_name ) await ops_test.model.wait_for_idle(apps=APP_NAMES, status="active") @@ -106,15 +138,19 @@ async def test_database_relation_with_charm_libraries(ops_test: OpsTest): async def test_app_relation_metadata_change(ops_test: OpsTest) -> None: """Verifies that the app metadata changes with db relation joined and departed events.""" # verify application metadata is correct before adding/removing units. + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME + ) try: await verify_application_data( - ops_test, APPLICATION_APP_NAME, DATABASE_APP_NAME, FIRST_DATABASE_RELATION_NAME + ops_test, APPLICATION_APP_NAME, db_app_name, FIRST_DATABASE_RELATION_NAME ) except RetryError: assert False, "Hosts are not correct in application data." # verify application metadata is correct after adding units. 
- await ops_test.model.applications[DATABASE_APP_NAME].add_units(count=2) + await ops_test.model.applications[db_app_name].add_units(count=2) await ops_test.model.wait_for_idle( apps=APP_NAMES, status="active", @@ -123,7 +159,7 @@ async def test_app_relation_metadata_change(ops_test: OpsTest) -> None: try: await verify_application_data( - ops_test, APPLICATION_APP_NAME, DATABASE_APP_NAME, FIRST_DATABASE_RELATION_NAME + ops_test, APPLICATION_APP_NAME, db_app_name, FIRST_DATABASE_RELATION_NAME ) except RetryError: assert False, "Hosts not updated in application data after adding units." @@ -131,14 +167,14 @@ async def test_app_relation_metadata_change(ops_test: OpsTest) -> None: # verify application metadata is correct after removing the pre-existing units. This is # this is important since we want to test that the application related will work with # only the newly added units from above. - await ops_test.model.applications[DATABASE_APP_NAME].destroy_units(f"{DATABASE_APP_NAME}/0") + await ops_test.model.applications[db_app_name].destroy_units(f"{db_app_name}/0") await ops_test.model.wait_for_idle( apps=APP_NAMES, status="active", timeout=1000, ) - await ops_test.model.applications[DATABASE_APP_NAME].destroy_units(f"{DATABASE_APP_NAME}/1") + await ops_test.model.applications[db_app_name].destroy_units(f"{db_app_name}/1") await ops_test.model.wait_for_idle( apps=APP_NAMES, status="active", @@ -147,7 +183,7 @@ async def test_app_relation_metadata_change(ops_test: OpsTest) -> None: try: await verify_application_data( - ops_test, APPLICATION_APP_NAME, DATABASE_APP_NAME, FIRST_DATABASE_RELATION_NAME + ops_test, APPLICATION_APP_NAME, db_app_name, FIRST_DATABASE_RELATION_NAME ) except RetryError: assert False, "Hosts not updated in application data after removing units." @@ -160,7 +196,7 @@ async def test_app_relation_metadata_change(ops_test: OpsTest) -> None: ) ip_addresses = endpoints_str.split(",") try: - primary = await replica_set_primary(ip_addresses, ops_test, app=DATABASE_APP_NAME) + primary = await replica_set_primary(ip_addresses, ops_test, app_name=db_app_name) except RetryError: assert False, "replica set has no primary" @@ -245,10 +281,14 @@ async def test_two_applications_doesnt_share_the_same_relation_data( ) await ops_test.model.wait_for_idle(apps=all_app_names, status="active") + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME + ) # Relate the new application with the database # and wait for them exchanging some connection data. - await ops_test.model.add_relation( - f"{another_application_app_name}:{FIRST_DATABASE_RELATION_NAME}", DATABASE_APP_NAME + await ops_test.model.integrate( + f"{another_application_app_name}:{FIRST_DATABASE_RELATION_NAME}", db_app_name ) await ops_test.model.wait_for_idle(apps=all_app_names, status="active") @@ -267,10 +307,14 @@ async def test_an_application_can_connect_to_multiple_database_clusters(ops_test """Test that an application can connect to different clusters of the same database.""" # Relate the application with both database clusters # and wait for them exchanging some connection data. 
- first_cluster_relation = await ops_test.model.add_relation( - f"{APPLICATION_APP_NAME}:{MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME}", DATABASE_APP_NAME + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME ) - second_cluster_relation = await ops_test.model.add_relation( + first_cluster_relation = await ops_test.model.integrate( + f"{APPLICATION_APP_NAME}:{MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME}", db_app_name + ) + second_cluster_relation = await ops_test.model.integrate( f"{APPLICATION_APP_NAME}:{MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME}", ANOTHER_DATABASE_APP_NAME, ) @@ -301,12 +345,16 @@ async def test_an_application_can_connect_to_multiple_aliased_database_clusters( # """Test that an application can connect to different clusters of the same database.""" # Relate the application with both database clusters # and wait for them exchanging some connection data. + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME + ) await asyncio.gather( - ops_test.model.add_relation( + ops_test.model.integrate( f"{APPLICATION_APP_NAME}:{ALIASED_MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME}", - DATABASE_APP_NAME, + db_app_name, ), - ops_test.model.add_relation( + ops_test.model.integrate( f"{APPLICATION_APP_NAME}:{ALIASED_MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME}", ANOTHER_DATABASE_APP_NAME, ), @@ -336,8 +384,12 @@ async def test_an_application_can_connect_to_multiple_aliased_database_clusters( async def test_an_application_can_request_multiple_databases(ops_test: OpsTest, application_charm): """Test that an application can request additional databases using the same interface.""" # Relate the charms using another relation and wait for them exchanging some connection data. 
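As elsewhere in this file, add_relation is swapped for Model.integrate, the python-libjuju call behind `juju integrate` (the newer spelling of add-relation); it takes the same "<application>:<endpoint>" strings. A minimal usage sketch with placeholder names:

# Application and endpoint names below are placeholders, not from this test suite.
async def relate_example(ops_test) -> None:
    await ops_test.model.integrate("application:first-database", "another-database")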
- await ops_test.model.add_relation( - f"{APPLICATION_APP_NAME}:{SECOND_DATABASE_RELATION_NAME}", DATABASE_APP_NAME + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME + ) + await ops_test.model.integrate( + f"{APPLICATION_APP_NAME}:{SECOND_DATABASE_RELATION_NAME}", db_app_name ) await ops_test.model.wait_for_idle(apps=APP_NAMES, status="active") @@ -359,9 +411,12 @@ async def test_removed_relation_no_longer_has_access(ops_test: OpsTest): connection_string = await get_connection_string( ops_test, APPLICATION_APP_NAME, FIRST_DATABASE_RELATION_NAME ) - - await ops_test.model.applications[DATABASE_APP_NAME].remove_relation( - f"{APPLICATION_APP_NAME}:{FIRST_DATABASE_RELATION_NAME}", f"{DATABASE_APP_NAME}:database" + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME + ) + await ops_test.model.applications[db_app_name].remove_relation( + f"{APPLICATION_APP_NAME}:{FIRST_DATABASE_RELATION_NAME}", f"{db_app_name}:database" ) await ops_test.model.wait_for_idle(apps=APP_NAMES, status="active") diff --git a/tests/integration/sharding_tests/helpers.py b/tests/integration/sharding_tests/helpers.py index d8f9f8ae5..fac50041b 100644 --- a/tests/integration/sharding_tests/helpers.py +++ b/tests/integration/sharding_tests/helpers.py @@ -16,7 +16,7 @@ async def generate_mongodb_client(ops_test: OpsTest, app_name: str, mongos: bool): """Returns a MongoDB client for mongos/mongod.""" hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units] - password = await get_password(ops_test, app_name) + password = await get_password(ops_test, app_name=app_name) port = MONGOS_PORT if mongos else MONGOD_PORT hosts = [f"{host}:{port}" for host in hosts] hosts = ",".join(hosts) diff --git a/tests/integration/sharding_tests/test_sharding_race_conds.py b/tests/integration/sharding_tests/test_sharding_race_conds.py new file mode 100644 index 000000000..f21a97851 --- /dev/null +++ b/tests/integration/sharding_tests/test_sharding_race_conds.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. 
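A hypothetical sketch of the shard check asserted at the end of this new test module; the real has_correct_shards lives in the sharding test helpers and may differ. It assumes the check runs the `listShards` admin command through the mongos client and compares the shard names against the applications the test deployed.

def shards_match(mongos_client, expected_shards) -> bool:
    # listShards returns {"shards": [{"_id": "<shard name>", ...}, ...], "ok": 1}
    shard_names = {
        shard["_id"] for shard in mongos_client.admin.command("listShards")["shards"]
    }
    return shard_names == set(expected_shards)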
+ +import pytest +from pytest_operator.plugin import OpsTest + +from .helpers import generate_mongodb_client, has_correct_shards + +SHARD_ONE_APP_NAME = "shard-one" +SHARD_TWO_APP_NAME = "shard-two" +SHARD_THREE_APP_NAME = "shard-three" +SHARD_APPS = [SHARD_ONE_APP_NAME, SHARD_TWO_APP_NAME] +CONFIG_SERVER_APP_NAME = "config-server-one" +SHARD_REL_NAME = "sharding" +CONFIG_SERVER_REL_NAME = "config-server" +MONGODB_KEYFILE_PATH = "/var/snap/charmed-mongodb/current/etc/mongod/keyFile" + +TIMEOUT = 60 * 10 + + +@pytest.mark.abort_on_fail +async def test_build_and_deploy(ops_test: OpsTest) -> None: + """Build and deploy a sharded cluster.""" + my_charm = await ops_test.build_charm(".") + await ops_test.model.deploy( + my_charm, + num_units=2, + config={"role": "config-server"}, + application_name=CONFIG_SERVER_APP_NAME, + ) + await ops_test.model.deploy( + my_charm, num_units=2, config={"role": "shard"}, application_name=SHARD_ONE_APP_NAME + ) + await ops_test.model.deploy( + my_charm, num_units=2, config={"role": "shard"}, application_name=SHARD_TWO_APP_NAME + ) + await ops_test.model.deploy( + my_charm, num_units=2, config={"role": "shard"}, application_name=SHARD_THREE_APP_NAME + ) + + +@pytest.mark.abort_on_fail +async def test_immediate_relate(ops_test: OpsTest) -> None: + """Tests the immediate integration of cluster components works without error.""" + await ops_test.model.integrate( + f"{SHARD_ONE_APP_NAME}:{SHARD_REL_NAME}", + f"{CONFIG_SERVER_APP_NAME}:{CONFIG_SERVER_REL_NAME}", + ) + await ops_test.model.integrate( + f"{SHARD_TWO_APP_NAME}:{SHARD_REL_NAME}", + f"{CONFIG_SERVER_APP_NAME}:{CONFIG_SERVER_REL_NAME}", + ) + await ops_test.model.integrate( + f"{SHARD_THREE_APP_NAME}:{SHARD_REL_NAME}", + f"{CONFIG_SERVER_APP_NAME}:{CONFIG_SERVER_REL_NAME}", + ) + + await ops_test.model.wait_for_idle( + apps=[ + CONFIG_SERVER_APP_NAME, + SHARD_ONE_APP_NAME, + SHARD_TWO_APP_NAME, + SHARD_THREE_APP_NAME, + ], + idle_period=20, + status="active", + timeout=TIMEOUT, + raise_on_error=False, + ) + + mongos_client = await generate_mongodb_client( + ops_test, app_name=CONFIG_SERVER_APP_NAME, mongos=True + ) + + # verify sharded cluster config + assert has_correct_shards( + mongos_client, + expected_shards=[SHARD_ONE_APP_NAME, SHARD_TWO_APP_NAME, SHARD_THREE_APP_NAME], + ), "Config server did not process config properly" diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index a3c6cd250..69f1f2e1d 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -5,7 +5,9 @@ import json import logging import os +import subprocess import time +from subprocess import check_output from uuid import uuid4 import pytest @@ -15,13 +17,15 @@ from pytest_operator.plugin import OpsTest from tenacity import RetryError -from .ha_tests.helpers import app_name, kill_unit_process +from .ha_tests.helpers import kill_unit_process from .helpers import ( - APP_NAME, PORT, UNIT_IDS, + audit_log_line_sanity_check, + check_or_scale_app, count_primaries, find_unit, + get_app_name, get_leader_id, get_password, set_password, @@ -30,7 +34,6 @@ logger = logging.getLogger(__name__) -ANOTHER_DATABASE_APP_NAME = "another-database-a" MEDIAN_REELECTION_TIME = 12 @@ -42,6 +45,12 @@ @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest) -> None: """Build and deploy one unit of MongoDB.""" + # it is possible for users to provide their own cluster for testing. Hence check if there + # is a pre-existing cluster. 
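+ # get_app_name and check_or_scale_app come from tests/integration/helpers.py: get_app_name
+ # looks up an already-deployed MongoDB application (returning None when there is none), and
+ # check_or_scale_app is expected to scale that deployment to the unit count the test needs
+ # instead of deploying a fresh charm.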
+ app_name = await get_app_name(ops_test) + if app_name: + return await check_or_scale_app(ops_test, app_name, len(UNIT_IDS)) + my_charm = await ops_test.build_charm(".") await ops_test.model.deploy(my_charm, num_units=len(UNIT_IDS)) await ops_test.model.wait_for_idle() @@ -50,17 +59,19 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: @pytest.mark.abort_on_fail async def test_status(ops_test: OpsTest) -> None: """Verifies that the application and unit are active.""" - await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) - assert len(ops_test.model.applications[APP_NAME].units) == len(UNIT_IDS) + app_name = await get_app_name(ops_test) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) + assert len(ops_test.model.applications[app_name].units) == len(UNIT_IDS) @pytest.mark.parametrize("unit_id", UNIT_IDS) async def test_unit_is_running_as_replica_set(ops_test: OpsTest, unit_id: int) -> None: """Tests that mongodb is running as a replica set for the application unit.""" # connect to mongo replica set - unit = ops_test.model.applications[APP_NAME].units[unit_id] + app_name = await get_app_name(ops_test) + unit = ops_test.model.applications[app_name].units[unit_id] connection = unit.public_address + ":" + str(PORT) - client = MongoClient(connection, replicaset="mongodb") + client = MongoClient(connection, replicaset=app_name) # check mongo replica set is ready try: @@ -74,15 +85,18 @@ async def test_unit_is_running_as_replica_set(ops_test: OpsTest, unit_id: int) - async def test_leader_is_primary_on_deployment(ops_test: OpsTest) -> None: """Tests that right after deployment that the primary unit is the leader.""" + app_name = await get_app_name(ops_test) # grab leader unit - leader_unit = await find_unit(ops_test, leader=True) + leader_unit = await find_unit(ops_test, leader=True, app_name=app_name) # verify that we have a leader assert leader_unit is not None, "No unit is leader" # connect to mongod - password = await get_password(ops_test) - client = MongoClient(unit_uri(leader_unit.public_address, password), directConnection=True) + password = await get_password(ops_test, app_name=app_name) + client = MongoClient( + unit_uri(leader_unit.public_address, password, app_name), directConnection=True + ) # verify primary status assert client.is_primary, "Leader is not primary" @@ -91,9 +105,10 @@ async def test_leader_is_primary_on_deployment(ops_test: OpsTest) -> None: async def test_exactly_one_primary(ops_test: OpsTest) -> None: """Tests that there is exactly one primary in the deployed units.""" + app_name = await get_app_name(ops_test) try: - password = await get_password(ops_test) - number_of_primaries = count_primaries(ops_test, password) + password = await get_password(ops_test, app_name=app_name) + number_of_primaries = await count_primaries(ops_test, password, app_name=app_name) except RetryError: number_of_primaries = 0 @@ -106,11 +121,14 @@ async def test_exactly_one_primary(ops_test: OpsTest) -> None: async def test_get_primary_action(ops_test: OpsTest) -> None: """Tests that action get-primary outputs the correct unit with the primary replica.""" # determine which unit is the primary + app_name = await get_app_name(ops_test) expected_primary = None - for unit in ops_test.model.applications[APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: # connect to mongod password = await get_password(ops_test) - client = MongoClient(unit_uri(unit.public_address, password), 
directConnection=True) + client = MongoClient( + unit_uri(unit.public_address, password, app_name), directConnection=True + ) # check primary status if client.is_primary: @@ -122,7 +140,7 @@ async def test_get_primary_action(ops_test: OpsTest) -> None: # check if get-primary returns the correct primary unit regardless of # which unit the action is run on - for unit in ops_test.model.applications[APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: # use get-primary action to find primary action = await unit.run_action("get-primary") action = await action.wait() @@ -143,10 +161,13 @@ async def test_set_password_action(ops_test: OpsTest) -> None: assert new_password != old_password new_password_reported = await get_password(ops_test) assert new_password == new_password_reported + user_app_name = await get_app_name(ops_test) # verify that the password is updated in mongod by inserting into the collection. try: - client = MongoClient(unit_uri(unit.public_address, new_password), directConnection=True) + client = MongoClient( + unit_uri(unit.public_address, new_password, user_app_name), directConnection=True + ) client["new-db"].list_collection_names() except PyMongoError as e: assert False, f"Failed to access collection with new password, error: {e}" @@ -164,7 +185,9 @@ async def test_set_password_action(ops_test: OpsTest) -> None: # verify that the password is updated in mongod by inserting into the collection. try: - client = MongoClient(unit_uri(unit.public_address, "safe_pass"), directConnection=True) + client = MongoClient( + unit_uri(unit.public_address, "safe_pass", user_app_name), directConnection=True + ) client["new-db"].list_collection_names() except PyMongoError as e: assert False, f"Failed to access collection with new password, error: {e}" @@ -174,13 +197,14 @@ async def test_set_password_action(ops_test: OpsTest) -> None: async def test_monitor_user(ops_test: OpsTest) -> None: """Test verifies that the monitor user can perform operations such as 'rs.conf()'.""" - unit = ops_test.model.applications[APP_NAME].units[0] - password = await get_password(ops_test, "mongodb", "monitor") + app_name = await get_app_name(ops_test) + unit = ops_test.model.applications[app_name].units[0] + password = await get_password(ops_test, username="monitor") replica_set_hosts = [ - unit.public_address for unit in ops_test.model.applications["mongodb"].units + unit.public_address for unit in ops_test.model.applications[app_name].units ] hosts = ",".join(replica_set_hosts) - replica_set_uri = f"mongodb://monitor:{password}@{hosts}/admin?replicaSet=mongodb" + replica_set_uri = f"mongodb://monitor:{password}@{hosts}/admin?replicaSet={app_name}" admin_mongod_cmd = f"{MONGO_SHELL} '{replica_set_uri}' --eval 'rs.conf()'" check_monitor_cmd = f"exec --unit {unit.name} -- {admin_mongod_cmd}" @@ -270,13 +294,13 @@ async def test_exactly_one_primary_reported_by_juju(ops_test: OpsTest) -> None: async def get_unit_messages(): """Collects unit status messages.""" - app = await app_name(ops_test) + app_name = await get_app_name(ops_test) unit_messages = {} async with ops_test.fast_forward(): time.sleep(20) - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: unit_messages[unit.entity_id] = unit.workload_status_message return unit_messages @@ -314,3 +338,24 @@ def juju_reports_one_primary(unit_messages): # cleanup, remove killed unit await ops_test.model.destroy_unit(target_unit) + + +@pytest.mark.skip("Skipping until write to log files 
enabled") +async def test_audit_log(ops_test: OpsTest) -> None: + """Test that audit log was created and contains actual audit data.""" + app_name = await get_app_name(ops_test) + leader_unit = await find_unit(ops_test, leader=True, app_name=app_name) + audit_log_snap_path = "/var/snap/charmed-mongodb/common/var/log/mongodb/audit.log" + audit_log = check_output( + f"JUJU_MODEL={ops_test.model_full_name} juju ssh {leader_unit.name} 'sudo cat {audit_log_snap_path}'", + stderr=subprocess.PIPE, + shell=True, + universal_newlines=True, + ) + + for line in audit_log.splitlines(): + if not len(line): + continue + item = json.loads(line) + # basic sanity check + assert audit_log_line_sanity_check(item), "Audit sanity log check failed for first line" diff --git a/tests/integration/tls_tests/helpers.py b/tests/integration/tls_tests/helpers.py index a3fd4065d..3523f1ee2 100644 --- a/tests/integration/tls_tests/helpers.py +++ b/tests/integration/tls_tests/helpers.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. +import json import logging from datetime import datetime @@ -9,9 +10,15 @@ from pytest_operator.plugin import OpsTest from tenacity import RetryError, Retrying, stop_after_attempt, wait_exponential -from ..ha_tests.helpers import app_name -from ..helpers import get_password +from ..helpers import ( + get_app_name, + get_application_relation_data, + get_password, + get_secret_content, + get_secret_id, +) +# TODO move this to a separate constants file PORT = 27017 MONGODB_SNAP_DATA_DIR = "/var/snap/charmed-mongodb/current" @@ -21,6 +28,8 @@ INTERNAL_CERT_PATH = f"{MONGOD_CONF_DIR}/internal-ca.crt" EXTERNAL_PEM_PATH = f"{MONGOD_CONF_DIR}/external-cert.pem" +TLS_RELATION_NAME = "certificates" + logger = logging.getLogger(__name__) @@ -28,13 +37,15 @@ class ProcessError(Exception): """Raised when a process fails.""" -async def mongo_tls_command(ops_test: OpsTest) -> str: +async def mongo_tls_command(ops_test: OpsTest, app_name=None) -> str: """Generates a command which verifies TLS status.""" - app = await app_name(ops_test) - replica_set_hosts = [unit.public_address for unit in ops_test.model.applications[app].units] - password = await get_password(ops_test, app) + app_name = app_name or await get_app_name(ops_test) + replica_set_hosts = [ + unit.public_address for unit in ops_test.model.applications[app_name].units + ] + password = await get_password(ops_test, app_name=app_name) hosts = ",".join(replica_set_hosts) - replica_set_uri = f"mongodb://operator:" f"{password}@" f"{hosts}/admin?replicaSet={app}" + replica_set_uri = f"mongodb://operator:" f"{password}@" f"{hosts}/admin?replicaSet={app_name}" return ( f"{MONGO_SHELL} '{replica_set_uri}' --eval 'rs.status()'" @@ -43,7 +54,7 @@ async def mongo_tls_command(ops_test: OpsTest) -> str: ) -async def check_tls(ops_test: OpsTest, unit: ops.model.Unit, enabled: bool) -> bool: +async def check_tls(ops_test: OpsTest, unit: ops.model.Unit, enabled: bool, app_name=None) -> bool: """Returns whether TLS is enabled on the specific PostgreSQL instance. 
Args: @@ -59,7 +70,7 @@ async def check_tls(ops_test: OpsTest, unit: ops.model.Unit, enabled: bool) -> b stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30) ): with attempt: - mongod_tls_check = await mongo_tls_command(ops_test) + mongod_tls_check = await mongo_tls_command(ops_test, app_name=app_name) check_tls_cmd = f"exec --unit {unit.name} -- {mongod_tls_check}" return_code, _, _ = await ops_test.juju(*check_tls_cmd.split()) tls_enabled = return_code == 0 @@ -132,3 +143,58 @@ async def scp_file_preserve_ctime(ops_test: OpsTest, unit_name: str, path: str) ) return f"{filename}" + + +async def check_certs_correctly_distributed( + ops_test: OpsTest, unit: ops.Unit, app_name=None +) -> None: + """Comparing expected vs distributed certificates. + + Verifying certificates downloaded on the charm against the ones distributed by the TLS operator + """ + app_name = app_name or await get_app_name(ops_test) + app_secret_id = await get_secret_id(ops_test, app_name) + unit_secret_id = await get_secret_id(ops_test, unit.name) + app_secret_content = await get_secret_content(ops_test, app_secret_id) + unit_secret_content = await get_secret_content(ops_test, unit_secret_id) + app_current_crt = app_secret_content["csr-secret"] + unit_current_crt = unit_secret_content["csr-secret"] + + # Get the values for certs from the relation, as provided by TLS Charm + certificates_raw_data = await get_application_relation_data( + ops_test, app_name, TLS_RELATION_NAME, "certificates" + ) + certificates_data = json.loads(certificates_raw_data) + + external_item = [ + data + for data in certificates_data + if data["certificate_signing_request"].rstrip() == unit_current_crt.rstrip() + ][0] + internal_item = [ + data + for data in certificates_data + if data["certificate_signing_request"].rstrip() == app_current_crt.rstrip() + ][0] + + # Get a local copy of the external cert + external_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, EXTERNAL_CERT_PATH) + + # Get the external cert value from the relation + relation_external_cert = "\n".join(external_item["chain"]) + + # CHECK: Compare if they are the same + with open(external_copy_path) as f: + external_contents_file = f.read() + assert relation_external_cert == external_contents_file + + # Get a local copy of the internal cert + internal_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, INTERNAL_CERT_PATH) + + # Get the external cert value from the relation + relation_internal_cert = "\n".join(internal_item["chain"]) + + # CHECK: Compare if they are the same + with open(internal_copy_path) as f: + internal_contents_file = f.read() + assert relation_internal_cert == internal_contents_file diff --git a/tests/integration/tls_tests/test_tls.py b/tests/integration/tls_tests/test_tls.py index 9b932c606..8ef6c32b1 100644 --- a/tests/integration/tls_tests/test_tls.py +++ b/tests/integration/tls_tests/test_tls.py @@ -2,107 +2,69 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. 
-import json +import os +from pathlib import Path import pytest -from ops import Unit +import yaml from pytest_operator.plugin import OpsTest -from ..helpers import get_application_relation_data, get_secret_content, get_secret_id +from ..helpers import UNIT_IDS, check_or_scale_app, get_app_name from .helpers import ( EXTERNAL_CERT_PATH, INTERNAL_CERT_PATH, + check_certs_correctly_distributed, check_tls, - scp_file_preserve_ctime, time_file_created, time_process_started, ) -MONGO_COMMON_DIR = "/var/snap/charmed-mongodb/common" TLS_CERTIFICATES_APP_NAME = "tls-certificates-operator" -TLS_RELATION_NAME = "certificates" -DATABASE_APP_NAME = "mongodb" + +DATABASE_METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) +PORT = 27017 +DATABASE_APP_NAME = DATABASE_METADATA["name"] TLS_TEST_DATA = "tests/integration/tls_tests/data" DB_SERVICE = "snap.charmed-mongodb.mongod.service" -async def check_certs_correctly_distributed(ops_test: OpsTest, unit: Unit) -> None: - """Comparing expected vs distributed certificates. - - Verifying certificates downloaded on the charm against the ones distributed by the TLS operator - """ - app_secret_id = await get_secret_id(ops_test, DATABASE_APP_NAME) - unit_secret_id = await get_secret_id(ops_test, unit.name) - app_secret_content = await get_secret_content(ops_test, app_secret_id) - unit_secret_content = await get_secret_content(ops_test, unit_secret_id) - app_current_crt = app_secret_content["csr-secret"] - unit_current_crt = unit_secret_content["csr-secret"] - - # Get the values for certs from the relation, as provided by TLS Charm - certificates_raw_data = await get_application_relation_data( - ops_test, DATABASE_APP_NAME, TLS_RELATION_NAME, "certificates" - ) - certificates_data = json.loads(certificates_raw_data) - - external_item = [ - data - for data in certificates_data - if data["certificate_signing_request"].rstrip() == unit_current_crt.rstrip() - ][0] - internal_item = [ - data - for data in certificates_data - if data["certificate_signing_request"].rstrip() == app_current_crt.rstrip() - ][0] - - # Get a local copy of the external cert - external_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, EXTERNAL_CERT_PATH) - - # Get the external cert value from the relation - relation_external_cert = "\n".join(external_item["chain"]) - - # CHECK: Compare if they are the same - with open(external_copy_path) as f: - external_contents_file = f.read() - assert relation_external_cert == external_contents_file - - # Get a local copy of the internal cert - internal_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, INTERNAL_CERT_PATH) - - # Get the external cert value from the relation - relation_internal_cert = "\n".join(internal_item["chain"]) - - # CHECK: Compare if they are the same - with open(internal_copy_path) as f: - internal_contents_file = f.read() - assert relation_internal_cert == internal_contents_file - - +@pytest.mark.skipif( + os.environ.get("PYTEST_SKIP_DEPLOY", False), + reason="skipping deploy, model expected to be provided.", +) @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest) -> None: """Build and deploy one unit of MongoDB and one unit of TLS.""" - async with ops_test.fast_forward(): - my_charm = await ops_test.build_charm(".") - await ops_test.model.deploy(my_charm, num_units=3) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") - - config = {"generate-self-signed-certificates": "true", "ca-common-name": "Test CA"} - await 
ops_test.model.deploy(TLS_CERTIFICATES_APP_NAME, channel="stable", config=config) - await ops_test.model.wait_for_idle( - apps=[TLS_CERTIFICATES_APP_NAME], status="active", timeout=1000 - ) + # it is possible for users to provide their own cluster for testing. Hence check if there + # is a pre-existing cluster. + app_name = await get_app_name(ops_test) + if app_name: + await check_or_scale_app(ops_test, app_name, len(UNIT_IDS)) + else: + app_name = DATABASE_APP_NAME + async with ops_test.fast_forward(): + my_charm = await ops_test.build_charm(".") + await ops_test.model.deploy(my_charm, num_units=3) + await ops_test.model.wait_for_idle(apps=[app_name], status="active") + + config = {"generate-self-signed-certificates": "true", "ca-common-name": "Test CA"} + await ops_test.model.deploy(TLS_CERTIFICATES_APP_NAME, channel="stable", config=config) + await ops_test.model.wait_for_idle( + apps=[TLS_CERTIFICATES_APP_NAME], status="active", timeout=1000 + ) async def test_enable_tls(ops_test: OpsTest) -> None: """Verify each unit has TLS enabled after relating to the TLS application.""" - # Relate it to the PostgreSQL to enable TLS. - await ops_test.model.relate(DATABASE_APP_NAME, TLS_CERTIFICATES_APP_NAME) + # Relate it to the MongoDB to enable TLS. + app_name = await get_app_name(ops_test) or DATABASE_APP_NAME + await ops_test.model.integrate(app_name, TLS_CERTIFICATES_APP_NAME) await ops_test.model.wait_for_idle(status="active", timeout=1000, idle_period=60) # Wait for all units enabling TLS. - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: - assert await check_tls(ops_test, unit, enabled=True) + for unit in ops_test.model.applications[app_name].units: + assert await check_tls(ops_test, unit, enabled=True, app_name=app_name) async def test_rotate_tls_key(ops_test: OpsTest) -> None: @@ -114,7 +76,10 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None: # private keys these certificates should be updated and the mongod service should be # restarted original_tls_times = {} - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + + app_name = await get_app_name(ops_test) or DATABASE_APP_NAME + + for unit in ops_test.model.applications[app_name].units: original_tls_times[unit.name] = {} original_tls_times[unit.name]["external_cert"] = await time_file_created( ops_test, unit.name, EXTERNAL_CERT_PATH @@ -128,7 +93,7 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None: check_certs_correctly_distributed(ops_test, unit) # set external and internal key using auto-generated key for each unit - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: action = await unit.run_action(action_name="set-tls-private-key") action = await action.wait() assert action.status == "completed", "setting external and internal key failed." @@ -136,17 +101,17 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None: # wait for certificate to be available and processed. Can get receive two certificate # available events and restart twice so we want to ensure we are idle for at least 1 minute await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", timeout=1000, idle_period=60 + apps=[app_name], status="active", timeout=1000, idle_period=60 ) # After updating both the external key and the internal key a new certificate request will be # made; then the certificates should be available and updated. 
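+ # (the rotation is verified below by comparing certificate file creation times and the mongod service start time against the values captured before the keys were reset)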
- for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: new_external_cert_time = await time_file_created(ops_test, unit.name, EXTERNAL_CERT_PATH) new_internal_cert_time = await time_file_created(ops_test, unit.name, INTERNAL_CERT_PATH) new_mongod_service_time = await time_process_started(ops_test, unit.name, DB_SERVICE) - check_certs_correctly_distributed(ops_test, unit) + check_certs_correctly_distributed(ops_test, unit, app_name=app_name) assert ( new_external_cert_time > original_tls_times[unit.name]["external_cert"] @@ -162,9 +127,9 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None: ), f"mongod service for {unit.name} was not restarted." # Verify that TLS is functioning on all units. - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: assert await check_tls( - ops_test, unit, enabled=True + ops_test, unit, enabled=True, app_name=app_name ), f"tls is not enabled for {unit.name}." @@ -177,7 +142,8 @@ async def test_set_tls_key(ops_test: OpsTest) -> None: # private keys these certificates should be updated and the mongod service should be # restarted original_tls_times = {} - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + app_name = await get_app_name(ops_test) or DATABASE_APP_NAME + for unit in ops_test.model.applications[app_name].units: original_tls_times[unit.name] = {} original_tls_times[unit.name]["external_cert"] = await time_file_created( ops_test, unit.name, EXTERNAL_CERT_PATH @@ -194,8 +160,8 @@ async def test_set_tls_key(ops_test: OpsTest) -> None: internal_key_contents = "".join(internal_key_contents) # set external and internal key for each unit - for unit_id in range(len(ops_test.model.applications[DATABASE_APP_NAME].units)): - unit = ops_test.model.applications[DATABASE_APP_NAME].units[unit_id] + for unit_id in range(len(ops_test.model.applications[app_name].units)): + unit = ops_test.model.applications[app_name].units[unit_id] with open(f"{TLS_TEST_DATA}/external-key-{unit_id}.pem") as f: external_key_contents = f.readlines() @@ -216,17 +182,17 @@ async def test_set_tls_key(ops_test: OpsTest) -> None: # wait for certificate to be available and processed. Can get receive two certificate # available events and restart twice so we want to ensure we are idle for at least 1 minute await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", timeout=1000, idle_period=60 + apps=[app_name], status="active", timeout=1000, idle_period=60 ) # After updating both the external key and the internal key a new certificate request will be # made; then the certificates should be available and updated. - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: new_external_cert_time = await time_file_created(ops_test, unit.name, EXTERNAL_CERT_PATH) new_internal_cert_time = await time_file_created(ops_test, unit.name, INTERNAL_CERT_PATH) new_mongod_service_time = await time_process_started(ops_test, unit.name, DB_SERVICE) - check_certs_correctly_distributed(ops_test, unit) + check_certs_correctly_distributed(ops_test, unit, app_name=app_name) assert ( new_external_cert_time > original_tls_times[unit.name]["external_cert"] @@ -242,23 +208,24 @@ async def test_set_tls_key(ops_test: OpsTest) -> None: ), f"mongod service for {unit.name} was not restarted." # Verify that TLS is functioning on all units. 
- for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: assert await check_tls( - ops_test, unit, enabled=True + ops_test, unit, enabled=True, app_name=app_name ), f"tls is not enabled for {unit.name}." async def test_disable_tls(ops_test: OpsTest) -> None: """Verify each unit has TLS disabled after removing relation to the TLS application.""" # Remove the relation. - await ops_test.model.applications[DATABASE_APP_NAME].remove_relation( - f"{DATABASE_APP_NAME}:certificates", f"{TLS_CERTIFICATES_APP_NAME}:certificates" + app_name = await get_app_name(ops_test) or DATABASE_APP_NAME + await ops_test.model.applications[app_name].remove_relation( + f"{app_name}:certificates", f"{TLS_CERTIFICATES_APP_NAME}:certificates" ) await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", timeout=1000, idle_period=60 + apps=[app_name], status="active", timeout=1000, idle_period=60 ) # Wait for all units disabling TLS. - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: - assert await check_tls(ops_test, unit, enabled=False) + for unit in ops_test.model.applications[app_name].units: + assert await check_tls(ops_test, unit, enabled=False, app_name=app_name) diff --git a/tests/unit/test_config_server_lib.py b/tests/unit/test_config_server_lib.py index 45b7d4d90..3a7f27fb2 100644 --- a/tests/unit/test_config_server_lib.py +++ b/tests/unit/test_config_server_lib.py @@ -84,3 +84,59 @@ def is_config_mock_call(*args): self.harness.set_leader(False) self.harness.charm.cluster.update_config_server_db(mock.Mock()) self.harness.charm.cluster.database_provides.update_relation_data.assert_not_called() + + def test_pass_hooks_check_waits_for_start_config_server(self): + """Ensure that pass_hook_checks defers until the database is initialized. + + Note: in some cases sharding related hooks execute before config and leader elected hooks, + therefore it is important that `pass_hook_checks` defers an event until the database + has been started. + """ + + def is_shard_mock_call(*args): + return args == ("shard",) + + self.harness.charm.is_role = is_shard_mock_call + + event = mock.Mock() + event.params = {} + + self.harness.set_leader(False) + self.harness.charm.config_server.pass_hook_checks(event) + event.defer.assert_called() + + # once the database has been initialised, pass_hook_checks should no longer defer, even if + # the unit is not the leader or has the wrong role + event = mock.Mock() + event.params = {} + self.harness.charm.app_peer_data["db_initialised"] = "True" + self.harness.charm.config_server.pass_hook_checks(event) + event.defer.assert_not_called() + + def test_pass_hooks_check_waits_for_start_shard(self): + """Ensure that pass_hook_checks defers until the database is initialized.
+ + Note: in some cases sharding related hooks execute before config and leader elected hooks, + therefore it is important that `pass_hook_checks` defers an event until the database + has been started. + """ + + def is_config_mock_call(*args): + return args == ("config-server",) + + self.harness.charm.is_role = is_config_mock_call + + event = mock.Mock() + event.params = {} + + self.harness.set_leader(False) + self.harness.charm.shard.pass_hook_checks(event) + event.defer.assert_called() + + # once the database has been initialised, pass_hook_checks should no longer defer, even if + # the unit is not the leader or has the wrong role + event = mock.Mock() + event.params = {} + self.harness.charm.app_peer_data["db_initialised"] = "True" + self.harness.charm.shard.pass_hook_checks(event) + event.defer.assert_not_called() diff --git a/tests/unit/test_mongodb_helpers.py b/tests/unit/test_mongodb_helpers.py index d64557a12..f9c78a515 100644 --- a/tests/unit/test_mongodb_helpers.py +++ b/tests/unit/test_mongodb_helpers.py @@ -14,6 +14,8 @@ def test_get_mongod_args(self): "--replSet=my_repl_set", "--dbpath=/var/snap/charmed-mongodb/common/var/lib/mongodb", "--port=27017", + "--auditDestination=syslog", + "--auditFormat=JSON", "--auth", "--clusterAuthMode=keyFile", "--keyFile=/var/snap/charmed-mongodb/current/etc/mongod/keyFile", @@ -36,6 +38,8 @@ def test_get_mongod_args(self): "--replSet=my_repl_set", "--dbpath=/var/snap/charmed-mongodb/common/var/lib/mongodb", "--port=27017", + "--auditDestination=syslog", + "--auditFormat=JSON", ] self.assertEqual( @@ -50,7 +54,8 @@ def test_get_mongod_args(self): "--replSet=my_repl_set", "--dbpath=/var/lib/mongodb", "--port=27017", - "--logpath=/var/lib/mongodb/mongodb.log", + "--auditDestination=syslog", + "--auditFormat=JSON", ] self.assertEqual( diff --git a/tox.ini b/tox.ini index 34c97d6e5..700b12c10 100644 --- a/tox.ini +++ b/tox.ini @@ -216,6 +216,22 @@ deps = commands = pytest -v --tb native --log-cli-level=INFO -s --durations=0 {posargs} {[vars]tests_path}/integration/sharding_tests/test_sharding_relations.py +[testenv:sharding-race-conditions] +description = Run sharding race condition tests +pass_env = + {[testenv]pass_env} + CI +deps = + pytest + juju==3.2.0.1 + pytest-mock + pytest-operator + protobuf==3.20 # temporary fix until new libjuju is released + git+https://github.com/canonical/data-platform-workflows@v8\#subdirectory=python/pytest_plugins/pytest_operator_cache + -r {tox_root}/requirements.txt +commands = + pytest -v --tb native --log-cli-level=INFO -s --durations=0 {posargs} {[vars]tests_path}/integration/sharding_tests/test_sharding_race_conds.py + [testenv:integration] description = Run all integration tests
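Note: the integration tests above lean on two helpers from tests/integration/helpers.py, get_app_name and check_or_scale_app, whose implementations are not part of this diff. A minimal sketch of the contract they appear to provide is shown below; the bodies, and the charm-URL check in particular, are illustrative assumptions rather than the actual helper code.

from typing import List, Optional

from pytest_operator.plugin import OpsTest


async def get_app_name(ops_test: OpsTest, test_deployments: Optional[List[str]] = None) -> Optional[str]:
    """Return the name of a pre-existing MongoDB application in the model, if any.

    Applications listed in test_deployments (clusters deployed by the tests themselves)
    are ignored so they are never mistaken for a user-provided deployment.
    """
    test_deployments = test_deployments or []
    for name, app in ops_test.model.applications.items():
        if name in test_deployments:
            continue
        # assumption: a MongoDB deployment can be recognised from its charm URL
        if "mongodb" in (app.charm_url or ""):
            return name
    return None


async def check_or_scale_app(ops_test: OpsTest, app_name: str, required_units: int) -> None:
    """Scale a pre-existing deployment up or down to the unit count the test expects."""
    units = ops_test.model.applications[app_name].units
    if len(units) < required_units:
        await ops_test.model.applications[app_name].add_unit(count=required_units - len(units))
    elif len(units) > required_units:
        for unit in units[required_units:]:
            await ops_test.model.destroy_unit(unit.name)
    await ops_test.model.wait_for_idle(apps=[app_name], status="active")

With the tox change above in place, the race-condition suite can be run on its own with `tox -e sharding-race-conditions`, which invokes the pytest command defined in the [testenv:sharding-race-conditions] section.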