[DPE-3684] Implement DA139 #663
Changes from all commits.
First changed file (Patroni cluster management code):

```diff
@@ -18,6 +18,7 @@
 import requests
 from charms.operator_libs_linux.v2 import snap
 from jinja2 import Template
+from ops import BlockedStatus
 from pysyncobj.utility import TcpUtility, UtilityException
 from tenacity import (
     AttemptManager,
```
```diff
@@ -746,15 +747,18 @@ def stop_patroni(self) -> bool:
             logger.exception(error_message, exc_info=e)
             return False

-    def switchover(self) -> None:
+    def switchover(self, candidate: str | None = None) -> None:
         """Trigger a switchover."""
         # Try to trigger the switchover.
         for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
             with attempt:
                 current_primary = self.get_primary()
+                body = {"leader": current_primary}
+                if candidate:
+                    body["candidate"] = candidate
                 r = requests.post(
                     f"{self._patroni_url}/switchover",
-                    json={"leader": current_primary},
+                    json=body,
                     verify=self.verify,
                     auth=self._patroni_auth,
                     timeout=PATRONI_TIMEOUT,
```
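When no candidate is given, the request body only names the current leader and Patroni picks a healthy replica itself; with a candidate, the switchover targets that specific member. A minimal usage sketch (the member name below is invented for illustration, not taken from this PR):

```python
# Hedged sketch of the extended signature in use.
# Let Patroni pick the new primary on its own:
patroni.switchover()

# Ask Patroni to hand leadership to a specific member
# ("postgresql-1" is a hypothetical member name):
patroni.switchover(candidate="postgresql-1")
```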
```diff
@@ -774,6 +778,19 @@ def primary_changed(self, old_primary: str) -> bool:
         primary = self.get_primary()
         return primary != old_primary

+    def has_raft_quorum(self) -> bool:
+        """Check if raft cluster has quorum."""
+        # Get the status of the raft cluster.
+        syncobj_util = TcpUtility(password=self.raft_password, timeout=3)
+
+        raft_host = "127.0.0.1:2222"
+        try:
+            raft_status = syncobj_util.executeCommand(raft_host, ["status"])
+        except UtilityException:
+            logger.warning("Has raft quorum: Cannot connect to raft cluster")
+            return False
+        return raft_status["has_quorum"]
+
     def remove_raft_data(self) -> None:
         """Stops Patroni and removes the raft journals."""
         logger.info("Stopping patroni")
```
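`has_raft_quorum` only probes the local raft endpoint that Patroni's raft DCS exposes on port 2222. A rough standalone sketch of the same check, using only the calls that appear in this diff (`executeCommand(host, ["status"])` and the `has_quorum`/`leader` keys); the password value is a placeholder:

```python
#!/usr/bin/env python3
# Hedged sketch of the quorum probe above; the password and the
# 127.0.0.1:2222 endpoint are assumptions carried over from the diff.
from pysyncobj.utility import TcpUtility, UtilityException

util = TcpUtility(password="<raft-password>", timeout=3)
try:
    status = util.executeCommand("127.0.0.1:2222", ["status"])
except UtilityException:
    print("cannot reach the local raft node")
else:
    # The charm keys off these two fields when deciding whether
    # the cluster is stuck.
    print("has_quorum:", status["has_quorum"])
    print("leader:", status["leader"])
```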
```diff
@@ -827,6 +844,21 @@ def reinitialise_raft_data(self) -> None:
             raise RaftPostgresqlNotUpError()
         logger.info("Raft should be unstuck")

+    def get_running_cluster_members(self) -> list[str]:
+        """List running patroni members."""
+        try:
+            members = requests.get(
+                f"{self._patroni_url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
+                verify=self.verify,
+                timeout=API_REQUEST_TIMEOUT,
+                auth=self._patroni_auth,
+            ).json()["members"]
+            return [
+                member["name"] for member in members if member["state"] in ("streaming", "running")
+            ]
+        except Exception:
+            return []
+
     def remove_raft_member(self, member_ip: str) -> None:
         """Remove a member from the raft cluster.
```
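For reference, the Patroni cluster-status endpoint queried here returns a JSON document with a `members` array, of which only `name` and `state` are consumed. A hedged illustration of the shape being filtered (values invented; real responses carry additional fields such as role and host):

```python
# Hypothetical response shape, for illustration only.
response = {
    "members": [
        {"name": "postgresql-0", "state": "running"},    # kept (primary)
        {"name": "postgresql-1", "state": "streaming"},  # kept (replica)
        {"name": "postgresql-2", "state": "stopped"},    # filtered out
    ]
}
# The list comprehension above would return ["postgresql-0", "postgresql-1"].
```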
```diff
@@ -860,18 +892,9 @@ def remove_raft_member(self, member_ip: str) -> None:
         if not raft_status["has_quorum"] and (
             not raft_status["leader"] or raft_status["leader"].host == member_ip
         ):
+            self.charm.unit.status = BlockedStatus("Raft majority loss, run: promote-to-primary")
             logger.warning("Remove raft member: Stuck raft cluster detected")
             data_flags = {"raft_stuck": "True"}
-            try:
-                health_status = self.get_patroni_health()
-            except Exception:
-                logger.warning("Remove raft member: Unable to get health status")
-                health_status = {}
-            if health_status.get("role") in ("leader", "master") or health_status.get(
-                "sync_standby"
-            ):
-                logger.info(f"{self.charm.unit.name} is raft candidate")
-                data_flags["raft_candidate"] = "True"
             self.charm.unit_peer_data.update(data_flags)

             # Leader doesn't always trigger when changing it's own peer data.
```

Review comment (on removed lines 865-874): Wait for the action to start reinit.
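With the health check removed, a unit no longer flags itself as a raft candidate automatically; it goes into `BlockedStatus` and waits for the operator to run `promote-to-primary`, which presumably sets the candidate flag on the chosen unit. A hedged sketch of how a unit might react to these peer-data flags (flag and method names come from this diff; the handler wiring and the `_patroni` attribute are assumptions, not the PR's actual code):

```python
# Hedged sketch: reacting to the raft_stuck / raft_candidate peer-data flags.
def _on_peer_relation_changed(self, event) -> None:
    if self.unit_peer_data.get("raft_stuck") != "True":
        return
    # Only the unit chosen via the promote-to-primary action should rebuild raft.
    if self.unit_peer_data.get("raft_candidate") == "True":
        self._patroni.remove_raft_data()        # stop Patroni, drop raft journals
        self._patroni.reinitialise_raft_data()  # restart and wait until raft is unstuck
```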
Second changed file (async replication relation code):

```diff
@@ -106,9 +106,6 @@ def __init__(self, charm):
         self.framework.observe(
             self.charm.on.create_replication_action, self._on_create_replication
         )
-        self.framework.observe(
-            self.charm.on.promote_to_primary_action, self._on_promote_to_primary
-        )
         self.framework.observe(self.charm.on.secret_changed, self._on_secret_changed)
```

Review comment (on removed lines 109-111): Moved to the main charm code, since it's no longer used only for async promotion.
```diff
@@ -583,7 +580,7 @@ def _on_create_replication(self, event: ActionEvent) -> None:
         # Set the status.
         self.charm.unit.status = MaintenanceStatus("Creating replication...")

-    def _on_promote_to_primary(self, event: ActionEvent) -> None:
+    def promote_to_primary(self, event: ActionEvent) -> None:
         """Promote this cluster to the primary cluster."""
         if (
             self.charm.app.status.message != READ_ONLY_MODE_BLOCKING_MESSAGE
         )
```
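With the observer moved out of the relation class, the main charm presumably registers `promote_to_primary_action` itself and dispatches on the action's scope: cluster-level promotion still goes through the relation's `promote_to_primary`, while the unit-level path (the `scope="unit", force=True` call exercised by the new test) handles raft recovery or a switchover to a specific candidate. A hedged sketch of that dispatch (handler and attribute names are assumptions; only the action, method, and parameter names come from this PR):

```python
# Hedged sketch of the main-charm dispatch; not the PR's actual implementation.
def _on_promote_to_primary(self, event: ActionEvent) -> None:
    if event.params.get("scope") == "unit":
        # Unit scope: recover a stuck raft cluster or switch over to this unit.
        ...
    else:
        # Cluster scope: delegate to the async-replication implementation.
        self.async_replication.promote_to_primary(event)
```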
New test file (entire file added, @@ -0,0 +1,134 @@) — integration HA test for raft majority loss and recovery:

```python
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
import logging
from asyncio import exceptions, gather, sleep

import pytest
from pytest_operator.plugin import OpsTest

from .. import markers
from ..helpers import (
    CHARM_BASE,
    DATABASE_APP_NAME,
    get_machine_from_unit,
    stop_machine,
)
from .conftest import APPLICATION_NAME
from .helpers import (
    app_name,
    are_writes_increasing,
    check_writes,
    get_cluster_roles,
    start_continuous_writes,
)

logger = logging.getLogger(__name__)

charm = None


@pytest.mark.group(1)
@markers.juju3
@pytest.mark.abort_on_fail
async def test_build_and_deploy(ops_test: OpsTest) -> None:
    """Build and deploy two PostgreSQL clusters."""
    # This is a potentially destructive test, so it shouldn't be run against existing clusters
    charm = await ops_test.build_charm(".")
    async with ops_test.fast_forward():
        # Deploy the first cluster with reusable storage
        await gather(
            ops_test.model.deploy(
                charm,
                application_name=DATABASE_APP_NAME,
                num_units=3,
                base=CHARM_BASE,
                config={"profile": "testing"},
            ),
            ops_test.model.deploy(
                APPLICATION_NAME,
                application_name=APPLICATION_NAME,
                base=CHARM_BASE,
                channel="edge",
            ),
        )

        await ops_test.model.wait_for_idle(status="active", timeout=1500)


@pytest.mark.group(1)
@markers.juju3
@pytest.mark.parametrize(
    "roles",
    [
        ["primaries"],
        ["sync_standbys"],
        ["replicas"],
        ["primaries", "replicas"],
        ["sync_standbys", "replicas"],
    ],
)
@pytest.mark.abort_on_fail
async def test_removing_unit(ops_test: OpsTest, roles: list[str], continuous_writes) -> None:
    logger.info(f"removing {', '.join(roles)}")
    # Start an application that continuously writes data to the database.
    app = await app_name(ops_test)
    original_roles = await get_cluster_roles(
        ops_test, ops_test.model.applications[DATABASE_APP_NAME].units[0].name
    )
    await start_continuous_writes(ops_test, app)
    units = [original_roles[role][0] for role in roles]
    for unit in units:
        logger.info(f"Stopping unit {unit}")
        await stop_machine(ops_test, await get_machine_from_unit(ops_test, unit))
    await sleep(15)
```

Review comment: Sleep for the Juju leadership to drift.
```python
    for unit in units:
        logger.info(f"Deleting unit {unit}")
        await ops_test.model.destroy_unit(unit, force=True, destroy_storage=False, max_wait=1500)

    if len(roles) > 1:
        for left_unit in ops_test.model.applications[DATABASE_APP_NAME].units:
            if left_unit.name not in units:
                break
        try:
            await ops_test.model.block_until(
                lambda: left_unit.workload_status == "blocked"
                and left_unit.workload_status_message
                == "Raft majority loss, run: promote-to-primary",
                timeout=600,
            )

            run_action = (
                await ops_test.model.applications[DATABASE_APP_NAME]
                .units[0]
                .run_action("promote-to-primary", scope="unit", force=True)
            )
            await run_action.wait()
        except exceptions.TimeoutError:
            # Check if Patroni self healed
            assert (
                left_unit.workload_status == "active"
                and left_unit.workload_status_message == "Primary"
            )
            logger.warning(f"Patroni self-healed without raft reinitialisation for roles {roles}")
```

Review comment (on added lines 108-113): Sometimes when removing the primary and async replica, Patroni manages to survive, so adding an exception for this case. Should I nail it down further?
Reply: I think there is no need for that.
```python
    await ops_test.model.wait_for_idle(status="active", timeout=600, idle_period=45)

    await are_writes_increasing(ops_test, units)

    logger.info("Scaling back up")
    await ops_test.model.applications[DATABASE_APP_NAME].add_unit(count=len(roles))
    await ops_test.model.wait_for_idle(status="active", timeout=1500)

    new_roles = await get_cluster_roles(
        ops_test, ops_test.model.applications[DATABASE_APP_NAME].units[0].name
    )
    assert len(new_roles["primaries"]) == 1
    assert len(new_roles["sync_standbys"]) == 1
    assert len(new_roles["replicas"]) == 1
    if "primaries" in roles:
        assert new_roles["primaries"][0] == original_roles["sync_standbys"][0]
    else:
        assert new_roles["primaries"][0] == original_roles["primaries"][0]

    await check_writes(ops_test)
```
Review comment: Pass a candidate when promoting a specific unit.
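That last comment ties the test back to the `switchover(candidate=...)` change: when the promote-to-primary action targets a specific unit, the handler should pass that unit as the switchover candidate rather than letting Patroni choose. A hedged sketch of that path (the handler name, the `_patroni` attribute, and the unit-name-to-member-name conversion are assumptions; `switchover`, `get_running_cluster_members`, and the action parameters come from this PR):

```python
# Hedged sketch: unit-scoped promotion passing this unit as the switchover candidate.
def _promote_unit(self, event: ActionEvent) -> None:
    # Assumption: Patroni member names use "-" where Juju unit names use "/".
    candidate = self.unit.name.replace("/", "-")
    if candidate not in self._patroni.get_running_cluster_members():
        event.fail(f"{candidate} is not a running cluster member")
        return
    self._patroni.switchover(candidate=candidate)
```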