Skip to content

Commit

Permalink
HA tests WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-ratushnyy committed Jan 6, 2024
1 parent 2eb0f55 commit 2484a3a
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 132 deletions.
78 changes: 40 additions & 38 deletions tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class ProcessRunningError(Exception):
"""Raised when a process is running when it is not expected to be."""


def replica_set_client(replica_ips: List[str], password: str, app_name=APP_NAME) -> MongoClient:
def replica_set_client(replica_ips: List[str], password: str, app_name=None) -> MongoClient:
"""Generates the replica set URI for multiple IP addresses.
Args:
Expand All @@ -67,7 +67,7 @@ def replica_set_client(replica_ips: List[str], password: str, app_name=APP_NAME)
return MongoClient(replica_set_uri)


async def fetch_replica_set_members(replica_ips: List[str], ops_test: OpsTest):
async def fetch_replica_set_members(replica_ips: List[str], ops_test: OpsTest, app_name=None):
"""Fetches the IPs listed as replica set members in the MongoDB replica set configuration.
Args:
Expand All @@ -76,7 +76,7 @@ async def fetch_replica_set_members(replica_ips: List[str], ops_test: OpsTest):
app_name: name of application which has the cluster.
"""
# connect to replica set uri
app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)
password = await get_password(ops_test, app_name)
client = replica_set_client(replica_ips, password, app_name)

Expand Down Expand Up @@ -150,10 +150,11 @@ async def fetch_primary(
return primary


async def count_primaries(ops_test: OpsTest) -> int:
# TODO remove duplication with common helpers
async def count_primaries(ops_test: OpsTest, app_name=None) -> int:
"""Returns the number of primaries in a replica set."""
# connect to MongoDB client
app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)
password = await get_password(ops_test, app_name)
replica_set_hosts = [
unit.public_address for unit in ops_test.model.applications[app_name].units
Expand Down Expand Up @@ -224,10 +225,10 @@ async def retrieve_entries(ops_test, app_name, db_name, collection_name, query_f
return cluster_entries


async def find_unit(ops_test: OpsTest, leader: bool) -> ops.model.Unit:
async def find_unit(ops_test: OpsTest, leader: bool, app_name=None) -> ops.model.Unit:
"""Helper function that identifies a unit, based on the need for a leader or non-leader."""
ret_unit = None
app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)
for unit in ops_test.model.applications[app_name].units:
if await unit.is_leader_from_status() == leader:
ret_unit = unit
Expand Down Expand Up @@ -282,7 +283,7 @@ async def start_continous_writes(ops_test: OpsTest, starting_number: int) -> Non
)


async def stop_continous_writes(ops_test: OpsTest, down_unit=None) -> int:
async def stop_continous_writes(ops_test: OpsTest, down_unit=None, app_name=None) -> int:
"""Stops continuous writes to MongoDB and returns the last written value.
In the future this should be put in a dummy charm.
Expand All @@ -293,7 +294,7 @@ async def stop_continous_writes(ops_test: OpsTest, down_unit=None) -> int:
# wait for process to be killed
proc.communicate()

app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)
password = await get_password(ops_test, app_name, down_unit)
hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units]
hosts = ",".join(hosts)
Expand All @@ -309,9 +310,9 @@ async def stop_continous_writes(ops_test: OpsTest, down_unit=None) -> int:
return last_written_value


async def count_writes(ops_test: OpsTest, down_unit=None) -> int:
async def count_writes(ops_test: OpsTest, down_unit=None, app_name=None) -> int:
"""New versions of pymongo no longer support the count operation, instead find is used."""
app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)
password = await get_password(ops_test, app_name, down_unit)
hosts = [unit.public_address for unit in ops_test.model.applications[app_name].units]
hosts = ",".join(hosts)
Expand All @@ -325,12 +326,12 @@ async def count_writes(ops_test: OpsTest, down_unit=None) -> int:
return count


async def secondary_up_to_date(ops_test: OpsTest, unit_ip, expected_writes) -> bool:
async def secondary_up_to_date(ops_test: OpsTest, unit_ip, expected_writes, app_name=None) -> bool:
"""Checks if secondary is up to date with the cluster.
Retries over the period of one minute to give secondary adequate time to copy over data.
"""
app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)
password = await get_password(ops_test, app_name)
connection_string = f"mongodb://operator:{password}@{unit_ip}:{PORT}/admin?"
client = MongoClient(connection_string, directConnection=True)
Expand Down Expand Up @@ -398,29 +399,29 @@ def storage_id(ops_test, unit_name):
return line.split()[1]


async def add_unit_with_storage(ops_test, app_name, storage):
    """Add one unit to `app_name`, attaching the already-existing `storage` to it.

    Note: this function exists as a temporary solution until this issue is resolved:
    https://github.com/juju/python-libjuju/issues/695

    Args:
        ops_test: test fixture exposing the `model` and the `juju` CLI wrapper.
        app_name: name of the application to add the unit to.
        storage: storage identifier passed to `--attach-storage`; the new unit is
            expected to report this same identifier through `storage_id`.

    Returns:
        The unit object of the newly added unit.
    """
    # snapshot the unit names so the new unit can be identified after scaling
    units_before = {unit.name for unit in ops_test.model.applications[app_name].units}
    expected_units = len(units_before) + 1

    # build argv directly instead of splitting a formatted string, so that each value
    # is always passed to juju as exactly one argument
    model_name = ops_test.model.info.name
    await ops_test.juju(
        "add-unit",
        app_name,
        f"--model={model_name}",
        f"--attach-storage={storage}",
    )
    await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000)
    assert (
        len(ops_test.model.applications[app_name].units) == expected_units
    ), "New unit not added to model"

    # the new unit is the one whose name was not present before scaling
    added_units = [
        unit
        for unit in ops_test.model.applications[app_name].units
        if unit.name not in units_before
    ]
    assert added_units, "New unit not added to model"
    new_unit = added_units[0]

    # verify storage attached
    assert storage_id(ops_test, new_unit.name) == storage, "unit added with incorrect storage"

    # return a reference to newly added unit
    return new_unit

Expand Down Expand Up @@ -458,9 +459,9 @@ async def reused_storage(ops_test: OpsTest, unit_name, removal_time) -> bool:
return False


async def insert_focal_to_cluster(ops_test: OpsTest) -> None:
async def insert_focal_to_cluster(ops_test: OpsTest, app_name=None) -> None:
"""Inserts the Focal Fossa data into the MongoDB cluster via primary replica."""
app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)
ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units]
primary = (await replica_set_primary(ip_addresses, ops_test)).public_address
password = await get_password(ops_test, app_name)
Expand All @@ -471,10 +472,10 @@ async def insert_focal_to_cluster(ops_test: OpsTest) -> None:
client.close()


async def kill_unit_process(ops_test: OpsTest, unit_name: str, kill_code: str):
async def kill_unit_process(ops_test: OpsTest, unit_name: str, kill_code: str, app_name=None):
"""Kills the DB process on the unit according to the provided kill code."""
# killing the only replica can be disastrous
app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)
if len(ops_test.model.applications[app_name].units) < 2:
await ops_test.model.applications[app_name].add_unit(count=1)
await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000)
Expand All @@ -487,9 +488,9 @@ async def kill_unit_process(ops_test: OpsTest, unit_name: str, kill_code: str):
)


async def mongod_ready(ops_test, unit_ip) -> bool:
async def mongod_ready(ops_test, unit_ip, app_name=None) -> bool:
"""Verifies replica is running and available."""
app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)
password = await get_password(ops_test, app_name)
client = MongoClient(unit_uri(unit_ip, password, app_name), directConnection=True)
try:
Expand All @@ -505,9 +506,9 @@ async def mongod_ready(ops_test, unit_ip) -> bool:
return True


async def db_step_down(ops_test: OpsTest, old_primary_unit: str, sigterm_time: int):
async def db_step_down(ops_test: OpsTest, old_primary_unit: str, sigterm_time: int, app_name=None):
# loop through all units that aren't the old primary
app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)
for unit in ops_test.model.applications[app_name].units:
# verify log file exists on this machine
search_file = f"exec --unit {unit.name} ls {MONGODB_LOG_PATH}"
Expand Down Expand Up @@ -540,9 +541,9 @@ async def db_step_down(ops_test: OpsTest, old_primary_unit: str, sigterm_time: i
return False


async def all_db_processes_down(ops_test: OpsTest) -> bool:
async def all_db_processes_down(ops_test: OpsTest, app_name=None) -> bool:
"""Verifies that all units of the charm do not have the DB process running."""
app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)

try:
for attempt in Retrying(stop=stop_after_attempt(60), wait=wait_fixed(3)):
Expand Down Expand Up @@ -655,9 +656,9 @@ async def start_mongod(ops_test: OpsTest, unit) -> None:


@retry(stop=stop_after_attempt(8), wait=wait_fixed(15))
async def verify_replica_set_configuration(ops_test: OpsTest) -> None:
async def verify_replica_set_configuration(ops_test: OpsTest, app_name=None) -> None:
"""Verifies presence of primary, replica set members, and number of primaries."""
app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)
# `get_unit_ip` is used instead of `.public_address` because of a bug in python-libjuju that
# incorrectly reports the IP addresses after the network is restored; this is reported as a
# bug here: https://github.com/juju/python-libjuju/issues/738 . Once this bug is resolved use
Expand All @@ -668,16 +669,16 @@ async def verify_replica_set_configuration(ops_test: OpsTest) -> None:
]

# verify presence of primary
new_primary = await replica_set_primary(ip_addresses, ops_test)
new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
assert new_primary.name, "primary not elected."

# verify all units are running under the same replset
member_ips = await fetch_replica_set_members(ip_addresses, ops_test)
member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name)
assert set(member_ips) == set(ip_addresses), "all members not running under the same replset"

# verify there is only one primary
assert (
await count_primaries(ops_test) == 1
await count_primaries(ops_test, app_name=app_name) == 1
), "there are more than one primary in the replica set."


Expand Down Expand Up @@ -791,8 +792,9 @@ async def scale_and_verify(ops_test: OpsTest, count: int, remove_leader: bool =
assert primary is not None, "Replica set has no primary"


async def verify_writes(ops_test: OpsTest, app_name=None):
    """Stop the continuous writes and assert that none of them were missed.

    Args:
        ops_test: test fixture passed through to the helper functions.
        app_name: name of the application to check; when omitted it is resolved
            with `get_app_name`.
    """
    app_name = app_name or await get_app_name(ops_test)

    # `app_name` must be passed by keyword: the second positional parameter of both
    # `stop_continous_writes` and `count_writes` is `down_unit`, not the application.
    total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
    actual_writes = await count_writes(ops_test, app_name=app_name)

    # verify that no writes to the db were missed
    assert total_expected_writes["number"] == actual_writes, "writes to the db were missed."
Loading

0 comments on commit 2484a3a

Please sign in to comment.