Skip to content

Commit

Permalink
Fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-ratushnyy committed Jan 7, 2024
1 parent 2484a3a commit 4965837
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 45 deletions.
6 changes: 3 additions & 3 deletions tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def fetch_replica_set_members(replica_ips: List[str], ops_test: OpsTest, a
Args:
replica_ips: list of ips hosting the replica set.
ops_test: reference to deployment.
app: name of application which has the cluster.
app_name: name of application which has the cluster.
"""
# connect to replica set uri
app_name = app_name or await get_app_name(ops_test)
Expand Down Expand Up @@ -151,11 +151,11 @@ async def fetch_primary(


# TODO remove duplication with common helpers
async def count_primaries(ops_test: OpsTest, app_name=None) -> int:
async def count_primaries(ops_test: OpsTest, password: str = None, app_name: str = None) -> int:
"""Returns the number of primaries in a replica set."""
# connect to MongoDB client
app_name = app_name or await get_app_name(ops_test)
password = await get_password(ops_test, app_name)
password = password or await get_password(ops_test, app_name)
replica_set_hosts = [
unit.public_address for unit in ops_test.model.applications[app_name].units
]
Expand Down
41 changes: 9 additions & 32 deletions tests/integration/ha_tests/test_ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,14 @@
add_unit_with_storage,
all_db_processes_down,
clear_db_writes,
)
from .helpers import (
count_primaries as count_primaries_ha, # TODO remove this duplication
)
from .helpers import (
count_primaries,
count_writes,
cut_network_from_unit,
db_step_down,
fetch_replica_set_members,
find_unit,
get_controller_machine,
)
from .helpers import get_password as get_password_ha # TODO remove this duplication
from .helpers import (
get_password,
insert_focal_to_cluster,
is_machine_reachable_from,
kill_unit_process,
Expand Down Expand Up @@ -236,15 +230,14 @@ async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> N
assert total_expected_writes["number"] == actual_writes


@pytest.mark.skip("skip")
async def test_replication_across_members(ops_test: OpsTest, continuous_writes) -> None:
"""Check consistency, ie write to primary, read data from secondaries."""
# first find primary, write to primary, then read from each unit
await insert_focal_to_cluster(ops_test)
app_name = await get_app_name(ops_test)
ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units]
primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
password = await get_password_ha(ops_test, app_name)
password = await get_password(ops_test, app_name)

secondaries = set(ip_addresses) - set([primary.public_address])
for secondary in secondaries:
Expand All @@ -263,7 +256,6 @@ async def test_replication_across_members(ops_test: OpsTest, continuous_writes)
assert total_expected_writes["number"] == actual_writes


@pytest.mark.skip("skip")
async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None:
"""Verify unique clusters do not share DBs."""
# first find primary, write to primary,
Expand All @@ -280,7 +272,7 @@ async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None:
unit.public_address
for unit in ops_test.model.applications[ANOTHER_DATABASE_APP_NAME].units
]
password = await get_password_ha(ops_test, app_name=ANOTHER_DATABASE_APP_NAME)
password = await get_password(ops_test, app_name=ANOTHER_DATABASE_APP_NAME)
client = replica_set_client(ip_addresses, password, app_name=ANOTHER_DATABASE_APP_NAME)
db = client["new-db"]
test_collection = db["test_ubuntu_collection"]
Expand Down Expand Up @@ -312,7 +304,6 @@ async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None:
assert total_expected_writes["number"] == actual_writes


@pytest.mark.skip("skip")
async def test_replication_member_scaling(ops_test: OpsTest, continuous_writes) -> None:
"""Verify newly added and newly removed members properly replica data.
Expand All @@ -335,7 +326,7 @@ async def test_replication_member_scaling(ops_test: OpsTest, continuous_writes)
unit.public_address for unit in ops_test.model.applications[app_name].units
]
new_member_ip = list(set(new_ip_addresses) - set(original_ip_addresses))[0]
password = await get_password_ha(ops_test, app_name)
password = await get_password(ops_test, app_name)
client = MongoClient(unit_uri(new_member_ip, password, app_name), directConnection=True)

# check for replicated data while retrying to give time for replica to copy over data.
Expand All @@ -358,7 +349,6 @@ async def test_replication_member_scaling(ops_test: OpsTest, continuous_writes)
assert total_expected_writes["number"] == actual_writes


@pytest.mark.skip("skip")
async def test_kill_db_process(ops_test, continuous_writes):
# locate primary unit
app_name = await get_app_name(ops_test)
Expand Down Expand Up @@ -397,12 +387,10 @@ async def test_kill_db_process(ops_test, continuous_writes):

async def test_freeze_db_process(ops_test, continuous_writes):
# locate primary unit
logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 1")
app_name = await get_app_name(ops_test)
password = await get_password_ha(ops_test, app_name)
password = await get_password(ops_test, app_name)
ip_addresses = [unit.public_address for unit in ops_test.model.applications[app_name].units]
primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 2")
await kill_unit_process(ops_test, primary.name, kill_code="SIGSTOP", app_name=app_name)

# sleep for twice the median election time
Expand All @@ -412,13 +400,11 @@ async def test_freeze_db_process(ops_test, continuous_writes):
new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
assert new_primary.name != primary.name

logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 3")
# verify new writes are continuing by counting the number of writes before and after a 5 second
# wait
writes = await count_writes(ops_test, app_name=app_name)
time.sleep(5)
more_writes = await count_writes(ops_test, app_name=app_name)
logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 4")
# un-freeze the old primary
await kill_unit_process(ops_test, primary.name, kill_code="SIGCONT", app_name=app_name)

Expand All @@ -432,30 +418,26 @@ async def test_freeze_db_process(ops_test, continuous_writes):
# verify all units are running under the same replset
member_ips = await fetch_replica_set_members(ip_addresses, ops_test, app_name=app_name)
assert set(member_ips) == set(ip_addresses), "all members not running under the same replset"
logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 5")

# verify there is only one primary after un-freezing old primary
assert (
await count_primaries_ha(ops_test, password, app_name=app_name) == 1
await count_primaries(ops_test, password=password, app_name=app_name) == 1
), "there are more than one primary in the replica set."

# verify that the old primary does not "reclaim" primary status after un-freezing old primary
new_primary = await replica_set_primary(ip_addresses, ops_test, app_name=app_name)
logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 6")
assert new_primary.name != primary.name, "un-frozen primary should be secondary."

# verify that no writes were missed.
total_expected_writes = await stop_continous_writes(ops_test, app_name=app_name)
actual_writes = await count_writes(ops_test, app_name=app_name)
assert actual_writes == total_expected_writes["number"], "db writes missing."
logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 7")
# verify that old primary is up to date.
assert await secondary_up_to_date(
ops_test, primary.public_address, actual_writes, app_name=app_name
), "secondary not up to date with the cluster after restarting."
logger.error(">>>>>>>>>>>>>>>>>>> test_freeze_db_process 8")


@pytest.mark.skip("skip")
async def test_restart_db_process(ops_test, continuous_writes, change_logging):
# locate primary unit
app_name = await get_app_name(ops_test)
Expand Down Expand Up @@ -502,7 +484,6 @@ async def test_restart_db_process(ops_test, continuous_writes, change_logging):
), "secondary not up to date with the cluster after restarting."


@pytest.mark.skip("skip")
async def test_full_cluster_crash(ops_test: OpsTest, continuous_writes, reset_restart_delay):
app_name = await get_app_name(ops_test)

Expand Down Expand Up @@ -553,9 +534,8 @@ async def test_full_cluster_crash(ops_test: OpsTest, continuous_writes, reset_re
assert actual_writes == total_expected_writes["number"], "db writes missing."


@pytest.mark.skip("skip")
async def test_full_cluster_restart(ops_test: OpsTest, continuous_writes, reset_restart_delay):
app_name = await get_app_name(ops_test, app_name=app_name)
app_name = await get_app_name(ops_test)

# update all units to have a new RESTART_DELAY, Modifying the Restart delay to 3 minutes
# should ensure enough time for all replicas to be down at the same time.
Expand Down Expand Up @@ -602,7 +582,6 @@ async def test_full_cluster_restart(ops_test: OpsTest, continuous_writes, reset_
assert total_expected_writes["number"] == actual_writes, "writes to the db were missed."


@pytest.mark.skip("skip")
async def test_network_cut(ops_test, continuous_writes):
# locate primary unit
app_name = await get_app_name(ops_test)
Expand Down Expand Up @@ -682,7 +661,6 @@ async def test_network_cut(ops_test, continuous_writes):
), "secondary not up to date with the cluster after restarting."


@pytest.mark.skip("skip")
@pytest.mark.abort_on_fail
@pytest.mark.unstable
async def test_scale_up_down(ops_test: OpsTest, continuous_writes):
Expand All @@ -693,7 +671,6 @@ async def test_scale_up_down(ops_test: OpsTest, continuous_writes):
await verify_writes(ops_test)


@pytest.mark.skip("skip")
@pytest.mark.abort_on_fail
@pytest.mark.unstable
async def test_scale_up_down_removing_leader(ops_test: OpsTest, continuous_writes):
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ async def get_password(ops_test: OpsTest, username="operator", app_name=None) ->
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=2, max=30),
)
async def count_primaries(ops_test: OpsTest, password: str) -> int:
async def count_primaries(ops_test: OpsTest, password: str, app_name=None) -> int:
"""Counts the number of primaries in a replica set.
Will retry counting when the number of primaries is 0 at most 5 times.
"""
app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)
number_of_primaries = 0
for unit_id in UNIT_IDS:
# get unit
Expand All @@ -84,9 +84,9 @@ async def count_primaries(ops_test: OpsTest, password: str) -> int:
return number_of_primaries


async def find_unit(ops_test: OpsTest, leader: bool) -> ops.model.Unit:
async def find_unit(ops_test: OpsTest, leader: bool, app_name=None) -> ops.model.Unit:
"""Helper function identifies the a unit, based on need for leader or non-leader."""
app_name = await get_app_name(ops_test)
app_name = app_name or await get_app_name(ops_test)
ret_unit = None
for unit in ops_test.model.applications[app_name].units:
if await unit.is_leader_from_status() == leader:
Expand Down
13 changes: 7 additions & 6 deletions tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,17 @@ async def test_unit_is_running_as_replica_set(ops_test: OpsTest, unit_id: int) -

async def test_leader_is_primary_on_deployment(ops_test: OpsTest) -> None:
"""Tests that right after deployment that the primary unit is the leader."""
app_name = await get_app_name(ops_test)
# grab leader unit
leader_unit = await find_unit(ops_test, leader=True)
leader_unit = await find_unit(ops_test, leader=True, app_name=app_name)

# verify that we have a leader
assert leader_unit is not None, "No unit is leader"

# connect to mongod
password = await get_password(ops_test)
user_app_name = await get_app_name(ops_test)
password = await get_password(ops_test, app_name=app_name)
client = MongoClient(
unit_uri(leader_unit.public_address, password, user_app_name), directConnection=True
unit_uri(leader_unit.public_address, password, app_name), directConnection=True
)

# verify primary status
Expand All @@ -102,9 +102,10 @@ async def test_leader_is_primary_on_deployment(ops_test: OpsTest) -> None:

async def test_exactly_one_primary(ops_test: OpsTest) -> None:
"""Tests that there is exactly one primary in the deployed units."""
app_name = await get_app_name(ops_test)
try:
password = await get_password(ops_test)
number_of_primaries = await count_primaries(ops_test, password)
password = await get_password(ops_test, app_name=app_name)
number_of_primaries = await count_primaries(ops_test, password, app_name=app_name)
except RetryError:
number_of_primaries = 0

Expand Down

0 comments on commit 4965837

Please sign in to comment.