diff --git a/tests/integration/backup_tests/helpers.py b/tests/integration/backup_tests/helpers.py
index a665be20a..26f1e6dd2 100644
--- a/tests/integration/backup_tests/helpers.py
+++ b/tests/integration/backup_tests/helpers.py
@@ -68,6 +68,15 @@ async def get_leader_unit(ops_test: OpsTest, db_app_name=None) -> ops.model.Unit
     return unit
 
 
+async def get_backup_list(ops_test: OpsTest, db_app_name=None) -> str:
+    """Return the list of backups reported by the list-backups action."""
+    leader_unit = await get_leader_unit(ops_test, db_app_name=db_app_name)
+    action = await leader_unit.run_action(action_name="list-backups")
+    list_result = await action.wait()
+    list_result = list_result.results["backups"]
+    return list_result
+
+
 async def count_logical_backups(db_unit: ops.model.Unit) -> int:
     """Count the number of logical backups."""
     action = await db_unit.run_action(action_name="list-backups")
diff --git a/tests/integration/sharding_tests/test_sharding_backups.py b/tests/integration/sharding_tests/test_sharding_backups.py
index 49a6e6190..9fcbb0af0 100644
--- a/tests/integration/sharding_tests/test_sharding_backups.py
+++ b/tests/integration/sharding_tests/test_sharding_backups.py
@@ -4,25 +4,41 @@
 import secrets
 import string
+import time
 
 import pytest
 from pytest_operator.plugin import OpsTest
-from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed
+from tenacity import Retrying, stop_after_delay, wait_fixed
 
 from ..backup_tests import helpers as backup_helpers
 from ..helpers import get_leader_id, get_password, set_password
+from . import writes_helpers
 
 S3_APP_NAME = "s3-integrator"
 SHARD_ONE_APP_NAME = "shard-one"
 SHARD_TWO_APP_NAME = "shard-two"
 SHARD_APPS = [SHARD_ONE_APP_NAME, SHARD_TWO_APP_NAME]
-CONFIG_SERVER_APP_NAME = "config-server-one"
+CONFIG_SERVER_APP_NAME = "config-server"
 SHARD_REL_NAME = "sharding"
 CONFIG_SERVER_REL_NAME = "config-server"
 S3_REL_NAME = "s3-credentials"
 TIMEOUT = 10 * 60
 
 
+@pytest.fixture()
+async def add_writes_to_db(ops_test: OpsTest):
+    """Adds writes to the DB before the test starts and clears them at the end of the test."""
+    await writes_helpers.start_continous_writes(
+        ops_test, 1, config_server_name=CONFIG_SERVER_APP_NAME
+    )
+    time.sleep(20)
+    await writes_helpers.stop_continous_writes(ops_test, config_server_name=CONFIG_SERVER_APP_NAME)
+    yield
+    await writes_helpers.clear_db_writes(ops_test)
+
+
 @pytest.mark.group(1)
 @pytest.mark.abort_on_fail
 async def test_build_and_deploy(ops_test: OpsTest) -> None:
@@ -100,19 +116,16 @@ async def test_set_credentials_in_cluster(ops_test: OpsTest, github_secrets) ->
 
 @pytest.mark.group(1)
 @pytest.mark.abort_on_fail
-async def test_create_and_list_backups_in_cluster(ops_test: OpsTest, github_secrets) -> None:
+async def test_create_and_list_backups_in_cluster(ops_test: OpsTest) -> None:
     """Tests that sharded cluster can successfully create and list backups."""
-    leader_unit = await backup_helpers.get_leader_unit(
-        ops_test, db_app_name=CONFIG_SERVER_APP_NAME
-    )
-    await backup_helpers.set_credentials(ops_test, github_secrets, cloud="AWS")
     # verify backup list works
-    action = await leader_unit.run_action(action_name="list-backups")
-    list_result = await action.wait()
-    backups = list_result.results["backups"]
+    backups = await backup_helpers.get_backup_list(ops_test, db_app_name=CONFIG_SERVER_APP_NAME)
     assert backups, "backups not outputted"
 
     # verify backup is started
+    leader_unit = await backup_helpers.get_leader_unit(
+        ops_test, db_app_name=CONFIG_SERVER_APP_NAME
+    )
     action = await leader_unit.run_action(action_name="create-backup")
     backup_result = await action.wait()
     assert "backup started" in backup_result.results["backup-status"], "backup didn't start"
@@ -121,13 +134,10 @@ async def test_create_and_list_backups_in_cluster(ops_test: OpsTest, github_secr
     # the action `create-backup` only confirms that the command was sent to the `pbm`. Creating a
     # backup can take a lot of time so this function returns once the command was successfully
     # sent to pbm. Therefore we should retry listing the backup several times
-    try:
-        for attempt in Retrying(stop=stop_after_delay(20), wait=wait_fixed(3)):
-            with attempt:
-                backups = await backup_helpers.count_logical_backups(leader_unit)
-                assert backups == 1
-    except RetryError:
-        assert backups == 1, "Backup not created."
+    for attempt in Retrying(stop=stop_after_delay(20), wait=wait_fixed(3), reraise=True):
+        with attempt:
+            backups = await backup_helpers.count_logical_backups(leader_unit)
+            assert backups == 1
 
 
 @pytest.mark.group(1)
@@ -154,12 +164,16 @@ async def test_rotate_backup_password(ops_test: OpsTest) -> None:
 
     config_leader_id = await get_leader_id(ops_test, app_name=CONFIG_SERVER_APP_NAME)
     new_password = "new-password"
-    shard_backup_password = get_password(ops_test, username="backup", app_name=SHARD_ONE_APP_NAME)
+    shard_backup_password = await get_password(
+        ops_test, username="backup", app_name=SHARD_ONE_APP_NAME
+    )
     assert (
         shard_backup_password != new_password
     ), "shard-one is incorrectly already set to the new password."
 
-    shard_backup_password = get_password(ops_test, username="backup", app_name=SHARD_TWO_APP_NAME)
+    shard_backup_password = await get_password(
+        ops_test, username="backup", app_name=SHARD_TWO_APP_NAME
+    )
     assert (
         shard_backup_password != new_password
     ), "shard-two is incorrectly already set to the new password."
@@ -173,10 +187,14 @@ async def test_rotate_backup_password(ops_test: OpsTest) -> None:
         timeout=TIMEOUT,
     )
 
-    shard_backup_password = get_password(ops_test, username="backup", app_name=SHARD_ONE_APP_NAME)
+    shard_backup_password = await get_password(
+        ops_test, username="backup", app_name=SHARD_ONE_APP_NAME
+    )
     assert shard_backup_password != new_password, "Application shard-one did not rotate password"
 
-    shard_backup_password = get_password(ops_test, username="backup", app_name=SHARD_TWO_APP_NAME)
+    shard_backup_password = await get_password(
+        ops_test, username="backup", app_name=SHARD_TWO_APP_NAME
+    )
     assert shard_backup_password != new_password, "Application shard-two did not rotate password"
 
     # verify backup actions work after password rotation
@@ -193,10 +211,79 @@ async def test_rotate_backup_password(ops_test: OpsTest) -> None:
     # the action `create-backup` only confirms that the command was sent to the `pbm`. Creating a
     # backup can take a lot of time so this function returns once the command was successfully
     # sent to pbm. Therefore we should retry listing the backup several times
-    try:
-        for attempt in Retrying(stop=stop_after_delay(20), wait=wait_fixed(3)):
-            with attempt:
-                backups = await backup_helpers.count_logical_backups(leader_unit)
-                assert backups == 2
-    except RetryError:
-        assert backups == 2, "Backup not created after password rotation."
+    for attempt in Retrying(stop=stop_after_delay(20), wait=wait_fixed(3), reraise=True):
+        with attempt:
+            backups = await backup_helpers.count_logical_backups(leader_unit)
+            assert backups == 2, "Backup not created after password rotation."
+
+
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_restore_backup(ops_test: OpsTest, add_writes_to_db) -> None:
+    """Tests that sharded Charmed MongoDB cluster supports restores."""
+    # count total writes
+    cluster_writes = await writes_helpers.get_cluster_writes_count(
+        ops_test, shard_app_names=SHARD_APPS
+    )
+    assert cluster_writes["total_writes"] > 0, "no writes to backup"
+
+    leader_unit = await backup_helpers.get_leader_unit(
+        ops_test, db_app_name=CONFIG_SERVER_APP_NAME
+    )
+    prev_backups = await backup_helpers.count_logical_backups(leader_unit)
+    await ops_test.model.wait_for_idle(
+        apps=[CONFIG_SERVER_APP_NAME], status="active", idle_period=20
+    )
+    action = await leader_unit.run_action(action_name="create-backup")
+    first_backup = await action.wait()
+    assert first_backup.status == "completed", "First backup not started."
+
+    # verify that backup was made on the bucket
+    for attempt in Retrying(stop=stop_after_delay(4), wait=wait_fixed(5), reraise=True):
+        with attempt:
+            backups = await backup_helpers.count_logical_backups(leader_unit)
+            assert backups == prev_backups + 1, "Backup not created."
+
+    await ops_test.model.wait_for_idle(
+        apps=[CONFIG_SERVER_APP_NAME], status="active", idle_period=20
+    )
+
+    # add writes to be cleared after restoring the backup. Note these are written to the same
+    # collection that was backed up.
+    await writes_helpers.insert_unwanted_data(ops_test)
+    new_total_writes = await writes_helpers.get_cluster_writes_count(
+        ops_test, shard_app_names=SHARD_APPS
+    )
+    assert (
+        new_total_writes["total_writes"] > cluster_writes["total_writes"]
+    ), "No writes to be cleared after restoring."
+
+    # find most recent backup id and restore
+    list_result = await backup_helpers.get_backup_list(
+        ops_test, db_app_name=CONFIG_SERVER_APP_NAME
+    )
+    most_recent_backup = list_result.split("\n")[-1]
+    backup_id = most_recent_backup.split()[0]
+    action = await leader_unit.run_action(action_name="restore", **{"backup-id": backup_id})
+    restore = await action.wait()
+    assert restore.results["restore-status"] == "restore started", "restore not successful"
+
+    await ops_test.model.wait_for_idle(
+        apps=[CONFIG_SERVER_APP_NAME], status="active", idle_period=20
+    )
+
+    # verify all writes are present
+    for attempt in Retrying(stop=stop_after_delay(4), wait=wait_fixed(20), reraise=True):
+        with attempt:
+            restored_total_writes = await writes_helpers.get_cluster_writes_count(
+                ops_test, shard_app_names=SHARD_APPS
+            )
+            assert (
+                restored_total_writes["total_writes"] == cluster_writes["total_writes"]
+            ), "writes not correctly restored to whole cluster"
+            assert (
+                restored_total_writes[SHARD_ONE_APP_NAME] == cluster_writes[SHARD_ONE_APP_NAME]
+            ), f"writes not correctly restored to {SHARD_ONE_APP_NAME}"
+            assert (
+                restored_total_writes[SHARD_TWO_APP_NAME] == cluster_writes[SHARD_TWO_APP_NAME]
+            ), f"writes not correctly restored to {SHARD_TWO_APP_NAME}"
diff --git a/tests/integration/sharding_tests/writes_helpers.py b/tests/integration/sharding_tests/writes_helpers.py
new file mode 100644
index 000000000..4acd088d4
--- /dev/null
+++ b/tests/integration/sharding_tests/writes_helpers.py
@@ -0,0 +1,140 @@
+# Copyright 2023 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+import logging
+import subprocess
+from pathlib import Path
+from typing import Dict, List
+
+import yaml
+from pymongo import MongoClient
+from pytest_operator.plugin import OpsTest
+
+from ..helpers import get_password
+
+# TODO: move these to a separate file for constants / config
+METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
+MONGOS_PORT = 27018
+MONGOD_PORT = 27017
+APP_NAME = "config-server"
+
+logger = logging.getLogger(__name__)
+
+
+class ProcessError(Exception):
+    """Raised when a process fails."""
+
+
+class ProcessRunningError(Exception):
+    """Raised when a process is running when it is not expected to be."""
+
+
+async def mongos_uri(ops_test: OpsTest, config_server_name=APP_NAME) -> str:
+    """Returns a URI for connecting to mongos."""
+    password = await get_password(ops_test, app_name=config_server_name)
+    hosts = [
+        f"{unit.public_address}:{MONGOS_PORT}"
+        for unit in ops_test.model.applications[config_server_name].units
+    ]
+    hosts = ",".join(hosts)
+    return f"mongodb://operator:{password}@{hosts}/admin"
+
+
+async def clear_db_writes(ops_test: OpsTest, config_server_name=APP_NAME) -> None:
+    """Stop the continuous-writes process and drop the test collection."""
+    await stop_continous_writes(ops_test)
+
+    # remove collection from database
+    connection_string = await mongos_uri(ops_test, config_server_name)
+
+    client = MongoClient(connection_string)
+    db = client["new-db"]
+
+    # collection for continuous writes
+    test_collection = db["test_collection"]
+    test_collection.drop()
+
+    client.close()
+
+
+async def start_continous_writes(
+    ops_test: OpsTest, starting_number: int, config_server_name=APP_NAME
+) -> None:
+    """Starts continuous writes to MongoDB."""
+    connection_string = await mongos_uri(ops_test, config_server_name)
+
+    # run continuous writes in the background.
+    subprocess.Popen(
+        [
+            "python3",
+            "tests/integration/ha_tests/continuous_writes.py",
+            connection_string,
+            str(starting_number),
+        ]
+    )
+
+
+async def stop_continous_writes(ops_test: OpsTest, config_server_name=APP_NAME) -> int:
+    """Stops continuous writes to MongoDB and returns the last written value."""
+    # stop the process
+    proc = subprocess.Popen(["pkill", "-9", "-f", "continuous_writes.py"])
+
+    # wait for process to be killed
+    proc.communicate()
+
+    connection_string = await mongos_uri(ops_test, config_server_name)
+
+    client = MongoClient(connection_string)
+    db = client["new-db"]
+    test_collection = db["test_collection"]
+    client.admin.command("enableSharding", "new-db")
+
+    # last written value should be the highest number in the database.
+    last_written_value = test_collection.find_one(sort=[("number", -1)])
+    client.close()
+    return last_written_value
+
+
+async def count_shard_writes(ops_test: OpsTest, shard_app_name=APP_NAME) -> int:
+    """Count the writes that landed on a given shard by connecting to its units directly."""
+    password = await get_password(ops_test, app_name=shard_app_name)
+    hosts = [
+        f"{unit.public_address}:{MONGOD_PORT}"
+        for unit in ops_test.model.applications[shard_app_name].units
+    ]
+    hosts = ",".join(hosts)
+    connection_string = f"mongodb://operator:{password}@{hosts}/admin"
+
+    client = MongoClient(connection_string)
+    db = client["new-db"]
+    test_collection = db["test_collection"]
+    # new versions of pymongo no longer support the cursor count operation; use count_documents
+    count = test_collection.count_documents({})
+    client.close()
+    return count
+
+
+async def get_cluster_writes_count(ops_test: OpsTest, shard_app_names: List[str]) -> Dict:
+    """Returns a dictionary of the writes for each cluster component and the total writes."""
+    cluster_write_count = {}
+    total_writes = 0
+    for app_name in shard_app_names:
+        component_writes = await count_shard_writes(ops_test, app_name)
+        cluster_write_count[app_name] = component_writes
+        total_writes += component_writes
+
+    cluster_write_count["total_writes"] = total_writes
+    return cluster_write_count
+
+
+async def insert_unwanted_data(ops_test: OpsTest, config_server_name=APP_NAME) -> None:
+    """Inserts unwanted data into the test collection via mongos."""
+    connection_string = await mongos_uri(ops_test, config_server_name)
+
+    client = MongoClient(connection_string)
+    db = client["new-db"]
+    test_collection = db["test_collection"]
+    test_collection.insert_one({"unwanted_data": "bad data 1"})
+    test_collection.insert_one({"unwanted_data": "bad data 2"})
+    test_collection.insert_one({"unwanted_data": "bad data 3"})
+    client.close()