diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py
index 913ab2e69..efc279200 100644
--- a/tests/integration/ha_tests/helpers.py
+++ b/tests/integration/ha_tests/helpers.py
@@ -52,7 +52,7 @@ class ProcessRunningError(Exception):
     """Raised when a process is running when it is not expected to be."""


-def replica_set_client(replica_ips: List[str], password: str, app_name=APP_NAME) -> MongoClient:
+def replica_set_client(replica_ips: List[str], password: str, app_name=None) -> MongoClient:
     """Generates the replica set URI for multiple IP addresses.

     Args:
@@ -67,7 +67,7 @@ def replica_set_client(replica_ips: List[str], password: str, app_name=APP_NAME)
     return MongoClient(replica_set_uri)


-async def fetch_replica_set_members(replica_ips: List[str], ops_test: OpsTest):
+async def fetch_replica_set_members(replica_ips: List[str], ops_test: OpsTest, app_name=None):
     """Fetches the IPs listed as replica set members in the MongoDB replica set configuration.

     Args:
@@ -76,7 +76,7 @@ async def fetch_replica_set_members(replica_ips: List[str], ops_test: OpsTest):
         app: name of application which has the cluster.
     """
     # connect to replica set uri
-    app_name = await get_app_name(ops_test)
+    app_name = app_name or await get_app_name(ops_test)
     password = await get_password(ops_test, app_name)
     client = replica_set_client(replica_ips, password, app_name)

@@ -150,10 +150,11 @@ async def fetch_primary(
     return primary


-async def count_primaries(ops_test: OpsTest) -> int:
+# TODO remove duplication with common helpers
+async def count_primaries(ops_test: OpsTest, app_name=None) -> int:
     """Returns the number of primaries in a replica set."""
     # connect to MongoDB client
-    app_name = await get_app_name(ops_test)
+    app_name = app_name or await get_app_name(ops_test)
     password = await get_password(ops_test, app_name)
     replica_set_hosts = [
         unit.public_address for unit in ops_test.model.applications[app_name].units
@@ -224,10 +225,10 @@ async def retrieve_entries(ops_test, app_name, db_name, collection_name, query_f
     return cluster_entries


-async def find_unit(ops_test: OpsTest, leader: bool) -> ops.model.Unit:
+async def find_unit(ops_test: OpsTest, leader: bool, app_name=None) -> ops.model.Unit:
     """Helper function identifies the a unit, based on need for leader or non-leader."""
     ret_unit = None
-    app_name = await get_app_name(ops_test)
+    app_name = app_name or await get_app_name(ops_test)
     for unit in ops_test.model.applications[app_name].units:
         if await unit.is_leader_from_status() == leader:
             ret_unit = unit
@@ -282,7 +283,7 @@ async def start_continous_writes(ops_test: OpsTest, starting_number: int) -> Non
     )


-async def stop_continous_writes(ops_test: OpsTest, down_unit=None) -> int:
+async def stop_continous_writes(ops_test: OpsTest, down_unit=None, app_name=None) -> int:
     """Stops continuous writes to MongoDB and returns the last written value.

     In the future this should be put in a dummy charm.
@@ -293,7 +294,7 @@ async def stop_continous_writes(ops_test: OpsTest, down_unit=None) -> int:
     # wait for process to be killed
     proc.communicate()

-    app_name = await get_app_name(ops_test)
+    app_name = app_name or await get_app_name(ops_test)
     password = await get_password(ops_test, app_name, down_unit)
     hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units]
     hosts = ",".join(hosts)
@@ -309,9 +310,9 @@ async def stop_continous_writes(ops_test: OpsTest, down_unit=None) -> int:
     return last_written_value


-async def count_writes(ops_test: OpsTest, down_unit=None) -> int:
+async def count_writes(ops_test: OpsTest, down_unit=None, app_name=None) -> int:
     """New versions of pymongo no longer support the count operation, instead find is used."""
-    app_name = await get_app_name(ops_test)
+    app_name = app_name or await get_app_name(ops_test)
     password = await get_password(ops_test, app_name, down_unit)
     hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units]
     hosts = ",".join(hosts)
@@ -325,12 +326,12 @@ async def count_writes(ops_test: OpsTest, down_unit=None) -> int:
     return count


-async def secondary_up_to_date(ops_test: OpsTest, unit_ip, expected_writes) -> bool:
+async def secondary_up_to_date(ops_test: OpsTest, unit_ip, expected_writes, app_name=None) -> bool:
     """Checks if secondary is up to date with the cluster.

     Retries over the period of one minute to give secondary adequate time to copy over data.
     """
-    app_name = await get_app_name(ops_test)
+    app_name = app_name or await get_app_name(ops_test)
     password = await get_password(ops_test, app_name)
     connection_string = f"mongodb://operator:{password}@{unit_ip}:{PORT}/admin?"
     client = MongoClient(connection_string, directConnection=True)
@@ -398,29 +399,29 @@ def storage_id(ops_test, unit_name):
             return line.split()[1]


-async def add_unit_with_storage(ops_test, app, storage):
+async def add_unit_with_storage(ops_test, app_name, storage):
     """Adds unit with storage.

     Note: this function exists as a temporary solution until this issue is resolved:
     https://github.com/juju/python-libjuju/issues/695
     """
-    expected_units = len(ops_test.model.applications[app].units) + 1
-    prev_units = [unit.name for unit in ops_test.model.applications[app].units]
+    expected_units = len(ops_test.model.applications[app_name].units) + 1
+    prev_units = [unit.name for unit in ops_test.model.applications[app_name].units]
     model_name = ops_test.model.info.name
-    add_unit_cmd = f"add-unit {app} --model={model_name} --attach-storage={storage}".split()
+    add_unit_cmd = f"add-unit {app_name} --model={model_name} --attach-storage={storage}".split()
     await ops_test.juju(*add_unit_cmd)
-    await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1000)
+    await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000)
     assert (
-        len(ops_test.model.applications[app].units) == expected_units
+        len(ops_test.model.applications[app_name].units) == expected_units
     ), "New unit not added to model"

     # verify storage attached
-    curr_units = [unit.name for unit in ops_test.model.applications[app].units]
+    curr_units = [unit.name for unit in ops_test.model.applications[app_name].units]
     new_unit = list(set(curr_units) - set(prev_units))[0]
     assert storage_id(ops_test, new_unit) == storage, "unit added with incorrect storage"

     # return a reference to newly added unit
-    for unit in ops_test.model.applications[app].units:
+    for unit in ops_test.model.applications[app_name].units:
         if unit.name == new_unit:
             return unit

@@ -458,9 +459,9 @@ async def reused_storage(ops_test: OpsTest, unit_name, removal_time) -> bool:
     return False


-async def insert_focal_to_cluster(ops_test: OpsTest) -> None:
+async def insert_focal_to_cluster(ops_test: OpsTest, app_name=None) -> None:
     """Inserts the Focal Fossa data into the MongoDB cluster via primary replica."""
-    app_name = await get_app_name(ops_test)
+    app_name = app_name or await get_app_name(ops_test)
     ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units]
     primary = (await replica_set_primary(ip_addresses, ops_test)).public_address
     password = await get_password(ops_test, app_name)
@@ -471,10 +472,10 @@ async def insert_focal_to_cluster(ops_test: OpsTest) -> None:
     client.close()


-async def kill_unit_process(ops_test: OpsTest, unit_name: str, kill_code: str):
+async def kill_unit_process(ops_test: OpsTest, unit_name: str, kill_code: str, app_name=None):
     """Kills the DB process on the unit according to the provided kill code."""
     # killing the only replica can be disastrous
-    app_name = await get_app_name(ops_test)
+    app_name = app_name or await get_app_name(ops_test)
     if len(ops_test.model.applications[app_name].units) < 2:
         await ops_test.model.applications[app_name].add_unit(count=1)
         await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000)
@@ -487,9 +488,9 @@ async def kill_unit_process(ops_test: OpsTest, unit_name: str, kill_code: str):
     )


-async def mongod_ready(ops_test, unit_ip) -> bool:
+async def mongod_ready(ops_test, unit_ip, app_name=None) -> bool:
     """Verifies replica is running and available."""
-    app_name = await get_app_name(ops_test)
+    app_name = app_name or await get_app_name(ops_test)
     password = await get_password(ops_test, app_name)
     client = MongoClient(unit_uri(unit_ip, password, app_name), directConnection=True)
     try:
@@ -505,9 +506,9 @@ async def mongod_ready(ops_test, unit_ip) -> bool:
     return True


-async def db_step_down(ops_test: OpsTest, old_primary_unit: str, sigterm_time: int):
+async def db_step_down(ops_test: OpsTest, old_primary_unit: str, sigterm_time: int, app_name=None):
     # loop through all units that aren't the old primary
-    app_name = await get_app_name(ops_test)
+    app_name = app_name or await get_app_name(ops_test)
     for unit in ops_test.model.applications[app_name].units:
         # verify log file exists on this machine
         search_file = f"exec --unit {unit.name} ls {MONGODB_LOG_PATH}"
@@ -540,9 +541,9 @@ async def db_step_down(ops_test: OpsTest, old_primary_unit: str, sigterm_time: i
     return False


-async def all_db_processes_down(ops_test: OpsTest) -> bool:
+async def all_db_processes_down(ops_test: OpsTest, app_name=None) -> bool:
     """Verifies that all units of the charm do not have the DB process running."""
-    app_name = await get_app_name(ops_test)
+    app_name = app_name or await get_app_name(ops_test)

     try:
         for attempt in Retrying(stop=stop_after_attempt(60), wait=wait_fixed(3)):
@@ -655,9 +656,9 @@ async def start_mongod(ops_test: OpsTest, unit) -> None:


 @retry(stop=stop_after_attempt(8), wait=wait_fixed(15))
-async def verify_replica_set_configuration(ops_test: OpsTest) -> None:
+async def verify_replica_set_configuration(ops_test: OpsTest, app_name=None) -> None:
     """Verifies presence of primary, replica set members, and number of primaries."""
-    app_name = await get_app_name(ops_test)
+    app_name = app_name or await get_app_name(ops_test)
     # `get_unit_ip` is used instead of `.public_address` because of a bug in python-libjuju that
     # incorrectly reports the IP addresses after the network is restored this is reported as a
     # bug here: https://github.com/juju/python-libjuju/issues/738 . Once this bug is resolved use
@@ -668,16 +669,16 @@ async def verify_replica_set_configuration(ops_test: OpsTest) -> None:
     ]

     # verify presence of primary
-    new_primary = await replica_set_primary(ip_addresses, ops_test)
+    new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
     assert new_primary.name, "primary not elected."

     # verify all units are running under the same replset
-    member_ips = await fetch_replica_set_members(ip_addresses, ops_test)
+    member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name)
     assert set(member_ips) == set(ip_addresses), "all members not running under the same replset"

     # verify there is only one primary
     assert (
-        await count_primaries(ops_test) == 1
+        await count_primaries(ops_test, app_name=app_name) == 1
     ), "there are more than one primary in the replica set."


@@ -791,8 +792,9 @@ async def scale_and_verify(ops_test: OpsTest, count: int, remove_leader: bool =
     assert primary is not None, "Replica set has no primary"


-async def _verify_writes(ops_test: OpsTest):
+async def verify_writes(ops_test: OpsTest, app_name=None):
     # verify that no writes to the db were missed
+    app_name = app_name or await get_app_name(ops_test)
     total_expected_writes = await stop_continous_writes(ops_test)
-    actual_writes = await count_writes(ops_test)
+    actual_writes = await count_writes(ops_test, app_name=app_name)
     assert total_expected_writes["number"] == actual_writes, "writes to the db were missed."
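All of the helpers above resolve the target application the same way: an explicitly passed `app_name` takes precedence, otherwise the helper falls back to auto-detection through `get_app_name(ops_test)`. A minimal usage sketch of that pattern from a test, where the literal fallback name "mongodb" is only an assumed placeholder and not something these helpers define:

    # Sketch: resolve the application once, then pass it to every helper explicitly
    # so repeated auto-detection is skipped. "mongodb" is an assumed fallback value.
    app_name = await get_app_name(ops_test) or "mongodb"
    total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
    actual_writes = await count_writes(ops_test, app_name=app_name)
    assert total_expected_writes["number"] == actual_writes
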
diff --git a/tests/integration/ha_tests/test_ha.py b/tests/integration/ha_tests/test_ha.py
index a7da59f13..340ecc2df 100644
--- a/tests/integration/ha_tests/test_ha.py
+++ b/tests/integration/ha_tests/test_ha.py
@@ -13,7 +13,6 @@
 from ..helpers import (
     check_or_scale_app,
-    count_primaries,
     get_app_name,
     get_unit_ip,
     instance_ip,
@@ -21,12 +20,15 @@
     unit_uri,
 )
 from .helpers import (
-    APP_NAME,
     MONGODB_LOG_PATH,
-    _verify_writes,
     add_unit_with_storage,
     all_db_processes_down,
     clear_db_writes,
+)
+from .helpers import (
+    count_primaries as count_primaries_ha,  # TODO remove this duplication
+)
+from .helpers import (
     count_writes,
     cut_network_from_unit,
     db_step_down,
@@ -56,6 +58,7 @@
     update_restart_delay,
     update_service_logging,
     verify_replica_set_configuration,
+    verify_writes,
     wait_network_restore,
 )

@@ -78,7 +81,8 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
     required_units = 3
     user_app_name = await get_app_name(ops_test)
     if user_app_name:
-        return check_or_scale_app(ops_test, user_app_name, required_units)
+        await check_or_scale_app(ops_test, user_app_name, required_units)
+        return

     my_charm = await ops_test.build_charm(".")
     await ops_test.model.deploy(my_charm, num_units=required_units)
@@ -119,8 +123,8 @@ async def test_storage_re_use(ops_test, continuous_writes):
     ), "attached storage not properly reused by MongoDB."

     # verify that the no writes were skipped
-    total_expected_writes = await stop_continous_writes(ops_test)
-    actual_writes = await count_writes(ops_test)
+    total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
+    actual_writes = await count_writes(ops_test, app_name=app_name)
     assert total_expected_writes["number"] == actual_writes


@@ -143,17 +147,18 @@ async def test_add_units(ops_test: OpsTest, continuous_writes) -> None:
     ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units]

     # connect to replica set uri and get replica set members
-    member_ips = await fetch_replica_set_members(ip_addresses, ops_test)
+    member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name)

     # verify that the replica set members have the correct units
     assert set(member_ips) == set(ip_addresses)

     # verify that the no writes were skipped
-    total_expected_writes = await stop_continous_writes(ops_test)
-    actual_writes = await count_writes(ops_test)
+    total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
+    actual_writes = await count_writes(ops_test, app_name=app_name)
     assert total_expected_writes["number"] == actual_writes


 @pytest.mark.abort_on_fail
 async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> None:
     """Tests clusters behavior when scaling down a minority and removing a primary replica.
@@ -175,7 +180,7 @@ async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> N
     minority_count = int(len(ops_test.model.applications[app_name].units) / 2)

     # find leader unit
-    leader_unit = await find_unit(ops_test, leader=True)
+    leader_unit = await find_unit(ops_test, leader=True, app_name=app_name)
     minority_count -= 1

     # verify that we have a leader
@@ -208,7 +213,7 @@ async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> N

     # check that the replica set with the remaining units has a primary
     try:
-        primary = await replica_set_primary(ip_addresses, ops_test)
+        primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
     except RetryError:
         primary = None

@@ -221,23 +226,24 @@ async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> N
     ), "replica set primary is not one of the available units"

     # verify that the configuration of mongodb no longer has the deleted ip
-    member_ips = await fetch_replica_set_members(ip_addresses, ops_test)
+    member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name)
     assert set(member_ips) == set(ip_addresses), "mongod config contains deleted units"

     # verify that the no writes were skipped
-    total_expected_writes = await stop_continous_writes(ops_test)
-    actual_writes = await count_writes(ops_test)
+    total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
+    actual_writes = await count_writes(ops_test, app_name=app_name)
     assert total_expected_writes["number"] == actual_writes


+@pytest.mark.skip("skip")
 async def test_replication_across_members(ops_test: OpsTest, continuous_writes) -> None:
     """Check consistency, ie write to primary, read data from secondaries."""
     # first find primary, write to primary, then read from each unit
     await insert_focal_to_cluster(ops_test)
     app_name = await get_app_name(ops_test)
     ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units]
-    primary = await replica_set_primary(ip_addresses, ops_test)
+    primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
     password = await get_password_ha(ops_test, app_name)

     secondaries = set(ip_addresses) - set([primary.public_address])
@@ -252,15 +258,17 @@ async def test_replication_across_members(ops_test: OpsTest, continuous_writes)
     client.close()

     # verify that the no writes were skipped
-    total_expected_writes = await stop_continous_writes(ops_test)
-    actual_writes = await count_writes(ops_test)
+    total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
+    actual_writes = await count_writes(ops_test, app_name=app_name)
     assert total_expected_writes["number"] == actual_writes


+@pytest.mark.skip("skip")
 async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None:
     """Verify unique clusters do not share DBs."""
     # first find primary, write to primary,
-    await insert_focal_to_cluster(ops_test)
+    app_name = await get_app_name(ops_test)
+    await insert_focal_to_cluster(ops_test, app_name=app_name)

     # deploy new cluster
     my_charm = await ops_test.build_charm(".")
@@ -289,7 +297,7 @@ async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None:

     cluster_2_entries = await retrieve_entries(
         ops_test,
-        app_name=APP_NAME,
+        app_name=app_name,
         db_name="new-db",
         collection_name="test_ubuntu_collection",
         query_field="release_name",
@@ -299,20 +307,21 @@ async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None:
     assert len(common_entries) == 0, "Writes from one cluster are replicated to another cluster."

     # verify that the no writes were skipped
-    total_expected_writes = await stop_continous_writes(ops_test)
-    actual_writes = await count_writes(ops_test)
+    total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
+    actual_writes = await count_writes(ops_test, app_name=app_name)
     assert total_expected_writes["number"] == actual_writes


+@pytest.mark.skip("skip")
 async def test_replication_member_scaling(ops_test: OpsTest, continuous_writes) -> None:
     """Verify newly added and newly removed members properly replica data.

     Verify newly members have replicated data and newly removed members are gone without data.
     """
-    # first find primary, write to primary,
-    await insert_focal_to_cluster(ops_test)
-
     app_name = await get_app_name(ops_test)
+
+    # first find primary, write to primary,
+    await insert_focal_to_cluster(ops_test, app_name=app_name)
     original_ip_addresses = [
         unit.public_address for unit in ops_test.model.applications[app_name].units
     ]
@@ -344,124 +353,131 @@ async def test_replication_member_scaling(ops_test: OpsTest, continuous_writes)
     client.close()

     # verify that the no writes were skipped
-    total_expected_writes = await stop_continous_writes(ops_test)
-    actual_writes = await count_writes(ops_test)
+    total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
+    actual_writes = await count_writes(ops_test, app_name=app_name)
     assert total_expected_writes["number"] == actual_writes


+@pytest.mark.skip("skip")
 async def test_kill_db_process(ops_test, continuous_writes):
     # locate primary unit
     app_name = await get_app_name(ops_test)
     ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units]
-    primary = await replica_set_primary(ip_addresses, ops_test)
+    primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)

-    await kill_unit_process(ops_test, primary.name, kill_code="SIGKILL")
+    await kill_unit_process(ops_test, primary.name, kill_code="SIGKILL", app_name=app_name)

     # verify new writes are continuing by counting the number of writes before and after a 5 second
     # wait
-    writes = await count_writes(ops_test)
+    writes = await count_writes(ops_test, app_name=app_name)
     time.sleep(5)
-    more_writes = await count_writes(ops_test)
+    more_writes = await count_writes(ops_test, app_name=app_name)
     assert more_writes > writes, "writes not continuing to DB"

     # sleep for twice the median election time
     time.sleep(MEDIAN_REELECTION_TIME * 2)

     # verify that db service got restarted and is ready
-    assert await mongod_ready(ops_test, primary.public_address)
+    assert await mongod_ready(ops_test, primary.public_address, app_name=app_name)

     # verify that a new primary gets elected (ie old primary is secondary)
-    new_primary = await replica_set_primary(ip_addresses, ops_test)
+    new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
     assert new_primary.name != primary.name

     # verify that no writes to the db were missed
-    total_expected_writes = await stop_continous_writes(ops_test)
-    actual_writes = await count_writes(ops_test)
+    total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
+    actual_writes = await count_writes(ops_test, app_name=app_name)
     assert total_expected_writes["number"] == actual_writes, "writes to the db were missed."

     # verify that old primary is up to date.
     assert await secondary_up_to_date(
-        ops_test, primary.public_address, total_expected_writes["number"]
+        ops_test, primary.public_address, total_expected_writes["number"], app_name=app_name
     ), "secondary not up to date with the cluster after restarting."


 async def test_freeze_db_process(ops_test, continuous_writes):
     # locate primary unit
+    logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 1")
     app_name = await get_app_name(ops_test)
     password = await get_password_ha(ops_test, app_name)
     ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units]
-    primary = await replica_set_primary(ip_addresses, ops_test)
-    await kill_unit_process(ops_test, primary.name, kill_code="SIGSTOP")
+    primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
+    logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 2")
+    await kill_unit_process(ops_test, primary.name, kill_code="SIGSTOP", app_name=app_name)

     # sleep for twice the median election time
     time.sleep(MEDIAN_REELECTION_TIME * 2)

     # verify that a new primary gets elected
-    new_primary = await replica_set_primary(ip_addresses, ops_test)
+    new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
     assert new_primary.name != primary.name
+    logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 3")

     # verify new writes are continuing by counting the number of writes before and after a 5 second
     # wait
-    writes = await count_writes(ops_test)
+    writes = await count_writes(ops_test, app_name=app_name)
     time.sleep(5)
-    more_writes = await count_writes(ops_test)
-
+    more_writes = await count_writes(ops_test, app_name=app_name)
+    logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 4")
     # un-freeze the old primary
-    await kill_unit_process(ops_test, primary.name, kill_code="SIGCONT")
+    await kill_unit_process(ops_test, primary.name, kill_code="SIGCONT", app_name=app_name)

     # check this after un-freezing the old primary so that if this check fails we still "turned
     # back on" the mongod process
     assert more_writes > writes, "writes not continuing to DB"

     # verify that db service got restarted and is ready
-    assert await mongod_ready(ops_test, primary.public_address)
+    assert await mongod_ready(ops_test, primary.public_address, app_name=app_name)

     # verify all units are running under the same replset
-    member_ips = await fetch_replica_set_members(ip_addresses, ops_test)
+    member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name)
     assert set(member_ips) == set(ip_addresses), "all members not running under the same replset"
-
+    logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 5")
     # verify there is only one primary after un-freezing old primary
     assert (
-        await count_primaries(ops_test, password) == 1
+        await count_primaries_ha(ops_test, app_name=app_name) == 1
     ), "there are more than one primary in the replica set."

     # verify that the old primary does not "reclaim" primary status after un-freezing old primary
-    new_primary = await replica_set_primary(ip_addresses, ops_test)
+    new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
+    logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 6")
     assert new_primary.name != primary.name, "un-frozen primary should be secondary."

     # verify that no writes were missed.
-    total_expected_writes = await stop_continous_writes(ops_test)
-    actual_writes = await count_writes(ops_test)
+    total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
+    actual_writes = await count_writes(ops_test, app_name=app_name)
     assert actual_writes == total_expected_writes["number"], "db writes missing."
-
+    logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 7")
     # verify that old primary is up to date.
     assert await secondary_up_to_date(
-        ops_test, primary.public_address, actual_writes
+        ops_test, primary.public_address, actual_writes, app_name=app_name
     ), "secondary not up to date with the cluster after restarting."
+    logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 8")


+@pytest.mark.skip("skip")
 async def test_restart_db_process(ops_test, continuous_writes, change_logging):
     # locate primary unit
     app_name = await get_app_name(ops_test)
     ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units]
-    old_primary = await replica_set_primary(ip_addresses, ops_test)
+    old_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)

     # send SIGTERM, we expect `systemd` to restart the process
     sig_term_time = time.time()
-    await kill_unit_process(ops_test, old_primary.name, kill_code="SIGTERM")
+    await kill_unit_process(ops_test, old_primary.name, kill_code="SIGTERM", app_name=app_name)

     # verify new writes are continuing by counting the number of writes before and after a 5 second
     # wait
-    writes = await count_writes(ops_test)
+    writes = await count_writes(ops_test, app_name=app_name)
     time.sleep(5)
-    more_writes = await count_writes(ops_test)
+    more_writes = await count_writes(ops_test, app_name=app_name)
     assert more_writes > writes, "writes not continuing to DB"

     # verify that db service got restarted and is ready
-    assert await mongod_ready(ops_test, old_primary.public_address)
+    assert await mongod_ready(ops_test, old_primary.public_address, app_name=app_name)

     # verify that a new primary gets elected (ie old primary is secondary)
-    new_primary = await replica_set_primary(ip_addresses, ops_test)
+    new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
     assert new_primary.name != old_primary.name

     # verify that a stepdown was performed on restart. SIGTERM should send a graceful restart and
@@ -470,22 +486,23 @@ async def test_restart_db_process(ops_test, continuous_writes, change_logging):
         for attempt in Retrying(stop=stop_after_delay(30), wait=wait_fixed(3)):
             with attempt:
                 assert await db_step_down(
-                    ops_test, old_primary.name, sig_term_time
+                    ops_test, old_primary.name, sig_term_time, app_name=app_name
                 ), "old primary departed without stepping down."
     except RetryError:
         False, "old primary departed without stepping down."

     # verify that no writes were missed
-    total_expected_writes = await stop_continous_writes(ops_test)
-    actual_writes = await count_writes(ops_test)
+    total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
+    actual_writes = await count_writes(ops_test, app_name=app_name)
     assert total_expected_writes["number"] == actual_writes

     # verify that old primary is up to date.
     assert await secondary_up_to_date(
-        ops_test, old_primary.public_address, total_expected_writes["number"]
+        ops_test, old_primary.public_address, total_expected_writes["number"], app_name=app_name
     ), "secondary not up to date with the cluster after restarting."


+@pytest.mark.skip("skip")
 async def test_full_cluster_crash(ops_test: OpsTest, continuous_writes, reset_restart_delay):
     app_name = await get_app_name(ops_test)

@@ -497,7 +514,7 @@ async def test_full_cluster_crash(ops_test: OpsTest, continuous_writes, reset_re
     # kill all units "simultaneously"
     await asyncio.gather(
         *[
-            kill_unit_process(ops_test, unit.name, kill_code="SIGKILL")
+            kill_unit_process(ops_test, unit.name, kill_code="SIGKILL", app_name=app_name)
             for unit in ops_test.model.applications[app_name].units
         ]
     )
@@ -505,7 +522,9 @@ async def test_full_cluster_crash(ops_test: OpsTest, continuous_writes, reset_re
     # This test serves to verify behavior when all replicas are down at the same time that when
     # they come back online they operate as expected. This check verifies that we meet the criteria
     # of all replicas being down at the same time.
-    assert await all_db_processes_down(ops_test), "Not all units down at the same time."
+    assert await all_db_processes_down(
+        ops_test, app_name=app_name
+    ), "Not all units down at the same time."

     # sleep for twice the median election time and the restart delay
     time.sleep(MEDIAN_REELECTION_TIME * 2 + RESTART_DELAY)
@@ -513,29 +532,30 @@ async def test_full_cluster_crash(ops_test: OpsTest, continuous_writes, reset_re
     # verify all units are up and running
     for unit in ops_test.model.applications[app_name].units:
         assert await mongod_ready(
-            ops_test, unit.public_address
+            ops_test, unit.public_address, app_name=app_name
         ), f"unit {unit.name} not restarted after cluster crash."

     # verify new writes are continuing by counting the number of writes before and after a 5 second
     # wait
-    writes = await count_writes(ops_test)
+    writes = await count_writes(ops_test, app_name=app_name)
     time.sleep(5)
-    more_writes = await count_writes(ops_test)
+    more_writes = await count_writes(ops_test, app_name=app_name)
     assert more_writes > writes, "writes not continuing to DB"

     # verify presence of primary, replica set member configuration, and number of primaries
-    await verify_replica_set_configuration(ops_test)
+    await verify_replica_set_configuration(ops_test, app_name=app_name)

     # verify that no writes to the db were missed
-    total_expected_writes = await stop_continous_writes(ops_test)
-    actual_writes = await count_writes(ops_test)
+    total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
+    actual_writes = await count_writes(ops_test, app_name=app_name)

     # verify that no writes were missed.
     assert actual_writes == total_expected_writes["number"], "db writes missing."


+@pytest.mark.skip("skip")
 async def test_full_cluster_restart(ops_test: OpsTest, continuous_writes, reset_restart_delay):
     app_name = await get_app_name(ops_test)

     # update all units to have a new RESTART_DELAY, Modifying the Restart delay to 3 minutes
     # should ensure enough time for all replicas to be down at the same time.
@@ -545,7 +565,7 @@ async def test_full_cluster_restart(ops_test: OpsTest, continuous_writes, reset_
     # kill all units "simultaneously"
     await asyncio.gather(
         *[
-            kill_unit_process(ops_test, unit.name, kill_code="SIGTERM")
+            kill_unit_process(ops_test, unit.name, kill_code="SIGTERM", app_name=app_name)
             for unit in ops_test.model.applications[app_name].units
         ]
     )
@@ -553,7 +573,9 @@ async def test_full_cluster_restart(ops_test: OpsTest, continuous_writes, reset_
     # This test serves to verify behavior when all replicas are down at the same time that when
     # they come back online they operate as expected. This check verifies that we meet the criteria
     # of all replicas being down at the same time.
-    assert await all_db_processes_down(ops_test), "Not all units down at the same time."
+    assert await all_db_processes_down(
+        ops_test, app_name=app_name
+    ), "Not all units down at the same time."

     # sleep for twice the median election time and the restart delay
     time.sleep(MEDIAN_REELECTION_TIME * 2 + RESTART_DELAY)
@@ -561,40 +583,40 @@ async def test_full_cluster_restart(ops_test: OpsTest, continuous_writes, reset_
     # verify all units are up and running
     for unit in ops_test.model.applications[app_name].units:
         assert await mongod_ready(
-            ops_test, unit.public_address
+            ops_test, unit.public_address, app_name=app_name
         ), f"unit {unit.name} not restarted after cluster crash."

     # verify new writes are continuing by counting the number of writes before and after a 5 second
     # wait
-    writes = await count_writes(ops_test)
+    writes = await count_writes(ops_test, app_name=app_name)
    time.sleep(5)
-    more_writes = await count_writes(ops_test)
+    more_writes = await count_writes(ops_test, app_name=app_name)
     assert more_writes > writes, "writes not continuing to DB"

     # verify presence of primary, replica set member configuration, and number of primaries
-    await verify_replica_set_configuration(ops_test)
+    await verify_replica_set_configuration(ops_test, app_name=app_name)

     # verify that no writes to the db were missed
-    total_expected_writes = await stop_continous_writes(ops_test)
-    actual_writes = await count_writes(ops_test)
+    total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
+    actual_writes = await count_writes(ops_test, app_name=app_name)
     assert total_expected_writes["number"] == actual_writes, "writes to the db were missed."


+@pytest.mark.skip("skip")
 async def test_network_cut(ops_test, continuous_writes):
     # locate primary unit
     app_name = await get_app_name(ops_test)
     ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units]
-    primary = await replica_set_primary(ip_addresses, ops_test)
+    primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
     all_units = ops_test.model.applications[app_name].units
     model_name = ops_test.model.info.name

-    primary_hostname = await unit_hostname(ops_test, primary.name)
-    primary_unit_ip = await get_unit_ip(ops_test, primary.name)
+    primary_hostname = await unit_hostname(ops_test, primary.name, app_name=app_name)
+    primary_unit_ip = await get_unit_ip(ops_test, primary.name, app_name=app_name)

     # before cutting network verify that connection is possible
     assert await mongod_ready(
-        ops_test,
-        primary.public_address,
+        ops_test, primary.public_address, app_name=app_name
     ), f"Connection to host {primary.public_address} is not possible"

     cut_network_from_unit(primary_hostname)
@@ -617,18 +639,22 @@ async def test_network_cut(ops_test, continuous_writes):
     # verify new writes are continuing by counting the number of writes before and after a 5 second
     # wait
-    writes = await count_writes(ops_test, down_unit=primary.name)
+    writes = await count_writes(ops_test, down_unit=primary.name, app_name=app_name)
     time.sleep(5)
-    more_writes = await count_writes(ops_test, down_unit=primary.name)
+    more_writes = await count_writes(ops_test, down_unit=primary.name, app_name=app_name)
     assert more_writes > writes, "writes not continuing to DB"

     # verify that a new primary got elected
-    new_primary = await replica_set_primary(ip_addresses, ops_test, down_unit=primary.name)
+    new_primary = await replica_set_primary(
+        ip_addresses, ops_test, down_unit=primary.name, app_name=app_name
+    )
     assert new_primary.name != primary.name

     # verify that no writes to the db were missed
-    total_expected_writes = await stop_continous_writes(ops_test, down_unit=primary.name)
-    actual_writes = await count_writes(ops_test, down_unit=primary.name)
+    total_expected_writes = await stop_continous_writes(
+        ops_test, down_unit=primary.name, app_name=app_name
+    )
+    actual_writes = await count_writes(ops_test, down_unit=primary.name, app_name=app_name)
     assert total_expected_writes["number"] == actual_writes, "writes to the db were missed."

     # restore network connectivity to old primary
@@ -644,19 +670,19 @@ async def test_network_cut(ops_test, continuous_writes):
     # verify we have connection to the old primary
     new_ip = instance_ip(model_name, primary_hostname)
     assert await mongod_ready(
-        ops_test,
-        new_ip,
+        ops_test, new_ip, app_name=app_name
     ), f"Connection to host {new_ip} is not possible"

     # verify presence of primary, replica set member configuration, and number of primaries
-    await verify_replica_set_configuration(ops_test)
+    await verify_replica_set_configuration(ops_test, app_name=app_name)

     # verify that old primary is up to date.
     assert await secondary_up_to_date(
-        ops_test, new_ip, total_expected_writes["number"]
+        ops_test, new_ip, total_expected_writes["number"], app_name=app_name
     ), "secondary not up to date with the cluster after restarting."


+@pytest.mark.skip("skip")
 @pytest.mark.abort_on_fail
 @pytest.mark.unstable
 async def test_scale_up_down(ops_test: OpsTest, continuous_writes):
@@ -664,9 +690,10 @@ async def test_scale_up_down(ops_test: OpsTest, continuous_writes):
     scales = [3, -3, 4, -4, 5, -5, 6, -6, 7, -7]
     for count in scales:
         await scale_and_verify(ops_test, count=count)
-        await _verify_writes(ops_test)
+        await verify_writes(ops_test)


+@pytest.mark.skip("skip")
 @pytest.mark.abort_on_fail
 @pytest.mark.unstable
 async def test_scale_up_down_removing_leader(ops_test: OpsTest, continuous_writes):
@@ -674,7 +701,7 @@ async def test_scale_up_down_removing_leader(ops_test: OpsTest, continuous_write
     scales = [3, -3, 4, -4, 5, -5, 6, -6, 7, -7]
     for count in scales:
         await scale_and_verify(ops_test, count=count, remove_leader=True)
-        await _verify_writes(ops_test)
+        await verify_writes(ops_test)


 # TODO put this into a separate file
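The `count_primaries as count_primaries_ha` alias exists only because the HA helper and the common helper currently expose different signatures (`(ops_test, app_name=None)` in the HA helpers versus the password-taking variant in `..helpers`), which is what both TODO notes refer to. One possible consolidation, sketched under the assumption that the common helper keeps its password-based signature, is to keep a single `app_name`-based entry point in the HA helpers and delegate the password lookup:

    # Sketch only: `common_count_primaries` is an assumed alias for the count_primaries
    # exported by the shared integration helpers; signatures follow the calls seen above.
    from ..helpers import count_primaries as common_count_primaries

    async def count_primaries(ops_test: OpsTest, app_name=None) -> int:
        app_name = app_name or await get_app_name(ops_test)
        password = await get_password(ops_test, app_name)
        return await common_count_primaries(ops_test, password)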