diff --git a/tests/integration/backup_tests/helpers.py b/tests/integration/backup_tests/helpers.py index a5ff7a633..7ffcc4edd 100644 --- a/tests/integration/backup_tests/helpers.py +++ b/tests/integration/backup_tests/helpers.py @@ -9,6 +9,7 @@ from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed from ..ha_tests import helpers as ha_helpers +from ..helpers import get_app_name S3_APP_NAME = "s3-integrator" TIMEOUT = 10 * 60 @@ -62,30 +63,12 @@ async def create_and_verify_backup(ops_test: OpsTest) -> None: async def get_leader_unit(ops_test: OpsTest, db_app_name=None) -> ops.model.Unit: """Returns the leader unit of the database charm.""" - db_app_name = db_app_name or await app_name(ops_test) + db_app_name = db_app_name or await get_app_name(ops_test) for unit in ops_test.model.applications[db_app_name].units: if await unit.is_leader_from_status(): return unit -async def app_name(ops_test: OpsTest) -> str: - """Returns the name of the cluster running MongoDB. - - This is important since not all deployments of the MongoDB charm have the application name - "mongodb". - - Note: if multiple clusters are running MongoDB this will return the one first found. - """ - status = await ops_test.model.get_status() - for app in ops_test.model.applications: - # note that format of the charm field is not exactly "mongodb" but instead takes the form - # of `local:focal/mongodb-6` - if "mongodb" in status["applications"][app]["charm"]: - return app - - return None - - async def count_logical_backups(db_unit: ops.model.Unit) -> int: """Count the number of logical backups.""" action = await db_unit.run_action(action_name="list-backups") @@ -142,11 +125,11 @@ def is_relation_joined(ops_test: OpsTest, endpoint_one: str, endpoint_two: str) async def insert_unwanted_data(ops_test: OpsTest) -> None: """Inserts the data into the MongoDB cluster via primary replica.""" - app = await app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] + app_name = await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] primary = (await ha_helpers.replica_set_primary(ip_addresses, ops_test)).public_address - password = await ha_helpers.get_password(ops_test, app) - client = MongoClient(ha_helpers.unit_uri(primary, password, app), directConnection=True) + password = await ha_helpers.get_password(ops_test, app_name) + client = MongoClient(ha_helpers.unit_uri(primary, password, app_name), directConnection=True) db = client["new-db"] test_collection = db["test_collection"] test_collection.insert_one({"unwanted_data": "bad data 1"}) diff --git a/tests/integration/backup_tests/test_backups.py b/tests/integration/backup_tests/test_backups.py index c64145f67..4d73a6040 100644 --- a/tests/integration/backup_tests/test_backups.py +++ b/tests/integration/backup_tests/test_backups.py @@ -2,6 +2,7 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. import asyncio +import logging import secrets import string import time @@ -10,6 +11,8 @@ from pytest_operator.plugin import OpsTest from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed +from tests.integration.helpers import get_app_name + from ..ha_tests import helpers as ha_helpers from . import helpers @@ -18,6 +21,8 @@ ENDPOINT = "s3-credentials" NEW_CLUSTER = "new-mongodb" +logger = logging.getLogger(__name__) + @pytest.fixture() async def continuous_writes_to_db(ops_test: OpsTest): @@ -43,7 +48,7 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: """Build and deploy one unit of MongoDB.""" # it is possible for users to provide their own cluster for testing. Hence check if there # is a pre-existing cluster. - if not await helpers.app_name(ops_test): + if not await get_app_name(ops_test): db_charm = await ops_test.build_charm(".") await ops_test.model.deploy(db_charm, num_units=3) @@ -56,7 +61,7 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: @pytest.mark.abort_on_fail async def test_blocked_incorrect_creds(ops_test: OpsTest) -> None: """Verifies that the charm goes into blocked status when s3 creds are incorrect.""" - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) # set incorrect s3 credentials s3_integrator_unit = ops_test.model.applications[S3_APP_NAME].units[0] @@ -86,7 +91,7 @@ async def test_blocked_incorrect_creds(ops_test: OpsTest) -> None: @pytest.mark.abort_on_fail async def test_blocked_incorrect_conf(ops_test: OpsTest) -> None: """Verifies that the charm goes into blocked status when s3 config options are incorrect.""" - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) # set correct AWS credentials for s3 storage but incorrect configs await helpers.set_credentials(ops_test, cloud="AWS") @@ -105,7 +110,7 @@ async def test_blocked_incorrect_conf(ops_test: OpsTest) -> None: @pytest.mark.abort_on_fail async def test_ready_correct_conf(ops_test: OpsTest) -> None: """Verifies charm goes into active status when s3 config and creds options are correct.""" - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) choices = string.ascii_letters + string.digits unique_path = "".join([secrets.choice(choices) for _ in range(4)]) configuration_parameters = { @@ -127,16 +132,19 @@ async def test_ready_correct_conf(ops_test: OpsTest) -> None: @pytest.mark.abort_on_fail async def test_create_and_list_backups(ops_test: OpsTest) -> None: - db_unit = await helpers.get_leader_unit(ops_test) - + db_app_name = await get_app_name(ops_test) + leader_unit = await helpers.get_leader_unit(ops_test, db_app_name=db_app_name) + await helpers.set_credentials(ops_test, cloud="AWS") # verify backup list works - action = await db_unit.run_action(action_name="list-backups") + logger.error("!!!!! test_create_and_list_backups >>> %s", leader_unit) + action = await leader_unit.run_action(action_name="list-backups") list_result = await action.wait() + logger.error("!!!!! test_create_and_list_backups >>> %s", list_result.results) backups = list_result.results["backups"] assert backups, "backups not outputted" # verify backup is started - action = await db_unit.run_action(action_name="create-backup") + action = await leader_unit.run_action(action_name="create-backup") backup_result = await action.wait() assert "backup started" in backup_result.results["backup-status"], "backup didn't start" @@ -147,7 +155,7 @@ async def test_create_and_list_backups(ops_test: OpsTest) -> None: try: for attempt in Retrying(stop=stop_after_delay(20), wait=wait_fixed(3)): with attempt: - backups = await helpers.count_logical_backups(db_unit) + backups = await helpers.count_logical_backups(leader_unit) assert backups == 1 except RetryError: assert backups == 1, "Backup not created." @@ -161,7 +169,7 @@ async def test_multi_backup(ops_test: OpsTest, continuous_writes_to_db) -> None: from AWS to GCP. This test verifies that the first backup in AWS is made, the second backup in GCP is made, and that before the second backup is made that pbm correctly resyncs. """ - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) db_unit = await helpers.get_leader_unit(ops_test) # create first backup once ready @@ -247,7 +255,7 @@ async def test_restore(ops_test: OpsTest, add_writes_to_db) -> None: assert number_writes > 0, "no writes to backup" # create a backup in the AWS bucket - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) db_unit = await helpers.get_leader_unit(ops_test) prev_backups = await helpers.count_logical_backups(db_unit) action = await db_unit.run_action(action_name="create-backup") @@ -296,7 +304,7 @@ async def test_restore(ops_test: OpsTest, add_writes_to_db) -> None: @pytest.mark.parametrize("cloud_provider", ["AWS", "GCP"]) async def test_restore_new_cluster(ops_test: OpsTest, add_writes_to_db, cloud_provider): # configure test for the cloud provider - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) await helpers.set_credentials(ops_test, cloud=cloud_provider) if cloud_provider == "AWS": configuration_parameters = { @@ -383,7 +391,7 @@ async def test_restore_new_cluster(ops_test: OpsTest, add_writes_to_db, cloud_pr @pytest.mark.abort_on_fail async def test_update_backup_password(ops_test: OpsTest) -> None: """Verifies that after changing the backup password the pbm tool is updated and functional.""" - db_app_name = await helpers.app_name(ops_test) + db_app_name = await get_app_name(ops_test) db_unit = await helpers.get_leader_unit(ops_test) # wait for charm to be idle before setting password diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index fde1f58cc..becbe9d99 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -2,6 +2,7 @@ # See LICENSE file for licensing details. import json +import logging import subprocess import time from datetime import datetime @@ -25,6 +26,9 @@ wait_fixed, ) +from ..helpers import get_app_name, get_unit_ip, instance_ip + +# TODO move these to a separate file for constants \ config METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) PORT = 27017 APP_NAME = METADATA["name"] @@ -37,6 +41,8 @@ EXPORTER_PROC = "/usr/bin/mongodb_exporter" GREP_PROC = "grep" +logger = logging.getLogger(__name__) + class ProcessError(Exception): """Raised when a process fails.""" @@ -46,33 +52,33 @@ class ProcessRunningError(Exception): """Raised when a process is running when it is not expected to be.""" -def replica_set_client(replica_ips: List[str], password: str, app=APP_NAME) -> MongoClient: +def replica_set_client(replica_ips: List[str], password: str, app_name: str) -> MongoClient: """Generates the replica set URI for multiple IP addresses. Args: replica_ips: list of ips hosting the replica set. password: password of database. - app: name of application which hosts the cluster. + app_name: name of application which hosts the cluster. """ hosts = ["{}:{}".format(replica_ip, PORT) for replica_ip in replica_ips] hosts = ",".join(hosts) - replica_set_uri = f"mongodb://operator:" f"{password}@" f"{hosts}/admin?replicaSet={app}" + replica_set_uri = f"mongodb://operator:" f"{password}@" f"{hosts}/admin?replicaSet={app_name}" return MongoClient(replica_set_uri) -async def fetch_replica_set_members(replica_ips: List[str], ops_test: OpsTest): +async def fetch_replica_set_members(replica_ips: List[str], ops_test: OpsTest, app_name: str): """Fetches the IPs listed as replica set members in the MongoDB replica set configuration. Args: replica_ips: list of ips hosting the replica set. ops_test: reference to deployment. - app: name of application which has the cluster. + app_name: name of application which has the cluster. """ # connect to replica set uri - app = await app_name(ops_test) - password = await get_password(ops_test, app) - client = replica_set_client(replica_ips, password, app) + app_name = app_name or await get_app_name(ops_test) + password = await get_password(ops_test, app_name) + client = replica_set_client(replica_ips, password, app_name) # get ips from MongoDB replica set configuration rs_config = client.admin.command("replSetGetConfig") @@ -86,43 +92,44 @@ async def fetch_replica_set_members(replica_ips: List[str], ops_test: OpsTest): return member_ips -def unit_uri(ip_address: str, password, app=APP_NAME) -> str: +def unit_uri(ip_address: str, password, app_name=APP_NAME) -> str: """Generates URI that is used by MongoDB to connect to a single replica. Args: ip_address: ip address of replica/unit password: password of database. - app: name of application which has the cluster. + app_name: name of application which has the cluster. """ - return f"mongodb://operator:" f"{password}@" f"{ip_address}:{PORT}/admin?replicaSet={app}" + return f"mongodb://operator:" f"{password}@" f"{ip_address}:{PORT}/admin?replicaSet={app_name}" -async def get_password(ops_test: OpsTest, app, down_unit=None) -> str: +# TODO remove this duplicate with helpers.py +async def get_password(ops_test: OpsTest, app_name, down_unit=None) -> str: """Use the charm action to retrieve the password from provided unit. Returns: String with the password stored on the peer relation databag. """ # some tests disable the network for units, so find a unit that is available - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: if not unit.name == down_unit: unit_id = unit.name.split("/")[1] break - action = await ops_test.model.units.get(f"{app}/{unit_id}").run_action("get-password") + action = await ops_test.model.units.get(f"{app_name}/{unit_id}").run_action("get-password") action = await action.wait() return action.results["password"] async def fetch_primary( - replica_set_hosts: List[str], ops_test: OpsTest, down_unit=None, app=None + replica_set_hosts: List[str], ops_test: OpsTest, down_unit=None, app_name=None ) -> str: """Returns IP address of current replica set primary.""" # connect to MongoDB client - app = app or await app_name(ops_test) + app_name = app_name or await get_app_name(ops_test) - password = await get_password(ops_test, app, down_unit) - client = replica_set_client(replica_set_hosts, password, app) + password = await get_password(ops_test, app_name, down_unit) + client = replica_set_client(replica_set_hosts, password, app_name) # grab the replica set status try: @@ -143,13 +150,16 @@ async def fetch_primary( return primary -async def count_primaries(ops_test: OpsTest) -> int: +# TODO remove duplication with common helpers +async def count_primaries(ops_test: OpsTest, password: str = None, app_name: str = None) -> int: """Returns the number of primaries in a replica set.""" # connect to MongoDB client - app = await app_name(ops_test) - password = await get_password(ops_test, app) - replica_set_hosts = [unit.public_address for unit in ops_test.model.applications[app].units] - client = replica_set_client(replica_set_hosts, password, app) + app_name = app_name or await get_app_name(ops_test) + password = password or await get_password(ops_test, app_name) + replica_set_hosts = [ + unit.public_address for unit in ops_test.model.applications[app_name].units + ] + client = replica_set_client(replica_set_hosts, password, app_name) # grab the replica set status try: @@ -178,29 +188,29 @@ async def replica_set_primary( replica_set_hosts: List[str], ops_test: OpsTest, down_unit=None, - app=None, + app_name=None, ) -> Optional[ops.model.Unit]: """Returns the primary of the replica set. Retrying 5 times to give the replica set time to elect a new primary, also checks against the valid_ips to verify that the primary is not outdated. """ - app = app or await app_name(ops_test) - primary_ip = await fetch_primary(replica_set_hosts, ops_test, down_unit, app) + app_name = app_name or await get_app_name(ops_test) + primary_ip = await fetch_primary(replica_set_hosts, ops_test, down_unit, app_name) # return None if primary is no longer in the replica set if primary_ip is not None and primary_ip not in replica_set_hosts: return None - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: if unit.public_address == str(primary_ip): return unit -async def retrieve_entries(ops_test, app, db_name, collection_name, query_field): +async def retrieve_entries(ops_test, app_name, db_name, collection_name, query_field): """Retries entries from a specified collection within a specified database.""" - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - password = await get_password(ops_test, app) - client = replica_set_client(ip_addresses, password, app) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + password = await get_password(ops_test, app_name) + client = replica_set_client(ip_addresses, password, app_name) db = client[db_name] test_collection = db[collection_name] @@ -215,45 +225,27 @@ async def retrieve_entries(ops_test, app, db_name, collection_name, query_field) return cluster_entries -async def find_unit(ops_test: OpsTest, leader: bool) -> ops.model.Unit: +async def find_unit(ops_test: OpsTest, leader: bool, app_name=None) -> ops.model.Unit: """Helper function identifies the a unit, based on need for leader or non-leader.""" ret_unit = None - app = await app_name(ops_test) - for unit in ops_test.model.applications[app].units: + app_name = app_name or await get_app_name(ops_test) + for unit in ops_test.model.applications[app_name].units: if await unit.is_leader_from_status() == leader: ret_unit = unit return ret_unit -async def app_name(ops_test: OpsTest) -> str: - """Returns the name of the cluster running MongoDB. - - This is important since not all deployments of the MongoDB charm have the application name - "mongodb". - - Note: if multiple clusters are running MongoDB this will return the one first found. - """ - status = await ops_test.model.get_status() - for app in ops_test.model.applications: - # note that format of the charm field is not exactly "mongodb" but instead takes the form - # of `local:focal/mongodb-6` - if "mongodb" in status["applications"][app]["charm"]: - return app - - return None - - async def clear_db_writes(ops_test: OpsTest) -> bool: """Stop the DB process and remove any writes to the test collection.""" await stop_continous_writes(ops_test) # remove collection from database - app = await app_name(ops_test) - password = await get_password(ops_test, app) - hosts = [unit.public_address for unit in ops_test.model.applications[app].units] + app_name = await get_app_name(ops_test) + password = await get_password(ops_test, app_name) + hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units] hosts = ",".join(hosts) - connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app}" + connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app_name}" client = MongoClient(connection_string) db = client["new-db"] @@ -274,11 +266,11 @@ async def start_continous_writes(ops_test: OpsTest, starting_number: int) -> Non In the future this should be put in a dummy charm. """ - app = await app_name(ops_test) - password = await get_password(ops_test, app) - hosts = [unit.public_address for unit in ops_test.model.applications[app].units] + app_name = await get_app_name(ops_test) + password = await get_password(ops_test, app_name) + hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units] hosts = ",".join(hosts) - connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app}" + connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app_name}" # run continuous writes in the background. subprocess.Popen( @@ -291,7 +283,7 @@ async def start_continous_writes(ops_test: OpsTest, starting_number: int) -> Non ) -async def stop_continous_writes(ops_test: OpsTest, down_unit=None) -> int: +async def stop_continous_writes(ops_test: OpsTest, down_unit=None, app_name=None) -> int: """Stops continuous writes to MongoDB and returns the last written value. In the future this should be put in a dummy charm. @@ -302,11 +294,11 @@ async def stop_continous_writes(ops_test: OpsTest, down_unit=None) -> int: # wait for process to be killed proc.communicate() - app = await app_name(ops_test) - password = await get_password(ops_test, app, down_unit) - hosts = [unit.public_address for unit in ops_test.model.applications[app].units] + app_name = app_name or await get_app_name(ops_test) + password = await get_password(ops_test, app_name, down_unit) + hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units] hosts = ",".join(hosts) - connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app}" + connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app_name}" client = MongoClient(connection_string) db = client["new-db"] @@ -318,13 +310,13 @@ async def stop_continous_writes(ops_test: OpsTest, down_unit=None) -> int: return last_written_value -async def count_writes(ops_test: OpsTest, down_unit=None) -> int: +async def count_writes(ops_test: OpsTest, down_unit=None, app_name=None) -> int: """New versions of pymongo no longer support the count operation, instead find is used.""" - app = await app_name(ops_test) - password = await get_password(ops_test, app, down_unit) - hosts = [unit.public_address for unit in ops_test.model.applications[app].units] + app_name = app_name or await get_app_name(ops_test) + password = await get_password(ops_test, app_name, down_unit) + hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units] hosts = ",".join(hosts) - connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app}" + connection_string = f"mongodb://operator:{password}@{hosts}/admin?replicaSet={app_name}" client = MongoClient(connection_string) db = client["new-db"] @@ -334,13 +326,13 @@ async def count_writes(ops_test: OpsTest, down_unit=None) -> int: return count -async def secondary_up_to_date(ops_test: OpsTest, unit_ip, expected_writes) -> bool: +async def secondary_up_to_date(ops_test: OpsTest, unit_ip, expected_writes, app_name=None) -> bool: """Checks if secondary is up to date with the cluster. Retries over the period of one minute to give secondary adequate time to copy over data. """ - app = await app_name(ops_test) - password = await get_password(ops_test, app) + app_name = app_name or await get_app_name(ops_test) + password = await get_password(ops_test, app_name) connection_string = f"mongodb://operator:{password}@{unit_ip}:{PORT}/admin?" client = MongoClient(connection_string, directConnection=True) @@ -407,29 +399,29 @@ def storage_id(ops_test, unit_name): return line.split()[1] -async def add_unit_with_storage(ops_test, app, storage): +async def add_unit_with_storage(ops_test, app_name, storage): """Adds unit with storage. Note: this function exists as a temporary solution until this issue is resolved: https://github.com/juju/python-libjuju/issues/695 """ - expected_units = len(ops_test.model.applications[app].units) + 1 - prev_units = [unit.name for unit in ops_test.model.applications[app].units] + expected_units = len(ops_test.model.applications[app_name].units) + 1 + prev_units = [unit.name for unit in ops_test.model.applications[app_name].units] model_name = ops_test.model.info.name - add_unit_cmd = f"add-unit {app} --model={model_name} --attach-storage={storage}".split() + add_unit_cmd = f"add-unit {app_name} --model={model_name} --attach-storage={storage}".split() await ops_test.juju(*add_unit_cmd) - await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1000) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) assert ( - len(ops_test.model.applications[app].units) == expected_units + len(ops_test.model.applications[app_name].units) == expected_units ), "New unit not added to model" # verify storage attached - curr_units = [unit.name for unit in ops_test.model.applications[app].units] + curr_units = [unit.name for unit in ops_test.model.applications[app_name].units] new_unit = list(set(curr_units) - set(prev_units))[0] assert storage_id(ops_test, new_unit) == storage, "unit added with incorrect storage" # return a reference to newly added unit - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: if unit.name == new_unit: return unit @@ -467,26 +459,26 @@ async def reused_storage(ops_test: OpsTest, unit_name, removal_time) -> bool: return False -async def insert_focal_to_cluster(ops_test: OpsTest) -> None: +async def insert_focal_to_cluster(ops_test: OpsTest, app_name=None) -> None: """Inserts the Focal Fossa data into the MongoDB cluster via primary replica.""" - app = await app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] + app_name = app_name or await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] primary = (await replica_set_primary(ip_addresses, ops_test)).public_address - password = await get_password(ops_test, app) - client = MongoClient(unit_uri(primary, password, app), directConnection=True) + password = await get_password(ops_test, app_name) + client = MongoClient(unit_uri(primary, password, app_name), directConnection=True) db = client["new-db"] test_collection = db["test_ubuntu_collection"] test_collection.insert_one({"release_name": "Focal Fossa", "version": 20.04, "LTS": True}) client.close() -async def kill_unit_process(ops_test: OpsTest, unit_name: str, kill_code: str): +async def kill_unit_process(ops_test: OpsTest, unit_name: str, kill_code: str, app_name=None): """Kills the DB process on the unit according to the provided kill code.""" # killing the only replica can be disastrous - app = await app_name(ops_test) - if len(ops_test.model.applications[app].units) < 2: - await ops_test.model.applications[app].add_unit(count=1) - await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1000) + app_name = app_name or await get_app_name(ops_test) + if len(ops_test.model.applications[app_name].units) < 2: + await ops_test.model.applications[app_name].add_unit(count=1) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) kill_cmd = f"exec --unit {unit_name} -- pkill --signal {kill_code} -f {DB_PROCESS}" return_code, _, _ = await ops_test.juju(*kill_cmd.split()) @@ -496,11 +488,11 @@ async def kill_unit_process(ops_test: OpsTest, unit_name: str, kill_code: str): ) -async def mongod_ready(ops_test, unit_ip) -> bool: +async def mongod_ready(ops_test, unit_ip, app_name=None) -> bool: """Verifies replica is running and available.""" - app = await app_name(ops_test) - password = await get_password(ops_test, app) - client = MongoClient(unit_uri(unit_ip, password, app), directConnection=True) + app_name = app_name or await get_app_name(ops_test) + password = await get_password(ops_test, app_name) + client = MongoClient(unit_uri(unit_ip, password, app_name), directConnection=True) try: for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3)): with attempt: @@ -514,10 +506,10 @@ async def mongod_ready(ops_test, unit_ip) -> bool: return True -async def db_step_down(ops_test: OpsTest, old_primary_unit: str, sigterm_time: int): +async def db_step_down(ops_test: OpsTest, old_primary_unit: str, sigterm_time: int, app_name=None): # loop through all units that aren't the old primary - app = await app_name(ops_test) - for unit in ops_test.model.applications[app].units: + app_name = app_name or await get_app_name(ops_test) + for unit in ops_test.model.applications[app_name].units: # verify log file exists on this machine search_file = f"exec --unit {unit.name} ls {MONGODB_LOG_PATH}" return_code, _, _ = await ops_test.juju(*search_file.split()) @@ -549,14 +541,14 @@ async def db_step_down(ops_test: OpsTest, old_primary_unit: str, sigterm_time: i return False -async def all_db_processes_down(ops_test: OpsTest) -> bool: +async def all_db_processes_down(ops_test: OpsTest, app_name=None) -> bool: """Verifies that all units of the charm do not have the DB process running.""" - app = await app_name(ops_test) + app_name = app_name or await get_app_name(ops_test) try: for attempt in Retrying(stop=stop_after_attempt(60), wait=wait_fixed(3)): with attempt: - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: search_db_process = f"exec --unit {unit.name} pgrep -x mongod" _, processes, _ = await ops_test.juju(*search_db_process.split()) # splitting processes by "\n" results in one or more empty lines, hence we @@ -664,28 +656,29 @@ async def start_mongod(ops_test: OpsTest, unit) -> None: @retry(stop=stop_after_attempt(8), wait=wait_fixed(15)) -async def verify_replica_set_configuration(ops_test: OpsTest) -> None: +async def verify_replica_set_configuration(ops_test: OpsTest, app_name=None) -> None: """Verifies presence of primary, replica set members, and number of primaries.""" - app = await app_name(ops_test) + app_name = app_name or await get_app_name(ops_test) # `get_unit_ip` is used instead of `.public_address` because of a bug in python-libjuju that # incorrectly reports the IP addresses after the network is restored this is reported as a # bug here: https://github.com/juju/python-libjuju/issues/738 . Once this bug is resolved use # of `get_unit_ip` should be replaced with `.public_address` ip_addresses = [ - await get_unit_ip(ops_test, unit.name) for unit in ops_test.model.applications[app].units + await get_unit_ip(ops_test, unit.name) + for unit in ops_test.model.applications[app_name].units ] # verify presence of primary - new_primary = await replica_set_primary(ip_addresses, ops_test) + new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) assert new_primary.name, "primary not elected." # verify all units are running under the same replset - member_ips = await fetch_replica_set_members(ip_addresses, ops_test) + member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name) assert set(member_ips) == set(ip_addresses), "all members not running under the same replset" # verify there is only one primary assert ( - await count_primaries(ops_test) == 1 + await count_primaries(ops_test, app_name=app_name) == 1 ), "there are more than one primary in the replica set." @@ -750,37 +743,6 @@ def is_machine_reachable_from(origin_machine: str, target_machine: str) -> bool: return False -async def unit_hostname(ops_test: OpsTest, unit_name: str) -> str: - """Get hostname for a unit. - - Args: - ops_test: The ops test object passed into every test case - unit_name: The name of the unit to be tested - - Returns: - The machine/container hostname - """ - _, raw_hostname, _ = await ops_test.juju("ssh", unit_name, "hostname") - return raw_hostname.strip() - - -def instance_ip(model: str, instance: str) -> str: - """Translate juju instance name to IP. - - Args: - model: The name of the model - instance: The name of the instance - - Returns: - The (str) IP address of the instance - """ - output = subprocess.check_output(f"juju machines --model {model}".split()) - - for line in output.decode("utf8").splitlines(): - if instance in line: - return line.split()[2] - - @retry(stop=stop_after_attempt(60), wait=wait_fixed(15)) def wait_network_restore(model_name: str, hostname: str, old_ip: str) -> None: """Wait until network is restored. @@ -794,18 +756,45 @@ def wait_network_restore(model_name: str, hostname: str, old_ip: str) -> None: raise Exception("Network not restored, IP address has not changed yet.") -async def get_unit_ip(ops_test: OpsTest, unit_name: str) -> str: - """Wrapper for getting unit ip. +async def scale_and_verify(ops_test: OpsTest, count: int, remove_leader: bool = False): + if count == 0: + logger.warning("Skipping scale up/down by 0") + return - Juju incorrectly reports the IP addresses after the network is restored this is reported as a - bug here: https://github.com/juju/python-libjuju/issues/738 . Once this bug is resolved use of - `get_unit_ip` should be replaced with `.public_address` + app_name = await get_app_name(ops_test) - Args: - ops_test: The ops test object passed into every test case - unit_name: The name of the unit to be tested + if count > 0: + logger.info(f"Scaling up by {count} units") + await ops_test.model.applications[app_name].add_units(count) + else: + logger.info(f"Scaling down by {abs(count)} units") + # find leader unit + leader_unit = await find_unit(ops_test, leader=True) + units_to_remove = [] + for unit in ops_test.model.applications[app_name].units: + if not remove_leader and unit.name == leader_unit.name: + continue + if len(units_to_remove) < abs(count): + units_to_remove.append(unit.name) + + logger.info(f"Units to remove {units_to_remove}") + await ops_test.model.applications[app_name].destroy_units(*units_to_remove) + logger.info("Waiting for idle") + await ops_test.model.wait_for_idle( + apps=[app_name], + status="active", + timeout=1000, + ) + logger.info("Validating replica set has primary") + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) - Returns: - The (str) ip of the unit - """ - return instance_ip(ops_test.model.info.name, await unit_hostname(ops_test, unit_name)) + assert primary is not None, "Replica set has no primary" + + +async def verify_writes(ops_test: OpsTest, app_name=None): + # verify that no writes to the db were missed + app_name = app_name or await get_app_name(ops_test) + total_expected_writes = await stop_continous_writes(ops_test) + actual_writes = await count_writes(ops_test, app_name) + assert total_expected_writes["number"] == actual_writes, "writes to the db were missed." diff --git a/tests/integration/ha_tests/test_ha.py b/tests/integration/ha_tests/test_ha.py index 36c2d937b..83bdcc690 100644 --- a/tests/integration/ha_tests/test_ha.py +++ b/tests/integration/ha_tests/test_ha.py @@ -3,6 +3,7 @@ # See LICENSE file for licensing details. import asyncio import logging +import os import time import pytest @@ -10,7 +11,50 @@ from pytest_operator.plugin import OpsTest from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed -from . import helpers +from ..helpers import ( + check_or_scale_app, + get_app_name, + get_unit_ip, + instance_ip, + unit_hostname, + unit_uri, +) +from .helpers import ( + MONGODB_LOG_PATH, + add_unit_with_storage, + all_db_processes_down, + clear_db_writes, + count_primaries, + count_writes, + cut_network_from_unit, + db_step_down, + fetch_replica_set_members, + find_unit, + get_controller_machine, + get_password, + insert_focal_to_cluster, + is_machine_reachable_from, + kill_unit_process, + mongod_ready, + replica_set_client, + replica_set_primary, + restore_network_for_unit, + retrieve_entries, + reused_storage, + scale_and_verify, + secondary_up_to_date, + start_continous_writes, + start_mongod, + stop_continous_writes, + stop_mongod, + storage_id, + storage_type, + update_restart_delay, + update_service_logging, + verify_replica_set_configuration, + verify_writes, + wait_network_restore, +) ANOTHER_DATABASE_APP_NAME = "another-database-a" MEDIAN_REELECTION_TIME = 12 @@ -19,70 +63,23 @@ logger = logging.getLogger(__name__) -@pytest.fixture() -async def continuous_writes(ops_test: OpsTest): - """Starts continuous write operations to MongoDB for test and clears writes at end of test.""" - await helpers.start_continous_writes(ops_test, 1) - yield - await helpers.clear_db_writes(ops_test) - - -@pytest.fixture() -async def reset_restart_delay(ops_test: OpsTest): - """Resets service file delay on all units.""" - yield - app = await helpers.app_name(ops_test) - for unit in ops_test.model.applications[app].units: - await helpers.update_restart_delay(ops_test, unit, ORIGINAL_RESTART_DELAY) - - -@pytest.fixture() -async def change_logging(ops_test: OpsTest): - """Enables appending logging for a test and resets the logging at the end of the test.""" - app = await helpers.app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - primary = await helpers.replica_set_primary(ip_addresses, ops_test) - - for unit in ops_test.model.applications[app].units: - # tests which use this fixture restart the primary. Therefore the primary should not be - # restarted as to leave the restart testing to the test itself. - if unit.name == primary.name: - continue - - # must restart unit to ensure that changes to logging are made - await helpers.stop_mongod(ops_test, unit) - await helpers.update_service_logging(ops_test, unit, logging=True) - await helpers.start_mongod(ops_test, unit) - - # sleep long enough for the mongod to start up correctly - time.sleep(15) - yield - - app = await helpers.app_name(ops_test) - for unit in ops_test.model.applications[app].units: - # must restart unit to ensure that changes to logging are made - await helpers.stop_mongod(ops_test, unit) - await helpers.update_service_logging(ops_test, unit, logging=False) - await helpers.start_mongod(ops_test, unit) - - # sleep long enough for the mongod to start up correctly - time.sleep(15) - - # remove the log file as to not clog up space on the replicas. - rm_cmd = f"exec --unit {unit.name} rm {helpers.MONGODB_LOG_PATH}" - await ops_test.juju(*rm_cmd.split()) - - +@pytest.mark.skipif( + os.environ.get("PYTEST_SKIP_DEPLOY", False), + reason="skipping deploy, model expected to be provided.", +) @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest) -> None: """Build and deploy one unit of MongoDB.""" # it is possible for users to provide their own cluster for HA testing. Hence check if there # is a pre-existing cluster. - if await helpers.app_name(ops_test): + required_units = 3 + user_app_name = await get_app_name(ops_test) + if user_app_name: + await check_or_scale_app(ops_test, user_app_name, required_units) return my_charm = await ops_test.build_charm(".") - await ops_test.model.deploy(my_charm, num_units=3) + await ops_test.model.deploy(my_charm, num_units=required_units) await ops_test.model.wait_for_idle() @@ -93,35 +90,35 @@ async def test_storage_re_use(ops_test, continuous_writes): properly uses the storage that was provided. (ie. doesn't just re-sync everything from primary, but instead computes a diff between current storage and primary storage.) """ - app = await helpers.app_name(ops_test) - if helpers.storage_type(ops_test, app) == "rootfs": + app_name = await get_app_name(ops_test) + if storage_type(ops_test, app_name) == "rootfs": pytest.skip( "reuse of storage can only be used on deployments with persistent storage not on rootfs deployments" ) # removing the only replica can be disastrous - if len(ops_test.model.applications[app].units) < 2: - await ops_test.model.applications[app].add_unit(count=1) - await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1000) + if len(ops_test.model.applications[app_name].units) < 2: + await ops_test.model.applications[app_name].add_unit(count=1) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) # remove a unit and attach it's storage to a new unit - unit = ops_test.model.applications[app].units[0] - unit_storage_id = helpers.storage_id(ops_test, unit.name) - expected_units = len(ops_test.model.applications[app].units) - 1 + unit = ops_test.model.applications[app_name].units[0] + unit_storage_id = storage_id(ops_test, unit.name) + expected_units = len(ops_test.model.applications[app_name].units) - 1 removal_time = time.time() await ops_test.model.destroy_unit(unit.name) await ops_test.model.wait_for_idle( - apps=[app], status="active", timeout=1000, wait_for_exact_units=expected_units + apps=[app_name], status="active", timeout=1000, wait_for_exact_units=expected_units ) - new_unit = await helpers.add_unit_with_storage(ops_test, app, unit_storage_id) + new_unit = await add_unit_with_storage(ops_test, app_name, unit_storage_id) - assert await helpers.reused_storage( + assert await reused_storage( ops_test, new_unit.public_address, removal_time ), "attached storage not properly reused by MongoDB." # verify that the no writes were skipped - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes @@ -133,28 +130,29 @@ async def test_add_units(ops_test: OpsTest, continuous_writes) -> None: MongoDB replica set configuration. """ # add units and wait for idle - app = await helpers.app_name(ops_test) - expected_units = len(ops_test.model.applications[app].units) + 2 - await ops_test.model.applications[app].add_unit(count=2) + app_name = await get_app_name(ops_test) + expected_units = len(ops_test.model.applications[app_name].units) + 2 + await ops_test.model.applications[app_name].add_unit(count=2) await ops_test.model.wait_for_idle( - apps=[app], status="active", timeout=1000, wait_for_exact_units=expected_units + apps=[app_name], status="active", timeout=1000, wait_for_exact_units=expected_units ) # grab unit ips - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] # connect to replica set uri and get replica set members - member_ips = await helpers.fetch_replica_set_members(ip_addresses, ops_test) + member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name) # verify that the replica set members have the correct units assert set(member_ips) == set(ip_addresses) # verify that the no writes were skipped - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes +@pytest.mark.abort_on_fail @pytest.mark.abort_on_fail async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> None: """Tests clusters behavior when scaling down a minority and removing a primary replica. @@ -171,12 +169,12 @@ async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> N 5. deleting a non-leader unit is properly handled. """ deleted_unit_ips = [] - app = await helpers.app_name(ops_test) + app_name = await get_app_name(ops_test) units_to_remove = [] - minority_count = int(len(ops_test.model.applications[app].units) / 2) + minority_count = int(len(ops_test.model.applications[app_name].units) / 2) # find leader unit - leader_unit = await helpers.find_unit(ops_test, leader=True) + leader_unit = await find_unit(ops_test, leader=True, app_name=app_name) minority_count -= 1 # verify that we have a leader @@ -186,7 +184,7 @@ async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> N # find non-leader units to remove such that the largest minority possible is removed. avail_units = [] - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: if not unit.name == leader_unit.name: avail_units.append(unit) @@ -196,20 +194,20 @@ async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> N units_to_remove.append(unit_to_remove.name) # destroy units simultaneously - expected_units = len(ops_test.model.applications[app].units) - len(units_to_remove) + expected_units = len(ops_test.model.applications[app_name].units) - len(units_to_remove) await ops_test.model.destroy_units(*units_to_remove) # wait for app to be active after removal of units await ops_test.model.wait_for_idle( - apps=[app], status="active", timeout=1000, wait_for_exact_units=expected_units + apps=[app_name], status="active", timeout=1000, wait_for_exact_units=expected_units ) # grab unit ips - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] # check that the replica set with the remaining units has a primary try: - primary = await helpers.replica_set_primary(ip_addresses, ops_test) + primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) except RetryError: primary = None @@ -222,28 +220,28 @@ async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> N ), "replica set primary is not one of the available units" # verify that the configuration of mongodb no longer has the deleted ip - member_ips = await helpers.fetch_replica_set_members(ip_addresses, ops_test) + member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name) assert set(member_ips) == set(ip_addresses), "mongod config contains deleted units" # verify that the no writes were skipped - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes async def test_replication_across_members(ops_test: OpsTest, continuous_writes) -> None: """Check consistency, ie write to primary, read data from secondaries.""" # first find primary, write to primary, then read from each unit - await helpers.insert_focal_to_cluster(ops_test) - app = await helpers.app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - primary = await helpers.replica_set_primary(ip_addresses, ops_test) - password = await helpers.get_password(ops_test, app) + await insert_focal_to_cluster(ops_test) + app_name = await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) + password = await get_password(ops_test, app_name) secondaries = set(ip_addresses) - set([primary.public_address]) for secondary in secondaries: - client = MongoClient(helpers.unit_uri(secondary, password, app), directConnection=True) + client = MongoClient(unit_uri(secondary, password, app_name), directConnection=True) db = client["new-db"] test_collection = db["test_ubuntu_collection"] @@ -253,15 +251,16 @@ async def test_replication_across_members(ops_test: OpsTest, continuous_writes) client.close() # verify that the no writes were skipped - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None: """Verify unique clusters do not share DBs.""" # first find primary, write to primary, - await helpers.insert_focal_to_cluster(ops_test) + app_name = await get_app_name(ops_test) + await insert_focal_to_cluster(ops_test, app_name=app_name) # deploy new cluster my_charm = await ops_test.build_charm(".") @@ -273,24 +272,24 @@ async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None: unit.public_address for unit in ops_test.model.applications[ANOTHER_DATABASE_APP_NAME].units ] - password = await helpers.get_password(ops_test, app=ANOTHER_DATABASE_APP_NAME) - client = helpers.replica_set_client(ip_addresses, password, app=ANOTHER_DATABASE_APP_NAME) + password = await get_password(ops_test, app_name=ANOTHER_DATABASE_APP_NAME) + client = replica_set_client(ip_addresses, password, app_name=ANOTHER_DATABASE_APP_NAME) db = client["new-db"] test_collection = db["test_ubuntu_collection"] test_collection.insert_one({"release_name": "Jammy Jelly", "version": 22.04, "LTS": False}) client.close() - cluster_1_entries = await helpers.retrieve_entries( + cluster_1_entries = await retrieve_entries( ops_test, - app=ANOTHER_DATABASE_APP_NAME, + app_name=ANOTHER_DATABASE_APP_NAME, db_name="new-db", collection_name="test_ubuntu_collection", query_field="release_name", ) - cluster_2_entries = await helpers.retrieve_entries( + cluster_2_entries = await retrieve_entries( ops_test, - app=helpers.APP_NAME, + app_name=app_name, db_name="new-db", collection_name="test_ubuntu_collection", query_field="release_name", @@ -300,8 +299,8 @@ async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None: assert len(common_entries) == 0, "Writes from one cluster are replicated to another cluster." # verify that the no writes were skipped - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes @@ -310,23 +309,25 @@ async def test_replication_member_scaling(ops_test: OpsTest, continuous_writes) Verify newly members have replicated data and newly removed members are gone without data. """ - # first find primary, write to primary, - await helpers.insert_focal_to_cluster(ops_test) + app_name = await get_app_name(ops_test) - app = await helpers.app_name(ops_test) + # first find primary, write to primary, + await insert_focal_to_cluster(ops_test, app_name=app_name) original_ip_addresses = [ - unit.public_address for unit in ops_test.model.applications[app].units + unit.public_address for unit in ops_test.model.applications[app_name].units ] - expected_units = len(ops_test.model.applications[app].units) + 1 - await ops_test.model.applications[app].add_unit(count=1) + expected_units = len(ops_test.model.applications[app_name].units) + 1 + await ops_test.model.applications[app_name].add_unit(count=1) await ops_test.model.wait_for_idle( - apps=[app], status="active", timeout=1000, wait_for_exact_units=expected_units + apps=[app_name], status="active", timeout=1000, wait_for_exact_units=expected_units ) - new_ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] + new_ip_addresses = [ + unit.public_address for unit in ops_test.model.applications[app_name].units + ] new_member_ip = list(set(new_ip_addresses) - set(original_ip_addresses))[0] - password = await helpers.get_password(ops_test, app) - client = MongoClient(helpers.unit_uri(new_member_ip, password, app), directConnection=True) + password = await get_password(ops_test, app_name) + client = MongoClient(unit_uri(new_member_ip, password, app_name), directConnection=True) # check for replicated data while retrying to give time for replica to copy over data. try: @@ -343,123 +344,122 @@ async def test_replication_member_scaling(ops_test: OpsTest, continuous_writes) client.close() # verify that the no writes were skipped - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes async def test_kill_db_process(ops_test, continuous_writes): # locate primary unit - app = await helpers.app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - primary = await helpers.replica_set_primary(ip_addresses, ops_test) + app_name = await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) - await helpers.kill_unit_process(ops_test, primary.name, kill_code="SIGKILL") + await kill_unit_process(ops_test, primary.name, kill_code="SIGKILL", app_name=app_name) # verify new writes are continuing by counting the number of writes before and after a 5 second # wait - writes = await helpers.count_writes(ops_test) + writes = await count_writes(ops_test, app_name=app_name) time.sleep(5) - more_writes = await helpers.count_writes(ops_test) + more_writes = await count_writes(ops_test, app_name=app_name) assert more_writes > writes, "writes not continuing to DB" # sleep for twice the median election time time.sleep(MEDIAN_REELECTION_TIME * 2) # verify that db service got restarted and is ready - assert await helpers.mongod_ready(ops_test, primary.public_address) + assert await mongod_ready(ops_test, primary.public_address, app_name=app_name) # verify that a new primary gets elected (ie old primary is secondary) - new_primary = await helpers.replica_set_primary(ip_addresses, ops_test) + new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) assert new_primary.name != primary.name # verify that no writes to the db were missed - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes, "writes to the db were missed." # verify that old primary is up to date. - assert await helpers.secondary_up_to_date( - ops_test, primary.public_address, total_expected_writes["number"] + assert await secondary_up_to_date( + ops_test, primary.public_address, total_expected_writes["number"], app_name=app_name ), "secondary not up to date with the cluster after restarting." async def test_freeze_db_process(ops_test, continuous_writes): # locate primary unit - app = await helpers.app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - primary = await helpers.replica_set_primary(ip_addresses, ops_test) - await helpers.kill_unit_process(ops_test, primary.name, kill_code="SIGSTOP") + app_name = await get_app_name(ops_test) + password = await get_password(ops_test, app_name) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) + await kill_unit_process(ops_test, primary.name, kill_code="SIGSTOP", app_name=app_name) # sleep for twice the median election time time.sleep(MEDIAN_REELECTION_TIME * 2) # verify that a new primary gets elected - new_primary = await helpers.replica_set_primary(ip_addresses, ops_test) + new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) assert new_primary.name != primary.name # verify new writes are continuing by counting the number of writes before and after a 5 second # wait - writes = await helpers.count_writes(ops_test) + writes = await count_writes(ops_test, app_name=app_name) time.sleep(5) - more_writes = await helpers.count_writes(ops_test) - + more_writes = await count_writes(ops_test, app_name=app_name) # un-freeze the old primary - await helpers.kill_unit_process(ops_test, primary.name, kill_code="SIGCONT") + await kill_unit_process(ops_test, primary.name, kill_code="SIGCONT", app_name=app_name) # check this after un-freezing the old primary so that if this check fails we still "turned # back on" the mongod process assert more_writes > writes, "writes not continuing to DB" # verify that db service got restarted and is ready - assert await helpers.mongod_ready(ops_test, primary.public_address) + assert await mongod_ready(ops_test, primary.public_address, app_name=app_name) # verify all units are running under the same replset - member_ips = await helpers.fetch_replica_set_members(ip_addresses, ops_test) + member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name) assert set(member_ips) == set(ip_addresses), "all members not running under the same replset" # verify there is only one primary after un-freezing old primary assert ( - await helpers.count_primaries(ops_test) == 1 + await count_primaries(ops_test, password=password, app_name=app_name) == 1 ), "there are more than one primary in the replica set." # verify that the old primary does not "reclaim" primary status after un-freezing old primary - new_primary = await helpers.replica_set_primary(ip_addresses, ops_test) + new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) assert new_primary.name != primary.name, "un-frozen primary should be secondary." # verify that no writes were missed. - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert actual_writes == total_expected_writes["number"], "db writes missing." - # verify that old primary is up to date. - assert await helpers.secondary_up_to_date( - ops_test, primary.public_address, actual_writes + assert await secondary_up_to_date( + ops_test, primary.public_address, actual_writes, app_name=app_name ), "secondary not up to date with the cluster after restarting." async def test_restart_db_process(ops_test, continuous_writes, change_logging): # locate primary unit - app = await helpers.app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - old_primary = await helpers.replica_set_primary(ip_addresses, ops_test) + app_name = await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + old_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) # send SIGTERM, we expect `systemd` to restart the process sig_term_time = time.time() - await helpers.kill_unit_process(ops_test, old_primary.name, kill_code="SIGTERM") + await kill_unit_process(ops_test, old_primary.name, kill_code="SIGTERM", app_name=app_name) # verify new writes are continuing by counting the number of writes before and after a 5 second # wait - writes = await helpers.count_writes(ops_test) + writes = await count_writes(ops_test, app_name=app_name) time.sleep(5) - more_writes = await helpers.count_writes(ops_test) + more_writes = await count_writes(ops_test, app_name=app_name) assert more_writes > writes, "writes not continuing to DB" # verify that db service got restarted and is ready - assert await helpers.mongod_ready(ops_test, old_primary.public_address) + assert await mongod_ready(ops_test, old_primary.public_address, app_name=app_name) # verify that a new primary gets elected (ie old primary is secondary) - new_primary = await helpers.replica_set_primary(ip_addresses, ops_test) + new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) assert new_primary.name != old_primary.name # verify that a stepdown was performed on restart. SIGTERM should send a graceful restart and @@ -467,146 +467,149 @@ async def test_restart_db_process(ops_test, continuous_writes, change_logging): try: for attempt in Retrying(stop=stop_after_delay(30), wait=wait_fixed(3)): with attempt: - assert await helpers.db_step_down( - ops_test, old_primary.name, sig_term_time + assert await db_step_down( + ops_test, old_primary.name, sig_term_time, app_name=app_name ), "old primary departed without stepping down." except RetryError: False, "old primary departed without stepping down." # verify that no writes were missed - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes # verify that old primary is up to date. - assert await helpers.secondary_up_to_date( - ops_test, old_primary.public_address, total_expected_writes["number"] + assert await secondary_up_to_date( + ops_test, old_primary.public_address, total_expected_writes["number"], app_name=app_name ), "secondary not up to date with the cluster after restarting." async def test_full_cluster_crash(ops_test: OpsTest, continuous_writes, reset_restart_delay): - app = await helpers.app_name(ops_test) + app_name = await get_app_name(ops_test) # update all units to have a new RESTART_DELAY, Modifying the Restart delay to 3 minutes # should ensure enough time for all replicas to be down at the same time. - for unit in ops_test.model.applications[app].units: - await helpers.update_restart_delay(ops_test, unit, RESTART_DELAY) + for unit in ops_test.model.applications[app_name].units: + await update_restart_delay(ops_test, unit, RESTART_DELAY) # kill all units "simultaneously" await asyncio.gather( *[ - helpers.kill_unit_process(ops_test, unit.name, kill_code="SIGKILL") - for unit in ops_test.model.applications[app].units + kill_unit_process(ops_test, unit.name, kill_code="SIGKILL", app_name=app_name) + for unit in ops_test.model.applications[app_name].units ] ) # This test serves to verify behavior when all replicas are down at the same time that when # they come back online they operate as expected. This check verifies that we meet the criteria # of all replicas being down at the same time. - assert await helpers.all_db_processes_down(ops_test), "Not all units down at the same time." + assert await all_db_processes_down( + ops_test, app_name=app_name + ), "Not all units down at the same time." # sleep for twice the median election time and the restart delay time.sleep(MEDIAN_REELECTION_TIME * 2 + RESTART_DELAY) # verify all units are up and running - for unit in ops_test.model.applications[app].units: - assert await helpers.mongod_ready( - ops_test, unit.public_address + for unit in ops_test.model.applications[app_name].units: + assert await mongod_ready( + ops_test, unit.public_address, app_name=app_name ), f"unit {unit.name} not restarted after cluster crash." # verify new writes are continuing by counting the number of writes before and after a 5 second # wait - writes = await helpers.count_writes(ops_test) + writes = await count_writes(ops_test, app_name=app_name) time.sleep(5) - more_writes = await helpers.count_writes(ops_test) + more_writes = await count_writes(ops_test, app_name=app_name) assert more_writes > writes, "writes not continuing to DB" # verify presence of primary, replica set member configuration, and number of primaries - await helpers.verify_replica_set_configuration(ops_test) + await verify_replica_set_configuration(ops_test, app_name=app_name) # verify that no writes to the db were missed - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) # verify that no writes were missed. assert actual_writes == total_expected_writes["number"], "db writes missing." async def test_full_cluster_restart(ops_test: OpsTest, continuous_writes, reset_restart_delay): - app = await helpers.app_name(ops_test) + app_name = await get_app_name(ops_test) # update all units to have a new RESTART_DELAY, Modifying the Restart delay to 3 minutes # should ensure enough time for all replicas to be down at the same time. - for unit in ops_test.model.applications[app].units: - await helpers.update_restart_delay(ops_test, unit, RESTART_DELAY) + for unit in ops_test.model.applications[app_name].units: + await update_restart_delay(ops_test, unit, RESTART_DELAY) # kill all units "simultaneously" await asyncio.gather( *[ - helpers.kill_unit_process(ops_test, unit.name, kill_code="SIGTERM") - for unit in ops_test.model.applications[app].units + kill_unit_process(ops_test, unit.name, kill_code="SIGTERM", app_name=app_name) + for unit in ops_test.model.applications[app_name].units ] ) # This test serves to verify behavior when all replicas are down at the same time that when # they come back online they operate as expected. This check verifies that we meet the criteria # of all replicas being down at the same time. - assert await helpers.all_db_processes_down(ops_test), "Not all units down at the same time." + assert await all_db_processes_down( + ops_test, app_name=app_name + ), "Not all units down at the same time." # sleep for twice the median election time and the restart delay time.sleep(MEDIAN_REELECTION_TIME * 2 + RESTART_DELAY) # verify all units are up and running - for unit in ops_test.model.applications[app].units: - assert await helpers.mongod_ready( - ops_test, unit.public_address + for unit in ops_test.model.applications[app_name].units: + assert await mongod_ready( + ops_test, unit.public_address, app_name=app_name ), f"unit {unit.name} not restarted after cluster crash." # verify new writes are continuing by counting the number of writes before and after a 5 second # wait - writes = await helpers.count_writes(ops_test) + writes = await count_writes(ops_test, app_name=app_name) time.sleep(5) - more_writes = await helpers.count_writes(ops_test) + more_writes = await count_writes(ops_test, app_name=app_name) assert more_writes > writes, "writes not continuing to DB" # verify presence of primary, replica set member configuration, and number of primaries - await helpers.verify_replica_set_configuration(ops_test) + await verify_replica_set_configuration(ops_test, app_name=app_name) # verify that no writes to the db were missed - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) + total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name) + actual_writes = await count_writes(ops_test, app_name=app_name) assert total_expected_writes["number"] == actual_writes, "writes to the db were missed." async def test_network_cut(ops_test, continuous_writes): # locate primary unit - app = await helpers.app_name(ops_test) - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - primary = await helpers.replica_set_primary(ip_addresses, ops_test) - all_units = ops_test.model.applications[app].units + app_name = await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name) + all_units = ops_test.model.applications[app_name].units model_name = ops_test.model.info.name - primary_hostname = await helpers.unit_hostname(ops_test, primary.name) - primary_unit_ip = await helpers.get_unit_ip(ops_test, primary.name) + primary_hostname = await unit_hostname(ops_test, primary.name) + primary_unit_ip = await get_unit_ip(ops_test, primary.name) # before cutting network verify that connection is possible - assert await helpers.mongod_ready( - ops_test, - primary.public_address, + assert await mongod_ready( + ops_test, primary.public_address, app_name=app_name ), f"Connection to host {primary.public_address} is not possible" - helpers.cut_network_from_unit(primary_hostname) + cut_network_from_unit(primary_hostname) # verify machine is not reachable from peer units for unit in set(all_units) - {primary}: - hostname = await helpers.unit_hostname(ops_test, unit.name) - assert not helpers.is_machine_reachable_from( + hostname = await unit_hostname(ops_test, unit.name) + assert not is_machine_reachable_from( hostname, primary_hostname ), "unit is reachable from peer" # verify machine is not reachable from controller - controller = await helpers.get_controller_machine(ops_test) - assert not helpers.is_machine_reachable_from( + controller = await get_controller_machine(ops_test) + assert not is_machine_reachable_from( controller, primary_hostname ), "unit is reachable from controller" @@ -615,43 +618,46 @@ async def test_network_cut(ops_test, continuous_writes): # verify new writes are continuing by counting the number of writes before and after a 5 second # wait - writes = await helpers.count_writes(ops_test, down_unit=primary.name) + writes = await count_writes(ops_test, down_unit=primary.name, app_name=app_name) time.sleep(5) - more_writes = await helpers.count_writes(ops_test, down_unit=primary.name) + more_writes = await count_writes(ops_test, down_unit=primary.name, app_name=app_name) assert more_writes > writes, "writes not continuing to DB" # verify that a new primary got elected - new_primary = await helpers.replica_set_primary(ip_addresses, ops_test, down_unit=primary.name) + new_primary = await replica_set_primary( + ip_addresses, ops_test, down_unit=primary.name, app_name=app_name + ) assert new_primary.name != primary.name # verify that no writes to the db were missed - total_expected_writes = await helpers.stop_continous_writes(ops_test, down_unit=primary.name) - actual_writes = await helpers.count_writes(ops_test, down_unit=primary.name) + total_expected_writes = await stop_continous_writes( + ops_test, down_unit=primary.name, app_name=app_name + ) + actual_writes = await count_writes(ops_test, down_unit=primary.name, app_name=app_name) assert total_expected_writes["number"] == actual_writes, "writes to the db were missed." # restore network connectivity to old primary - helpers.restore_network_for_unit(primary_hostname) + restore_network_for_unit(primary_hostname) # wait until network is reestablished for the unit - helpers.wait_network_restore(model_name, primary_hostname, primary_unit_ip) + wait_network_restore(model_name, primary_hostname, primary_unit_ip) # self healing is performed with update status hook async with ops_test.fast_forward(): - await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1000) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) # verify we have connection to the old primary - new_ip = helpers.instance_ip(model_name, primary_hostname) - assert await helpers.mongod_ready( - ops_test, - new_ip, + new_ip = instance_ip(model_name, primary_hostname) + assert await mongod_ready( + ops_test, new_ip, app_name=app_name ), f"Connection to host {new_ip} is not possible" # verify presence of primary, replica set member configuration, and number of primaries - await helpers.verify_replica_set_configuration(ops_test) + await verify_replica_set_configuration(ops_test, app_name=app_name) # verify that old primary is up to date. - assert await helpers.secondary_up_to_date( - ops_test, new_ip, total_expected_writes["number"] + assert await secondary_up_to_date( + ops_test, new_ip, total_expected_writes["number"], app_name=app_name ), "secondary not up to date with the cluster after restarting." @@ -662,7 +668,7 @@ async def test_scale_up_down(ops_test: OpsTest, continuous_writes): scales = [3, -3, 4, -4, 5, -5, 6, -6, 7, -7] for count in scales: await scale_and_verify(ops_test, count=count) - await _verify_writes(ops_test) + await verify_writes(ops_test) @pytest.mark.abort_on_fail @@ -672,47 +678,65 @@ async def test_scale_up_down_removing_leader(ops_test: OpsTest, continuous_write scales = [3, -3, 4, -4, 5, -5, 6, -6, 7, -7] for count in scales: await scale_and_verify(ops_test, count=count, remove_leader=True) - await _verify_writes(ops_test) + await verify_writes(ops_test) -async def scale_and_verify(ops_test: OpsTest, count: int, remove_leader: bool = False): - if count == 0: - logger.warning("Skipping scale up/down by 0") - return +# TODO put this into a separate file +# Fixtures start - app = await helpers.app_name(ops_test) - - if count > 0: - logger.info(f"Scaling up by {count} units") - await ops_test.model.applications[app].add_units(count) - else: - logger.info(f"Scaling down by {abs(count)} units") - # find leader unit - leader_unit = await helpers.find_unit(ops_test, leader=True) - units_to_remove = [] - for unit in ops_test.model.applications[app].units: - if not remove_leader and unit.name == leader_unit.name: - continue - if len(units_to_remove) < abs(count): - units_to_remove.append(unit.name) - - logger.info(f"Units to remove {units_to_remove}") - await ops_test.model.applications[app].destroy_units(*units_to_remove) - logger.info("Waiting for idle") - await ops_test.model.wait_for_idle( - apps=[app], - status="active", - timeout=1000, - ) - logger.info("Validating replica set has primary") - ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] - primary = await helpers.replica_set_primary(ip_addresses, ops_test, app=app) - assert primary is not None, "Replica set has no primary" +@pytest.fixture() +async def continuous_writes(ops_test: OpsTest): + """Starts continuous write operations to MongoDB for test and clears writes at end of test.""" + await start_continous_writes(ops_test, 1) + yield + await clear_db_writes(ops_test) -async def _verify_writes(ops_test: OpsTest): - # verify that no writes to the db were missed - total_expected_writes = await helpers.stop_continous_writes(ops_test) - actual_writes = await helpers.count_writes(ops_test) - assert total_expected_writes["number"] == actual_writes, "writes to the db were missed." +@pytest.fixture() +async def reset_restart_delay(ops_test: OpsTest): + """Resets service file delay on all units.""" + yield + app_name = await get_app_name(ops_test) + for unit in ops_test.model.applications[app_name].units: + await update_restart_delay(ops_test, unit, ORIGINAL_RESTART_DELAY) + + +@pytest.fixture() +async def change_logging(ops_test: OpsTest): + """Enables appending logging for a test and resets the logging at the end of the test.""" + app_name = await get_app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units] + primary = await replica_set_primary(ip_addresses, ops_test) + + for unit in ops_test.model.applications[app_name].units: + # tests which use this fixture restart the primary. Therefore the primary should not be + # restarted as to leave the restart testing to the test itself. + if unit.name == primary.name: + continue + + # must restart unit to ensure that changes to logging are made + await stop_mongod(ops_test, unit) + await update_service_logging(ops_test, unit, logging=True) + await start_mongod(ops_test, unit) + + # sleep long enough for the mongod to start up correctly + time.sleep(15) + yield + + app_name = await get_app_name(ops_test) + for unit in ops_test.model.applications[app_name].units: + # must restart unit to ensure that changes to logging are made + await stop_mongod(ops_test, unit) + await update_service_logging(ops_test, unit, logging=False) + await start_mongod(ops_test, unit) + + # sleep long enough for the mongod to start up correctly + time.sleep(15) + + # remove the log file as to not clog up space on the replicas. + rm_cmd = f"exec --unit {unit.name} rm {MONGODB_LOG_PATH}" + await ops_test.juju(*rm_cmd.split()) + + +# Fixtures end diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index efe1f38e5..ddedaf408 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -3,8 +3,9 @@ import json import logging +import subprocess from pathlib import Path -from typing import Dict, Optional +from typing import Dict, List, Optional import ops import yaml @@ -13,11 +14,12 @@ from tenacity import retry, retry_if_result, stop_after_attempt, wait_exponential METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) -PORT = 27017 APP_NAME = METADATA["name"] +PORT = 27017 UNIT_IDS = [0, 1, 2] SERIES = "jammy" +logger = logging.getLogger(__name__) logger = logging.getLogger(__name__) @@ -33,21 +35,28 @@ def unit_uri(ip_address: str, password, app=APP_NAME) -> str: return f"mongodb://operator:" f"{password}@" f"{ip_address}:{PORT}/admin?replicaSet={app}" -async def get_password(ops_test: OpsTest, app=APP_NAME, username="operator") -> str: +async def get_password(ops_test: OpsTest, username="operator", app_name=None) -> str: """Use the charm action to retrieve the password from provided unit. Returns: String with the password stored on the peer relation databag. """ + app_name = app_name or await get_app_name(ops_test) + # can retrieve from any unit running unit so we pick the first - unit_name = ops_test.model.applications[app].units[0].name + unit_name = ops_test.model.applications[app_name].units[0].name unit_id = unit_name.split("/")[1] - action = await ops_test.model.units.get(f"{app}/{unit_id}").run_action( + action = await ops_test.model.units.get(f"{app_name}/{unit_id}").run_action( "get-password", **{"username": username} ) action = await action.wait() - return action.results["password"] + try: + password = action.results["password"] + return password + except KeyError: + logger.error("Failed to get password. Action %s. Results %s", action, action.results) + return None @retry( @@ -55,18 +64,21 @@ async def get_password(ops_test: OpsTest, app=APP_NAME, username="operator") -> stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30), ) -def count_primaries(ops_test: OpsTest, password: str) -> int: +async def count_primaries(ops_test: OpsTest, password: str, app_name=None) -> int: """Counts the number of primaries in a replica set. Will retry counting when the number of primaries is 0 at most 5 times. """ + app_name = app_name or await get_app_name(ops_test) number_of_primaries = 0 for unit_id in UNIT_IDS: # get unit - unit = ops_test.model.applications[APP_NAME].units[unit_id] + unit = ops_test.model.applications[app_name].units[unit_id] # connect to mongod - client = MongoClient(unit_uri(unit.public_address, password), directConnection=True) + client = MongoClient( + unit_uri(unit.public_address, password, app_name), directConnection=True + ) # check primary status if client.is_primary: @@ -75,10 +87,11 @@ def count_primaries(ops_test: OpsTest, password: str) -> int: return number_of_primaries -async def find_unit(ops_test: OpsTest, leader: bool, app=APP_NAME) -> ops.model.Unit: +async def find_unit(ops_test: OpsTest, leader: bool, app_name=None) -> ops.model.Unit: """Helper function identifies the a unit, based on need for leader or non-leader.""" + app_name = app_name or await get_app_name(ops_test) ret_unit = None - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: if await unit.is_leader_from_status() == leader: ret_unit = unit @@ -87,7 +100,8 @@ async def find_unit(ops_test: OpsTest, leader: bool, app=APP_NAME) -> ops.model. async def get_leader_id(ops_test: OpsTest) -> int: """Returns the unit number of the juju leader unit.""" - for unit in ops_test.model.applications[APP_NAME].units: + app_name = await get_app_name(ops_test) + for unit in ops_test.model.applications[app_name].units: if await unit.is_leader_from_status(): return int(unit.name.split("/")[1]) return -1 @@ -101,7 +115,8 @@ async def set_password( Returns: String with the password stored on the peer relation databag. """ - action = await ops_test.model.units.get(f"{APP_NAME}/{unit_id}").run_action( + app_name = await get_app_name(ops_test) + action = await ops_test.model.units.get(f"{app_name}/{unit_id}").run_action( "set-password", **{"username": username, "password": password} ) action = await action.wait() @@ -143,7 +158,6 @@ async def get_application_relation_data( # Filter the data based on the relation name. relation_data = [v for v in data[unit_name]["relation-info"] if v["endpoint"] == relation_name] - if relation_id: # Filter the data based on the relation id. relation_data = [v for v in relation_data if v["relation-id"] == relation_id] @@ -195,6 +209,96 @@ async def get_secret_content(ops_test, secret_id) -> Dict[str, str]: return data[secret_id]["content"]["Data"] +async def check_or_scale_app(ops_test: OpsTest, user_app_name: str, required_units: int) -> None: + """A helper function that scales existing cluster if necessary.""" + # check if we need to scale + current_units = len(ops_test.model.applications[user_app_name].units) + + if current_units == required_units: + return + elif current_units > required_units: + for i in range(0, current_units): + unit_to_remove = [ops_test.model.applications[user_app_name].units[i].name] + await ops_test.model.destroy_units(*unit_to_remove) + await ops_test.model.wait_for_idle() + else: + units_to_add = required_units - current_units + await ops_test.model.applications[user_app_name].add_unit(count=units_to_add) + await ops_test.model.wait_for_idle() + + +async def get_app_name(ops_test: OpsTest, test_deployments: List[str] = []) -> str: + """Returns the name of the cluster running MongoDB. + + This is important since not all deployments of the MongoDB charm have the application name + "mongodb". + + Note: if multiple clusters are running MongoDB this will return the one first found. + """ + status = await ops_test.model.get_status() + for app in ops_test.model.applications: + # note that format of the charm field is not exactly "mongodb" but instead takes the form + # of `local:focal/mongodb-6` + if "mongodb" in status["applications"][app]["charm"]: + logger.debug("Found mongodb app named '%s'", app) + + if app in test_deployments: + logger.debug("mongodb app named '%s', was deployed by the test, not by user", app) + continue + + return app + + return None + + +async def unit_hostname(ops_test: OpsTest, unit_name: str) -> str: + """Get hostname for a unit. + + Args: + ops_test: The ops test object passed into every test case + unit_name: The name of the unit to be tested + + Returns: + The machine/container hostname + """ + _, raw_hostname, _ = await ops_test.juju("ssh", unit_name, "hostname") + return raw_hostname.strip() + + +def instance_ip(model: str, instance: str) -> str: + """Translate juju instance name to IP. + + Args: + model: The name of the model + instance: The name of the instance + + Returns: + The (str) IP address of the instance + """ + output = subprocess.check_output(f"juju machines --model {model}".split()) + + for line in output.decode("utf8").splitlines(): + if instance in line: + return line.split()[2] + + +async def get_unit_ip(ops_test: OpsTest, unit_name: str) -> str: + """Wrapper for getting unit ip. + + Juju incorrectly reports the IP addresses after the network is restored this is reported as a + bug here: https://github.com/juju/python-libjuju/issues/738 . Once this bug is resolved use of + `get_unit_ip` should be replaced with `.public_address` + + Args: + ops_test: The ops test object passed into every test case + unit_name: The name of the unit to be tested + + Returns: + The (str) ip of the unit + """ + return instance_ip(ops_test.model.info.name, await unit_hostname(ops_test, unit_name)) + + def audit_log_line_sanity_check(entry) -> bool: fields = ["atype", "ts", "local", "remote", "users", "roles", "param", "result"] for field in fields: diff --git a/tests/integration/metrics_tests/test_metrics.py b/tests/integration/metrics_tests/test_metrics.py index 090d8b570..d7a9eec5c 100644 --- a/tests/integration/metrics_tests/test_metrics.py +++ b/tests/integration/metrics_tests/test_metrics.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. +import os import time import ops @@ -9,31 +10,30 @@ from pytest_operator.plugin import OpsTest from ..ha_tests import helpers as ha_helpers -from ..helpers import find_unit +from ..helpers import ( + UNIT_IDS, + check_or_scale_app, + find_unit, + get_app_name, + get_unit_ip, + unit_hostname, +) MONGODB_EXPORTER_PORT = 9216 MEDIAN_REELECTION_TIME = 12 -async def verify_endpoints(ops_test: OpsTest, unit: ops.model.Unit) -> str: - """Verifies mongodb endpoint is functional on a given unit.""" - http = urllib3.PoolManager() - - unit_address = await ha_helpers.get_unit_ip(ops_test, unit.name) - mongodb_exporter_url = f"http://{unit_address}:{MONGODB_EXPORTER_PORT}/metrics" - mongo_resp = http.request("GET", mongodb_exporter_url) - - assert mongo_resp.status == 200 - - # if configured correctly there should be more than one mongodb metric present - mongodb_metrics = mongo_resp._body.decode("utf8") - assert mongodb_metrics.count("mongo") > 1 - - +@pytest.mark.skipif( + os.environ.get("PYTEST_SKIP_DEPLOY", False), + reason="skipping deploy, model expected to be provided.", +) @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest) -> None: """Build and deploy one unit of MongoDB.""" - if await ha_helpers.app_name(ops_test): + app_name = await get_app_name(ops_test) + if app_name: + return await check_or_scale_app(ops_test, app_name, len(UNIT_IDS)) + if await get_app_name(ops_test): return my_charm = await ops_test.build_charm(".") @@ -43,8 +43,8 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: async def test_endpoints(ops_test: OpsTest): """Sanity check that endpoints are running.""" - app = await ha_helpers.app_name(ops_test) - application = ops_test.model.applications[app] + app_name = await get_app_name(ops_test) + application = ops_test.model.applications[app_name] for unit in application.units: await verify_endpoints(ops_test, unit) @@ -52,24 +52,24 @@ async def test_endpoints(ops_test: OpsTest): async def test_endpoints_new_password(ops_test: OpsTest): """Verify that endpoints still function correctly after the monitor user password changes.""" - app = await ha_helpers.app_name(ops_test) - application = ops_test.model.applications[app] + app_name = await get_app_name(ops_test) + application = ops_test.model.applications[app_name] leader_unit = await find_unit(ops_test, leader=True) action = await leader_unit.run_action("set-password", **{"username": "monitor"}) action = await action.wait() # wait for non-leader units to receive relation changed event. time.sleep(3) - await ops_test.model.wait_for_idle(apps=[app], status="active", idle_period=15) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", idle_period=15) for unit in application.units: await verify_endpoints(ops_test, unit) async def test_endpoints_network_cut(ops_test: OpsTest): """Verify that endpoint still function correctly after a network cut.""" - app = await ha_helpers.app_name(ops_test) - unit = ops_test.model.applications[app].units[0] - hostname = await ha_helpers.unit_hostname(ops_test, unit.name) - unit_ip = await ha_helpers.get_unit_ip(ops_test, unit.name) + app_name = await get_app_name(ops_test) + unit = ops_test.model.applications[app_name].units[0] + hostname = await unit_hostname(ops_test, unit.name) + unit_ip = await get_unit_ip(ops_test, unit.name) ha_helpers.cut_network_from_unit(hostname) # sleep for twice the median election time @@ -79,3 +79,21 @@ async def test_endpoints_network_cut(ops_test: OpsTest): ha_helpers.restore_network_for_unit(hostname) ha_helpers.wait_network_restore(ops_test.model.info.name, hostname, unit_ip) await verify_endpoints(ops_test, unit) + + +# helpers + + +async def verify_endpoints(ops_test: OpsTest, unit: ops.model.Unit) -> str: + """Verifies mongodb endpoint is functional on a given unit.""" + http = urllib3.PoolManager() + + unit_address = await get_unit_ip(ops_test, unit.name) + mongodb_exporter_url = f"http://{unit_address}:{MONGODB_EXPORTER_PORT}/metrics" + mongo_resp = http.request("GET", mongodb_exporter_url) + + assert mongo_resp.status == 200 + + # if configured correctly there should be more than one mongodb metric present + mongodb_metrics = mongo_resp._body.decode("utf8") + assert mongodb_metrics.count("mongo") > 1 diff --git a/tests/integration/relation_tests/new_relations/test_charm_relations.py b/tests/integration/relation_tests/new_relations/test_charm_relations.py index f354b9795..6be5f1aa9 100644 --- a/tests/integration/relation_tests/new_relations/test_charm_relations.py +++ b/tests/integration/relation_tests/new_relations/test_charm_relations.py @@ -13,6 +13,7 @@ from tenacity import RetryError from ...ha_tests.helpers import replica_set_primary +from ...helpers import check_or_scale_app, get_app_name from .helpers import ( get_application_relation_data, get_connection_string, @@ -29,7 +30,7 @@ MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME = "multiple-database-clusters" ALIASED_MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME = "aliased-multiple-database-clusters" ANOTHER_DATABASE_APP_NAME = "another-database" -APP_NAMES = [APPLICATION_APP_NAME, DATABASE_APP_NAME, ANOTHER_DATABASE_APP_NAME] +APP_NAMES = [APPLICATION_APP_NAME, ANOTHER_DATABASE_APP_NAME] @pytest.mark.abort_on_fail @@ -37,31 +38,62 @@ async def test_deploy_charms(ops_test: OpsTest, application_charm, database_char """Deploy both charms (application and database) to use in the tests.""" # Deploy both charms (2 units for each application to test that later they correctly # set data in the relation application databag using only the leader unit). - await asyncio.gather( - ops_test.model.deploy( - application_charm, - application_name=APPLICATION_APP_NAME, - num_units=2, - ), - ops_test.model.deploy( - database_charm, - application_name=DATABASE_APP_NAME, - num_units=2, - ), - ops_test.model.deploy( - database_charm, - application_name=ANOTHER_DATABASE_APP_NAME, - ), + required_units = 2 + app_name = await get_app_name(ops_test) + if app_name == ANOTHER_DATABASE_APP_NAME: + assert ( + False + ), f"provided MongoDB application, cannot be named {ANOTHER_DATABASE_APP_NAME}, this name is reserved for this test." + + if app_name: + await asyncio.gather( + ops_test.model.deploy( + application_charm, + application_name=APPLICATION_APP_NAME, + num_units=required_units, + ), + check_or_scale_app(ops_test, app_name, required_units), + ops_test.model.deploy( + database_charm, + application_name=ANOTHER_DATABASE_APP_NAME, + ), + ) + else: + await asyncio.gather( + ops_test.model.deploy( + application_charm, + application_name=APPLICATION_APP_NAME, + num_units=required_units, + ), + ops_test.model.deploy( + database_charm, + application_name=DATABASE_APP_NAME, + num_units=required_units, + ), + ops_test.model.deploy( + database_charm, + application_name=ANOTHER_DATABASE_APP_NAME, + ), + ) + if app_name: + APP_NAMES.append(app_name) + else: + APP_NAMES.append(DATABASE_APP_NAME) + await ops_test.model.wait_for_idle( + apps=APP_NAMES, status="active", wait_for_at_least_units=required_units ) - await ops_test.model.wait_for_idle(apps=APP_NAMES, status="active", wait_for_at_least_units=1) @pytest.mark.abort_on_fail async def test_database_relation_with_charm_libraries(ops_test: OpsTest): """Test basic functionality of database relation interface.""" # Relate the charms and wait for them exchanging some connection data. - await ops_test.model.add_relation( - f"{APPLICATION_APP_NAME}:{FIRST_DATABASE_RELATION_NAME}", DATABASE_APP_NAME + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME + ) + await ops_test.model.integrate( + f"{APPLICATION_APP_NAME}:{FIRST_DATABASE_RELATION_NAME}", db_app_name ) await ops_test.model.wait_for_idle(apps=APP_NAMES, status="active") @@ -106,15 +138,19 @@ async def test_database_relation_with_charm_libraries(ops_test: OpsTest): async def test_app_relation_metadata_change(ops_test: OpsTest) -> None: """Verifies that the app metadata changes with db relation joined and departed events.""" # verify application metadata is correct before adding/removing units. + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME + ) try: await verify_application_data( - ops_test, APPLICATION_APP_NAME, DATABASE_APP_NAME, FIRST_DATABASE_RELATION_NAME + ops_test, APPLICATION_APP_NAME, db_app_name, FIRST_DATABASE_RELATION_NAME ) except RetryError: assert False, "Hosts are not correct in application data." # verify application metadata is correct after adding units. - await ops_test.model.applications[DATABASE_APP_NAME].add_units(count=2) + await ops_test.model.applications[db_app_name].add_units(count=2) await ops_test.model.wait_for_idle( apps=APP_NAMES, status="active", @@ -123,7 +159,7 @@ async def test_app_relation_metadata_change(ops_test: OpsTest) -> None: try: await verify_application_data( - ops_test, APPLICATION_APP_NAME, DATABASE_APP_NAME, FIRST_DATABASE_RELATION_NAME + ops_test, APPLICATION_APP_NAME, db_app_name, FIRST_DATABASE_RELATION_NAME ) except RetryError: assert False, "Hosts not updated in application data after adding units." @@ -131,14 +167,14 @@ async def test_app_relation_metadata_change(ops_test: OpsTest) -> None: # verify application metadata is correct after removing the pre-existing units. This is # this is important since we want to test that the application related will work with # only the newly added units from above. - await ops_test.model.applications[DATABASE_APP_NAME].destroy_units(f"{DATABASE_APP_NAME}/0") + await ops_test.model.applications[db_app_name].destroy_units(f"{db_app_name}/0") await ops_test.model.wait_for_idle( apps=APP_NAMES, status="active", timeout=1000, ) - await ops_test.model.applications[DATABASE_APP_NAME].destroy_units(f"{DATABASE_APP_NAME}/1") + await ops_test.model.applications[db_app_name].destroy_units(f"{db_app_name}/1") await ops_test.model.wait_for_idle( apps=APP_NAMES, status="active", @@ -147,7 +183,7 @@ async def test_app_relation_metadata_change(ops_test: OpsTest) -> None: try: await verify_application_data( - ops_test, APPLICATION_APP_NAME, DATABASE_APP_NAME, FIRST_DATABASE_RELATION_NAME + ops_test, APPLICATION_APP_NAME, db_app_name, FIRST_DATABASE_RELATION_NAME ) except RetryError: assert False, "Hosts not updated in application data after removing units." @@ -160,7 +196,7 @@ async def test_app_relation_metadata_change(ops_test: OpsTest) -> None: ) ip_addresses = endpoints_str.split(",") try: - primary = await replica_set_primary(ip_addresses, ops_test, app=DATABASE_APP_NAME) + primary = await replica_set_primary(ip_addresses, ops_test, app_name=db_app_name) except RetryError: assert False, "replica set has no primary" @@ -245,10 +281,14 @@ async def test_two_applications_doesnt_share_the_same_relation_data( ) await ops_test.model.wait_for_idle(apps=all_app_names, status="active") + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME + ) # Relate the new application with the database # and wait for them exchanging some connection data. - await ops_test.model.add_relation( - f"{another_application_app_name}:{FIRST_DATABASE_RELATION_NAME}", DATABASE_APP_NAME + await ops_test.model.integrate( + f"{another_application_app_name}:{FIRST_DATABASE_RELATION_NAME}", db_app_name ) await ops_test.model.wait_for_idle(apps=all_app_names, status="active") @@ -267,10 +307,14 @@ async def test_an_application_can_connect_to_multiple_database_clusters(ops_test """Test that an application can connect to different clusters of the same database.""" # Relate the application with both database clusters # and wait for them exchanging some connection data. - first_cluster_relation = await ops_test.model.add_relation( - f"{APPLICATION_APP_NAME}:{MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME}", DATABASE_APP_NAME + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME ) - second_cluster_relation = await ops_test.model.add_relation( + first_cluster_relation = await ops_test.model.integrate( + f"{APPLICATION_APP_NAME}:{MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME}", db_app_name + ) + second_cluster_relation = await ops_test.model.integrate( f"{APPLICATION_APP_NAME}:{MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME}", ANOTHER_DATABASE_APP_NAME, ) @@ -301,12 +345,16 @@ async def test_an_application_can_connect_to_multiple_aliased_database_clusters( # """Test that an application can connect to different clusters of the same database.""" # Relate the application with both database clusters # and wait for them exchanging some connection data. + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME + ) await asyncio.gather( - ops_test.model.add_relation( + ops_test.model.integrate( f"{APPLICATION_APP_NAME}:{ALIASED_MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME}", - DATABASE_APP_NAME, + db_app_name, ), - ops_test.model.add_relation( + ops_test.model.integrate( f"{APPLICATION_APP_NAME}:{ALIASED_MULTIPLE_DATABASE_CLUSTERS_RELATION_NAME}", ANOTHER_DATABASE_APP_NAME, ), @@ -336,8 +384,12 @@ async def test_an_application_can_connect_to_multiple_aliased_database_clusters( async def test_an_application_can_request_multiple_databases(ops_test: OpsTest, application_charm): """Test that an application can request additional databases using the same interface.""" # Relate the charms using another relation and wait for them exchanging some connection data. - await ops_test.model.add_relation( - f"{APPLICATION_APP_NAME}:{SECOND_DATABASE_RELATION_NAME}", DATABASE_APP_NAME + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME + ) + await ops_test.model.integrate( + f"{APPLICATION_APP_NAME}:{SECOND_DATABASE_RELATION_NAME}", db_app_name ) await ops_test.model.wait_for_idle(apps=APP_NAMES, status="active") @@ -359,9 +411,12 @@ async def test_removed_relation_no_longer_has_access(ops_test: OpsTest): connection_string = await get_connection_string( ops_test, APPLICATION_APP_NAME, FIRST_DATABASE_RELATION_NAME ) - - await ops_test.model.applications[DATABASE_APP_NAME].remove_relation( - f"{APPLICATION_APP_NAME}:{FIRST_DATABASE_RELATION_NAME}", f"{DATABASE_APP_NAME}:database" + db_app_name = ( + await get_app_name(ops_test, test_deployments=[ANOTHER_DATABASE_APP_NAME]) + or DATABASE_APP_NAME + ) + await ops_test.model.applications[db_app_name].remove_relation( + f"{APPLICATION_APP_NAME}:{FIRST_DATABASE_RELATION_NAME}", f"{db_app_name}:database" ) await ops_test.model.wait_for_idle(apps=APP_NAMES, status="active") diff --git a/tests/integration/sharding_tests/helpers.py b/tests/integration/sharding_tests/helpers.py index d8f9f8ae5..fac50041b 100644 --- a/tests/integration/sharding_tests/helpers.py +++ b/tests/integration/sharding_tests/helpers.py @@ -16,7 +16,7 @@ async def generate_mongodb_client(ops_test: OpsTest, app_name: str, mongos: bool): """Returns a MongoDB client for mongos/mongod.""" hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units] - password = await get_password(ops_test, app_name) + password = await get_password(ops_test, app_name=app_name) port = MONGOS_PORT if mongos else MONGOD_PORT hosts = [f"{host}:{port}" for host in hosts] hosts = ",".join(hosts) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index b52a78751..d2cbf6fbc 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -17,14 +17,15 @@ from pytest_operator.plugin import OpsTest from tenacity import RetryError -from .ha_tests.helpers import app_name, kill_unit_process +from .ha_tests.helpers import kill_unit_process from .helpers import ( - APP_NAME, PORT, UNIT_IDS, audit_log_line_sanity_check, + check_or_scale_app, count_primaries, find_unit, + get_app_name, get_leader_id, get_password, set_password, @@ -33,7 +34,6 @@ logger = logging.getLogger(__name__) -ANOTHER_DATABASE_APP_NAME = "another-database-a" MEDIAN_REELECTION_TIME = 12 @@ -45,6 +45,12 @@ @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest) -> None: """Build and deploy one unit of MongoDB.""" + # it is possible for users to provide their own cluster for testing. Hence check if there + # is a pre-existing cluster. + app_name = await get_app_name(ops_test) + if app_name: + return await check_or_scale_app(ops_test, app_name, len(UNIT_IDS)) + my_charm = await ops_test.build_charm(".") await ops_test.model.deploy(my_charm, num_units=len(UNIT_IDS)) await ops_test.model.wait_for_idle() @@ -53,17 +59,19 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: @pytest.mark.abort_on_fail async def test_status(ops_test: OpsTest) -> None: """Verifies that the application and unit are active.""" - await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) - assert len(ops_test.model.applications[APP_NAME].units) == len(UNIT_IDS) + app_name = await get_app_name(ops_test) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) + assert len(ops_test.model.applications[app_name].units) == len(UNIT_IDS) @pytest.mark.parametrize("unit_id", UNIT_IDS) async def test_unit_is_running_as_replica_set(ops_test: OpsTest, unit_id: int) -> None: """Tests that mongodb is running as a replica set for the application unit.""" # connect to mongo replica set - unit = ops_test.model.applications[APP_NAME].units[unit_id] + app_name = await get_app_name(ops_test) + unit = ops_test.model.applications[app_name].units[unit_id] connection = unit.public_address + ":" + str(PORT) - client = MongoClient(connection, replicaset="mongodb") + client = MongoClient(connection, replicaset=app_name) # check mongo replica set is ready try: @@ -77,15 +85,18 @@ async def test_unit_is_running_as_replica_set(ops_test: OpsTest, unit_id: int) - async def test_leader_is_primary_on_deployment(ops_test: OpsTest) -> None: """Tests that right after deployment that the primary unit is the leader.""" + app_name = await get_app_name(ops_test) # grab leader unit - leader_unit = await find_unit(ops_test, leader=True) + leader_unit = await find_unit(ops_test, leader=True, app_name=app_name) # verify that we have a leader assert leader_unit is not None, "No unit is leader" # connect to mongod - password = await get_password(ops_test) - client = MongoClient(unit_uri(leader_unit.public_address, password), directConnection=True) + password = await get_password(ops_test, app_name=app_name) + client = MongoClient( + unit_uri(leader_unit.public_address, password, app_name), directConnection=True + ) # verify primary status assert client.is_primary, "Leader is not primary" @@ -94,9 +105,10 @@ async def test_leader_is_primary_on_deployment(ops_test: OpsTest) -> None: async def test_exactly_one_primary(ops_test: OpsTest) -> None: """Tests that there is exactly one primary in the deployed units.""" + app_name = await get_app_name(ops_test) try: - password = await get_password(ops_test) - number_of_primaries = count_primaries(ops_test, password) + password = await get_password(ops_test, app_name=app_name) + number_of_primaries = await count_primaries(ops_test, password, app_name=app_name) except RetryError: number_of_primaries = 0 @@ -109,11 +121,14 @@ async def test_exactly_one_primary(ops_test: OpsTest) -> None: async def test_get_primary_action(ops_test: OpsTest) -> None: """Tests that action get-primary outputs the correct unit with the primary replica.""" # determine which unit is the primary + app_name = await get_app_name(ops_test) expected_primary = None - for unit in ops_test.model.applications[APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: # connect to mongod password = await get_password(ops_test) - client = MongoClient(unit_uri(unit.public_address, password), directConnection=True) + client = MongoClient( + unit_uri(unit.public_address, password, app_name), directConnection=True + ) # check primary status if client.is_primary: @@ -125,7 +140,7 @@ async def test_get_primary_action(ops_test: OpsTest) -> None: # check if get-primary returns the correct primary unit regardless of # which unit the action is run on - for unit in ops_test.model.applications[APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: # use get-primary action to find primary action = await unit.run_action("get-primary") action = await action.wait() @@ -146,10 +161,13 @@ async def test_set_password_action(ops_test: OpsTest) -> None: assert new_password != old_password new_password_reported = await get_password(ops_test) assert new_password == new_password_reported + user_app_name = await get_app_name(ops_test) # verify that the password is updated in mongod by inserting into the collection. try: - client = MongoClient(unit_uri(unit.public_address, new_password), directConnection=True) + client = MongoClient( + unit_uri(unit.public_address, new_password, user_app_name), directConnection=True + ) client["new-db"].list_collection_names() except PyMongoError as e: assert False, f"Failed to access collection with new password, error: {e}" @@ -167,7 +185,9 @@ async def test_set_password_action(ops_test: OpsTest) -> None: # verify that the password is updated in mongod by inserting into the collection. try: - client = MongoClient(unit_uri(unit.public_address, "safe_pass"), directConnection=True) + client = MongoClient( + unit_uri(unit.public_address, "safe_pass", user_app_name), directConnection=True + ) client["new-db"].list_collection_names() except PyMongoError as e: assert False, f"Failed to access collection with new password, error: {e}" @@ -177,13 +197,14 @@ async def test_set_password_action(ops_test: OpsTest) -> None: async def test_monitor_user(ops_test: OpsTest) -> None: """Test verifies that the monitor user can perform operations such as 'rs.conf()'.""" - unit = ops_test.model.applications[APP_NAME].units[0] - password = await get_password(ops_test, "mongodb", "monitor") + app_name = await get_app_name(ops_test) + unit = ops_test.model.applications[app_name].units[0] + password = await get_password(ops_test, username="monitor") replica_set_hosts = [ - unit.public_address for unit in ops_test.model.applications["mongodb"].units + unit.public_address for unit in ops_test.model.applications[app_name].units ] hosts = ",".join(replica_set_hosts) - replica_set_uri = f"mongodb://monitor:{password}@{hosts}/admin?replicaSet=mongodb" + replica_set_uri = f"mongodb://monitor:{password}@{hosts}/admin?replicaSet={app_name}" admin_mongod_cmd = f"{MONGO_SHELL} '{replica_set_uri}' --eval 'rs.conf()'" check_monitor_cmd = f"exec --unit {unit.name} -- {admin_mongod_cmd}" @@ -273,13 +294,13 @@ async def test_exactly_one_primary_reported_by_juju(ops_test: OpsTest) -> None: async def get_unit_messages(): """Collects unit status messages.""" - app = await app_name(ops_test) + app_name = await get_app_name(ops_test) unit_messages = {} async with ops_test.fast_forward(): time.sleep(20) - for unit in ops_test.model.applications[app].units: + for unit in ops_test.model.applications[app_name].units: unit_messages[unit.entity_id] = unit.workload_status_message return unit_messages @@ -321,7 +342,8 @@ def juju_reports_one_primary(unit_messages): async def test_audit_log(ops_test: OpsTest) -> None: """Test that audit log was created and contains actual audit data.""" - leader_unit = await find_unit(ops_test, leader=True) + app_name = await get_app_name(ops_test) + leader_unit = await find_unit(ops_test, leader=True, app_name=app_name) audit_log_snap_path = "/var/snap/charmed-mongodb/common/var/lib/mongodb/audit.json" audit_log = check_output( f"JUJU_MODEL={ops_test.model_full_name} juju ssh {leader_unit.name} 'sudo cat {audit_log_snap_path}'", diff --git a/tests/integration/tls_tests/helpers.py b/tests/integration/tls_tests/helpers.py index a3fd4065d..3523f1ee2 100644 --- a/tests/integration/tls_tests/helpers.py +++ b/tests/integration/tls_tests/helpers.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. +import json import logging from datetime import datetime @@ -9,9 +10,15 @@ from pytest_operator.plugin import OpsTest from tenacity import RetryError, Retrying, stop_after_attempt, wait_exponential -from ..ha_tests.helpers import app_name -from ..helpers import get_password +from ..helpers import ( + get_app_name, + get_application_relation_data, + get_password, + get_secret_content, + get_secret_id, +) +# TODO move this to a separate constants file PORT = 27017 MONGODB_SNAP_DATA_DIR = "/var/snap/charmed-mongodb/current" @@ -21,6 +28,8 @@ INTERNAL_CERT_PATH = f"{MONGOD_CONF_DIR}/internal-ca.crt" EXTERNAL_PEM_PATH = f"{MONGOD_CONF_DIR}/external-cert.pem" +TLS_RELATION_NAME = "certificates" + logger = logging.getLogger(__name__) @@ -28,13 +37,15 @@ class ProcessError(Exception): """Raised when a process fails.""" -async def mongo_tls_command(ops_test: OpsTest) -> str: +async def mongo_tls_command(ops_test: OpsTest, app_name=None) -> str: """Generates a command which verifies TLS status.""" - app = await app_name(ops_test) - replica_set_hosts = [unit.public_address for unit in ops_test.model.applications[app].units] - password = await get_password(ops_test, app) + app_name = app_name or await get_app_name(ops_test) + replica_set_hosts = [ + unit.public_address for unit in ops_test.model.applications[app_name].units + ] + password = await get_password(ops_test, app_name=app_name) hosts = ",".join(replica_set_hosts) - replica_set_uri = f"mongodb://operator:" f"{password}@" f"{hosts}/admin?replicaSet={app}" + replica_set_uri = f"mongodb://operator:" f"{password}@" f"{hosts}/admin?replicaSet={app_name}" return ( f"{MONGO_SHELL} '{replica_set_uri}' --eval 'rs.status()'" @@ -43,7 +54,7 @@ async def mongo_tls_command(ops_test: OpsTest) -> str: ) -async def check_tls(ops_test: OpsTest, unit: ops.model.Unit, enabled: bool) -> bool: +async def check_tls(ops_test: OpsTest, unit: ops.model.Unit, enabled: bool, app_name=None) -> bool: """Returns whether TLS is enabled on the specific PostgreSQL instance. Args: @@ -59,7 +70,7 @@ async def check_tls(ops_test: OpsTest, unit: ops.model.Unit, enabled: bool) -> b stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30) ): with attempt: - mongod_tls_check = await mongo_tls_command(ops_test) + mongod_tls_check = await mongo_tls_command(ops_test, app_name=app_name) check_tls_cmd = f"exec --unit {unit.name} -- {mongod_tls_check}" return_code, _, _ = await ops_test.juju(*check_tls_cmd.split()) tls_enabled = return_code == 0 @@ -132,3 +143,58 @@ async def scp_file_preserve_ctime(ops_test: OpsTest, unit_name: str, path: str) ) return f"{filename}" + + +async def check_certs_correctly_distributed( + ops_test: OpsTest, unit: ops.Unit, app_name=None +) -> None: + """Comparing expected vs distributed certificates. + + Verifying certificates downloaded on the charm against the ones distributed by the TLS operator + """ + app_name = app_name or await get_app_name(ops_test) + app_secret_id = await get_secret_id(ops_test, app_name) + unit_secret_id = await get_secret_id(ops_test, unit.name) + app_secret_content = await get_secret_content(ops_test, app_secret_id) + unit_secret_content = await get_secret_content(ops_test, unit_secret_id) + app_current_crt = app_secret_content["csr-secret"] + unit_current_crt = unit_secret_content["csr-secret"] + + # Get the values for certs from the relation, as provided by TLS Charm + certificates_raw_data = await get_application_relation_data( + ops_test, app_name, TLS_RELATION_NAME, "certificates" + ) + certificates_data = json.loads(certificates_raw_data) + + external_item = [ + data + for data in certificates_data + if data["certificate_signing_request"].rstrip() == unit_current_crt.rstrip() + ][0] + internal_item = [ + data + for data in certificates_data + if data["certificate_signing_request"].rstrip() == app_current_crt.rstrip() + ][0] + + # Get a local copy of the external cert + external_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, EXTERNAL_CERT_PATH) + + # Get the external cert value from the relation + relation_external_cert = "\n".join(external_item["chain"]) + + # CHECK: Compare if they are the same + with open(external_copy_path) as f: + external_contents_file = f.read() + assert relation_external_cert == external_contents_file + + # Get a local copy of the internal cert + internal_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, INTERNAL_CERT_PATH) + + # Get the external cert value from the relation + relation_internal_cert = "\n".join(internal_item["chain"]) + + # CHECK: Compare if they are the same + with open(internal_copy_path) as f: + internal_contents_file = f.read() + assert relation_internal_cert == internal_contents_file diff --git a/tests/integration/tls_tests/test_tls.py b/tests/integration/tls_tests/test_tls.py index 9b932c606..8ef6c32b1 100644 --- a/tests/integration/tls_tests/test_tls.py +++ b/tests/integration/tls_tests/test_tls.py @@ -2,107 +2,69 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. -import json +import os +from pathlib import Path import pytest -from ops import Unit +import yaml from pytest_operator.plugin import OpsTest -from ..helpers import get_application_relation_data, get_secret_content, get_secret_id +from ..helpers import UNIT_IDS, check_or_scale_app, get_app_name from .helpers import ( EXTERNAL_CERT_PATH, INTERNAL_CERT_PATH, + check_certs_correctly_distributed, check_tls, - scp_file_preserve_ctime, time_file_created, time_process_started, ) -MONGO_COMMON_DIR = "/var/snap/charmed-mongodb/common" TLS_CERTIFICATES_APP_NAME = "tls-certificates-operator" -TLS_RELATION_NAME = "certificates" -DATABASE_APP_NAME = "mongodb" + +DATABASE_METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) +PORT = 27017 +DATABASE_APP_NAME = DATABASE_METADATA["name"] TLS_TEST_DATA = "tests/integration/tls_tests/data" DB_SERVICE = "snap.charmed-mongodb.mongod.service" -async def check_certs_correctly_distributed(ops_test: OpsTest, unit: Unit) -> None: - """Comparing expected vs distributed certificates. - - Verifying certificates downloaded on the charm against the ones distributed by the TLS operator - """ - app_secret_id = await get_secret_id(ops_test, DATABASE_APP_NAME) - unit_secret_id = await get_secret_id(ops_test, unit.name) - app_secret_content = await get_secret_content(ops_test, app_secret_id) - unit_secret_content = await get_secret_content(ops_test, unit_secret_id) - app_current_crt = app_secret_content["csr-secret"] - unit_current_crt = unit_secret_content["csr-secret"] - - # Get the values for certs from the relation, as provided by TLS Charm - certificates_raw_data = await get_application_relation_data( - ops_test, DATABASE_APP_NAME, TLS_RELATION_NAME, "certificates" - ) - certificates_data = json.loads(certificates_raw_data) - - external_item = [ - data - for data in certificates_data - if data["certificate_signing_request"].rstrip() == unit_current_crt.rstrip() - ][0] - internal_item = [ - data - for data in certificates_data - if data["certificate_signing_request"].rstrip() == app_current_crt.rstrip() - ][0] - - # Get a local copy of the external cert - external_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, EXTERNAL_CERT_PATH) - - # Get the external cert value from the relation - relation_external_cert = "\n".join(external_item["chain"]) - - # CHECK: Compare if they are the same - with open(external_copy_path) as f: - external_contents_file = f.read() - assert relation_external_cert == external_contents_file - - # Get a local copy of the internal cert - internal_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, INTERNAL_CERT_PATH) - - # Get the external cert value from the relation - relation_internal_cert = "\n".join(internal_item["chain"]) - - # CHECK: Compare if they are the same - with open(internal_copy_path) as f: - internal_contents_file = f.read() - assert relation_internal_cert == internal_contents_file - - +@pytest.mark.skipif( + os.environ.get("PYTEST_SKIP_DEPLOY", False), + reason="skipping deploy, model expected to be provided.", +) @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest) -> None: """Build and deploy one unit of MongoDB and one unit of TLS.""" - async with ops_test.fast_forward(): - my_charm = await ops_test.build_charm(".") - await ops_test.model.deploy(my_charm, num_units=3) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") - - config = {"generate-self-signed-certificates": "true", "ca-common-name": "Test CA"} - await ops_test.model.deploy(TLS_CERTIFICATES_APP_NAME, channel="stable", config=config) - await ops_test.model.wait_for_idle( - apps=[TLS_CERTIFICATES_APP_NAME], status="active", timeout=1000 - ) + # it is possible for users to provide their own cluster for testing. Hence check if there + # is a pre-existing cluster. + app_name = await get_app_name(ops_test) + if app_name: + await check_or_scale_app(ops_test, app_name, len(UNIT_IDS)) + else: + app_name = DATABASE_APP_NAME + async with ops_test.fast_forward(): + my_charm = await ops_test.build_charm(".") + await ops_test.model.deploy(my_charm, num_units=3) + await ops_test.model.wait_for_idle(apps=[app_name], status="active") + + config = {"generate-self-signed-certificates": "true", "ca-common-name": "Test CA"} + await ops_test.model.deploy(TLS_CERTIFICATES_APP_NAME, channel="stable", config=config) + await ops_test.model.wait_for_idle( + apps=[TLS_CERTIFICATES_APP_NAME], status="active", timeout=1000 + ) async def test_enable_tls(ops_test: OpsTest) -> None: """Verify each unit has TLS enabled after relating to the TLS application.""" - # Relate it to the PostgreSQL to enable TLS. - await ops_test.model.relate(DATABASE_APP_NAME, TLS_CERTIFICATES_APP_NAME) + # Relate it to the MongoDB to enable TLS. + app_name = await get_app_name(ops_test) or DATABASE_APP_NAME + await ops_test.model.integrate(app_name, TLS_CERTIFICATES_APP_NAME) await ops_test.model.wait_for_idle(status="active", timeout=1000, idle_period=60) # Wait for all units enabling TLS. - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: - assert await check_tls(ops_test, unit, enabled=True) + for unit in ops_test.model.applications[app_name].units: + assert await check_tls(ops_test, unit, enabled=True, app_name=app_name) async def test_rotate_tls_key(ops_test: OpsTest) -> None: @@ -114,7 +76,10 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None: # private keys these certificates should be updated and the mongod service should be # restarted original_tls_times = {} - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + + app_name = await get_app_name(ops_test) or DATABASE_APP_NAME + + for unit in ops_test.model.applications[app_name].units: original_tls_times[unit.name] = {} original_tls_times[unit.name]["external_cert"] = await time_file_created( ops_test, unit.name, EXTERNAL_CERT_PATH @@ -128,7 +93,7 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None: check_certs_correctly_distributed(ops_test, unit) # set external and internal key using auto-generated key for each unit - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: action = await unit.run_action(action_name="set-tls-private-key") action = await action.wait() assert action.status == "completed", "setting external and internal key failed." @@ -136,17 +101,17 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None: # wait for certificate to be available and processed. Can get receive two certificate # available events and restart twice so we want to ensure we are idle for at least 1 minute await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", timeout=1000, idle_period=60 + apps=[app_name], status="active", timeout=1000, idle_period=60 ) # After updating both the external key and the internal key a new certificate request will be # made; then the certificates should be available and updated. - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: new_external_cert_time = await time_file_created(ops_test, unit.name, EXTERNAL_CERT_PATH) new_internal_cert_time = await time_file_created(ops_test, unit.name, INTERNAL_CERT_PATH) new_mongod_service_time = await time_process_started(ops_test, unit.name, DB_SERVICE) - check_certs_correctly_distributed(ops_test, unit) + check_certs_correctly_distributed(ops_test, unit, app_name=app_name) assert ( new_external_cert_time > original_tls_times[unit.name]["external_cert"] @@ -162,9 +127,9 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None: ), f"mongod service for {unit.name} was not restarted." # Verify that TLS is functioning on all units. - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: assert await check_tls( - ops_test, unit, enabled=True + ops_test, unit, enabled=True, app_name=app_name ), f"tls is not enabled for {unit.name}." @@ -177,7 +142,8 @@ async def test_set_tls_key(ops_test: OpsTest) -> None: # private keys these certificates should be updated and the mongod service should be # restarted original_tls_times = {} - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + app_name = await get_app_name(ops_test) or DATABASE_APP_NAME + for unit in ops_test.model.applications[app_name].units: original_tls_times[unit.name] = {} original_tls_times[unit.name]["external_cert"] = await time_file_created( ops_test, unit.name, EXTERNAL_CERT_PATH @@ -194,8 +160,8 @@ async def test_set_tls_key(ops_test: OpsTest) -> None: internal_key_contents = "".join(internal_key_contents) # set external and internal key for each unit - for unit_id in range(len(ops_test.model.applications[DATABASE_APP_NAME].units)): - unit = ops_test.model.applications[DATABASE_APP_NAME].units[unit_id] + for unit_id in range(len(ops_test.model.applications[app_name].units)): + unit = ops_test.model.applications[app_name].units[unit_id] with open(f"{TLS_TEST_DATA}/external-key-{unit_id}.pem") as f: external_key_contents = f.readlines() @@ -216,17 +182,17 @@ async def test_set_tls_key(ops_test: OpsTest) -> None: # wait for certificate to be available and processed. Can get receive two certificate # available events and restart twice so we want to ensure we are idle for at least 1 minute await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", timeout=1000, idle_period=60 + apps=[app_name], status="active", timeout=1000, idle_period=60 ) # After updating both the external key and the internal key a new certificate request will be # made; then the certificates should be available and updated. - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: new_external_cert_time = await time_file_created(ops_test, unit.name, EXTERNAL_CERT_PATH) new_internal_cert_time = await time_file_created(ops_test, unit.name, INTERNAL_CERT_PATH) new_mongod_service_time = await time_process_started(ops_test, unit.name, DB_SERVICE) - check_certs_correctly_distributed(ops_test, unit) + check_certs_correctly_distributed(ops_test, unit, app_name=app_name) assert ( new_external_cert_time > original_tls_times[unit.name]["external_cert"] @@ -242,23 +208,24 @@ async def test_set_tls_key(ops_test: OpsTest) -> None: ), f"mongod service for {unit.name} was not restarted." # Verify that TLS is functioning on all units. - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + for unit in ops_test.model.applications[app_name].units: assert await check_tls( - ops_test, unit, enabled=True + ops_test, unit, enabled=True, app_name=app_name ), f"tls is not enabled for {unit.name}." async def test_disable_tls(ops_test: OpsTest) -> None: """Verify each unit has TLS disabled after removing relation to the TLS application.""" # Remove the relation. - await ops_test.model.applications[DATABASE_APP_NAME].remove_relation( - f"{DATABASE_APP_NAME}:certificates", f"{TLS_CERTIFICATES_APP_NAME}:certificates" + app_name = await get_app_name(ops_test) or DATABASE_APP_NAME + await ops_test.model.applications[app_name].remove_relation( + f"{app_name}:certificates", f"{TLS_CERTIFICATES_APP_NAME}:certificates" ) await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", timeout=1000, idle_period=60 + apps=[app_name], status="active", timeout=1000, idle_period=60 ) # Wait for all units disabling TLS. - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: - assert await check_tls(ops_test, unit, enabled=False) + for unit in ops_test.model.applications[app_name].units: + assert await check_tls(ops_test, unit, enabled=False, app_name=app_name)