[DPE-2781] Test replica relations to sharding interface #298

Merged · 18 commits · Nov 22, 2023
4 changes: 2 additions & 2 deletions lib/charms/mongodb/v1/mongos.py
@@ -21,7 +21,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
-LIBPATCH = 3
+LIBPATCH = 4

# path to store mongodb keyFile
logger = logging.getLogger(__name__)
@@ -204,7 +204,7 @@ def pre_remove_checks(self, shard_name):
with attempt:
balancer_state = self.client.admin.command("balancerStatus")
if balancer_state["mode"] == "off":
-raise BalancerNotEnabledError
+raise BalancerNotEnabledError("balancer is not enabled.")

def remove_shard(self, shard_name: str) -> None:
"""Removes shard from the cluster.
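The new error message surfaces in pre_remove_checks, which polls balancerStatus under a retry loop before allowing shard removal. A minimal sketch of that pattern, assuming pymongo and tenacity are available (illustrative names, not the charm's exact code):

from pymongo import MongoClient
from tenacity import Retrying, stop_after_attempt, wait_fixed


class BalancerNotEnabledError(Exception):
    """Raised when the mongos balancer is disabled."""


def assert_balancer_enabled(client: MongoClient) -> None:
    # Retry briefly: the balancer can report "off" while mongos is still starting.
    for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5)):
        with attempt:
            state = client.admin.command("balancerStatus")
            if state["mode"] == "off":
                # attaching a message, as this PR does, makes deferral logs readable
                raise BalancerNotEnabledError("balancer is not enabled.")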
104 changes: 85 additions & 19 deletions lib/charms/mongodb/v1/shards_interface.py
@@ -32,7 +32,6 @@
from ops.model import (
ActiveStatus,
BlockedStatus,
-ErrorStatus,
MaintenanceStatus,
StatusBase,
WaitingStatus,
@@ -57,6 +56,14 @@
HOSTS_KEY = "host"
OPERATOR_PASSWORD_KEY = MongoDBUser.get_password_key_name_for_user(OperatorUser.get_username())
FORBIDDEN_REMOVAL_ERR_CODE = 20
AUTH_FAILED_CODE = 18


class ShardAuthError(Exception):
"""Raised when a shard doesn't have the same auth as the config server."""

def __init__(self, shard: str):
self.shard = shard


class RemoveLastShardError(Exception):
@@ -201,11 +208,11 @@ def _on_relation_event(self, event):

logger.error("Deferring _on_relation_event for shards interface since: error=%r", e)
event.defer()
-except BalancerNotEnabledError:
-logger.error("Deferring on _relation_broken_event, balancer is not enabled.")
+except ShardAuthError as e:
+self.charm.unit.status = WaitingStatus(f"Waiting for {e.shard} to sync credentials.")
event.defer()
return
-except (PyMongoError, NotReadyError) as e:
+except (PyMongoError, NotReadyError, BalancerNotEnabledError) as e:
logger.error("Deferring _on_relation_event for shards interface since: error=%r", e)
event.defer()
return
@@ -215,6 +222,7 @@ def add_shards(self, departed_shard_id):

raises: PyMongoError
"""
failed_to_add_shard = None
with MongosConnection(self.charm.mongos_config) as mongo:
cluster_shards = mongo.get_shard_members()
relation_shards = self._get_shards_from_relations(departed_shard_id)
@@ -231,10 +239,25 @@
logger.info("Adding shard: %s ", shard)
mongo.add_shard(shard, shard_hosts)
except PyMongoError as e:
+# raise exception after trying to add the remaining shards, so as not to prevent
+# adding the other shards
logger.error("Failed to add shard %s to the config server, error=%r", shard, e)
-raise
+failed_to_add_shard = (e, shard)

if not failed_to_add_shard:
self.charm.unit.status = ActiveStatus("")
return

(error, shard) = failed_to_add_shard

self.charm.unit.status = ActiveStatus("")
# Sometimes it can take up to 20 minutes for the shard to be restarted with the same auth
# as the config server.
if error.code == AUTH_FAILED_CODE:
logger.error(f"{shard} shard does not have the same auth as the config server.")
raise ShardAuthError(shard)

logger.error(f"Failed to add {shard} to cluster")
raise error

def remove_shards(self, departed_shard_id):
"""Removes shards from cluster.
@@ -280,13 +303,14 @@ def update_mongos_hosts(self):

def get_config_server_status(self) -> Optional[StatusBase]:
"""Returns the current status of the config-server."""
-if not self.charm.is_role(Config.Role.CONFIG_SERVER):
-logger.info("skipping status check, charm is not running as a shard")
+if self.skip_config_server_status():
return None

-if not self.charm.db_initialised:
-logger.info("No status for shard to report, waiting for db to be initialised.")
-return None
+if (
+self.charm.is_role(Config.Role.REPLICATION)
+and self.model.relations[Config.Relations.CONFIG_SERVER_RELATIONS_NAME]
+):
+return BlockedStatus("sharding interface cannot be used by replicas")

if self.model.relations[LEGACY_REL_NAME]:
return BlockedStatus(f"relation {LEGACY_REL_NAME} to shard not supported.")
@@ -295,7 +319,7 @@ def get_config_server_status(self) -> Optional[StatusBase]:
return BlockedStatus(f"relation {REL_NAME} to shard not supported.")

if not self.is_mongos_running():
return ErrorStatus("Internal mongos is not running.")
return BlockedStatus("Internal mongos is not running.")

shard_draining = self.get_draining_shards()
if shard_draining:
@@ -308,10 +332,28 @@
unreachable_shards = self.get_unreachable_shards()
if unreachable_shards:
unreachable_shards = ", ".join(unreachable_shards)
return ErrorStatus(f"Shards {unreachable_shards} are unreachable.")
return BlockedStatus(f"shards {unreachable_shards} are unreachable.")

return ActiveStatus()

def skip_config_server_status(self) -> bool:
"""Returns true if the status check should be skipped."""
if self.charm.is_role(Config.Role.SHARD):
logger.info("skipping config server status check, charm is running as a shard")
return True

if not self.charm.db_initialised:
logger.info("No status for shard to report, waiting for db to be initialised.")
return True

if (
self.charm.is_role(Config.Role.REPLICATION)
and not self.model.relations[Config.Relations.CONFIG_SERVER_RELATIONS_NAME]
):
return True

return False

def _update_relation_data(self, relation_id: int, data: dict) -> None:
"""Updates a set of key-value pairs in the relation.

@@ -491,6 +533,11 @@ def pass_hook_checks(self, event):
event.defer()
return False

mongos_hosts = event.relation.data[event.relation.app].get(HOSTS_KEY, None)
if isinstance(event, RelationBrokenEvent) and not mongos_hosts:
logger.info("Config-server relation never set up, no need to process broken event.")
return False

return True

def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
@@ -558,13 +605,14 @@ def get_shard_status(self) -> Optional[StatusBase]:
Note: No need to report if currently draining, since that check blocks other hooks from
executing.
"""
-if not self.charm.is_role(Config.Role.SHARD):
-logger.info("skipping status check, charm is not running as a shard")
+if self.skip_shard_status():
return None

-if not self.charm.db_initialised:
-logger.info("No status for shard to report, waiting for db to be initialised.")
-return None
+if (
+self.charm.is_role(Config.Role.REPLICATION)
+and self.model.relations[Config.Relations.CONFIG_SERVER_RELATIONS_NAME]
+):
+return BlockedStatus("sharding interface cannot be used by replicas")

if self.model.get_relation(LEGACY_REL_NAME):
return BlockedStatus(f"relation {LEGACY_REL_NAME} to shard not supported.")
@@ -579,7 +627,7 @@ def get_shard_status(self) -> Optional[StatusBase]:
return ActiveStatus("Shard drained from cluster, ready for removal")

if not self._is_mongos_reachable():
return ErrorStatus("Config server unreachable")
return BlockedStatus("Config server unreachable")

if not self._is_added_to_cluster():
return MaintenanceStatus("Adding shard to config-server")
@@ -589,6 +637,24 @@

return ActiveStatus()

def skip_shard_status(self) -> bool:
"""Returns true if the status check should be skipped."""
if self.charm.is_role(Config.Role.CONFIG_SERVER):
logger.info("skipping status check, charm is running as config-server")
return True

if not self.charm.db_initialised:
logger.info("No status for shard to report, waiting for db to be initialised.")
return True

if (
self.charm.is_role(Config.Role.REPLICATION)
and not self.model.relations[Config.Relations.CONFIG_SERVER_RELATIONS_NAME]
):
return True

return False

def drained(self, mongos_hosts: Set[str], shard_name: str) -> bool:
"""Returns whether a shard has been drained from the cluster.

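The reworked add_shards no longer aborts on the first failure: it keeps attempting the remaining shards, remembers the error, and only raises once the loop finishes, translating MongoDB's AuthenticationFailed (code 18) into the new, deferrable ShardAuthError. A self-contained sketch of that pattern (illustrative names, not the charm's exact API):

AUTH_FAILED_CODE = 18  # MongoDB's AuthenticationFailed error code


class ShardAuthError(Exception):
    """Raised when a shard doesn't have the same auth as the config server."""

    def __init__(self, shard: str):
        self.shard = shard


def add_all_shards(mongo, shards_to_add) -> None:
    failed = None
    for shard in shards_to_add:
        try:
            mongo.add_shard(shard)
        except Exception as e:  # the charm narrows this to PyMongoError
            # remember the failure but keep adding the remaining shards
            failed = (e, shard)

    if failed is None:
        return

    error, shard = failed
    # auth mismatches can persist for ~20 minutes while the shard restarts
    # with the config server's credentials, so surface them as a dedicated,
    # deferrable exception instead of a generic error
    if getattr(error, "code", None) == AUTH_FAILED_CODE:
        raise ShardAuthError(shard)
    raise error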
10 changes: 3 additions & 7 deletions src/charm.py
@@ -1357,12 +1357,8 @@ def get_status(self) -> StatusBase:
"""
# retrieve statuses of different services running on Charmed MongoDB
mongodb_status = build_unit_status(self.mongodb_config, self._unit_ip(self.unit))
-shard_status = self.shard.get_shard_status() if self.is_role(Config.Role.SHARD) else None
-config_server_status = (
-self.config_server.get_config_server_status()
-if self.is_role(Config.Role.CONFIG_SERVER)
-else None
-)
+shard_status = self.shard.get_shard_status()
+config_server_status = self.config_server.get_config_server_status()
pbm_status = self.backups.get_pbm_status()

# failure in mongodb takes precedence over sharding and config server
@@ -1398,7 +1394,7 @@ def is_relation_feasible(self, rel_interface) -> bool:
not self.is_sharding_component()
and rel_interface == Config.Relations.SHARDING_RELATIONS_NAME
):
self.unit.status = BlockedStatus("role replication does not support sharding")
self.unit.status = BlockedStatus("sharding interface cannot be used by replicas")
logger.error(
"Charm is in sharding role: %s. Does not support %s interface.",
self.role,
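get_status can now call both status helpers unconditionally because each one starts with a skip_* guard and returns None whenever the check does not apply to the charm's role. A hedged sketch of that Optional-status composition, using plain stand-ins rather than the ops library's real classes:

from typing import Optional


class Status:
    def __init__(self, kind: str, message: str = ""):
        self.kind = kind
        self.message = message


def shard_status(role: str) -> Optional[Status]:
    # mirrors skip_shard_status(): when not applicable, return None and let
    # the caller ignore this component entirely
    if role != "shard":
        return None
    return Status("active")


def pick_status(*candidates: Optional[Status]) -> Status:
    # the first applicable non-active status wins; otherwise the unit is active
    for status in candidates:
        if status is not None and status.kind != "active":
            return status
    return Status("active")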
82 changes: 79 additions & 3 deletions tests/integration/sharding_tests/test_sharding_relations.py
@@ -5,9 +5,10 @@
from juju.errors import JujuAPIError
from pytest_operator.plugin import OpsTest

SHARD_ONE_APP_NAME = "shard-one"
SHARD_ONE_APP_NAME = "shard"
CONFIG_SERVER_ONE_APP_NAME = "config-server-one"
CONFIG_SERVER_TWO_APP_NAME = "config-server-two"
REPLICATION_APP_NAME = "replication"
APP_CHARM_NAME = "application"
LEGACY_APP_CHARM_NAME = "legacy-application"

@@ -18,7 +19,7 @@
DATABASE_REL_NAME = "first-database"
LEGACY_RELATION_NAME = "obsolete"

RELATION_LIMIT_MESSAGE = 'cannot add relation "shard-one:sharding config-server-two:config-server": establishing a new relation for shard-one:sharding would exceed its maximum relation limit of 1'
RELATION_LIMIT_MESSAGE = 'cannot add relation "shard:sharding config-server-two:config-server": establishing a new relation for shard:sharding would exceed its maximum relation limit of 1'
# for now we have a large timeout due to the slow drainage of the `config.system.sessions`
# collection. More info here:
# https://stackoverflow.com/questions/77364840/mongodb-slow-chunk-migration-for-collection-config-system-sessions-with-remov
@@ -41,8 +42,9 @@ async def test_build_and_deploy(
application_name=CONFIG_SERVER_TWO_APP_NAME,
)
await ops_test.model.deploy(
-database_charm, num_units=1, config={"role": "shard"}, application_name=SHARD_ONE_APP_NAME
+database_charm, config={"role": "shard"}, application_name=SHARD_ONE_APP_NAME
)
await ops_test.model.deploy(database_charm, application_name=REPLICATION_APP_NAME)
await ops_test.model.deploy(application_charm, application_name=APP_CHARM_NAME)
await ops_test.model.deploy(legacy_charm, application_name=LEGACY_APP_CHARM_NAME)

@@ -75,6 +77,19 @@ async def test_only_one_config_server_relation(ops_test: OpsTest) -> None:
juju_error.value.args[0] == RELATION_LIMIT_MESSAGE
), "Shard can relate to multiple config servers."

# clean up relation
await ops_test.model.applications[SHARD_ONE_APP_NAME].remove_relation(
f"{SHARD_ONE_APP_NAME}:{SHARD_REL_NAME}",
f"{CONFIG_SERVER_ONE_APP_NAME}:{CONFIG_SERVER_REL_NAME}",
)

await ops_test.model.wait_for_idle(
apps=[REPLICATION_APP_NAME],
idle_period=20,
raise_on_blocked=False,
timeout=TIMEOUT,
)


async def test_cannot_use_db_relation(ops_test: OpsTest) -> None:
"""Verify that sharding components cannot use the DB relation."""
@@ -142,3 +157,64 @@ async def test_cannot_use_legacy_db_relation(ops_test: OpsTest) -> None:
raise_on_blocked=False,
timeout=TIMEOUT,
)


async def test_replication_config_server_relation(ops_test: OpsTest):
"""Verifies that using a replica as a shard fails."""
# attempt to add a replication deployment as a shard to the config server.
await ops_test.model.integrate(
f"{REPLICATION_APP_NAME}:{SHARD_REL_NAME}",
f"{CONFIG_SERVER_ONE_APP_NAME}:{CONFIG_SERVER_REL_NAME}",
)

await ops_test.model.wait_for_idle(
apps=[REPLICATION_APP_NAME],
idle_period=20,
raise_on_blocked=False,
timeout=TIMEOUT,
)

replication_unit = ops_test.model.applications[REPLICATION_APP_NAME].units[0]
assert (
replication_unit.workload_status_message == "sharding interface cannot be used by replicas"
), "replication cannot be related to config server."

# clean up relations
await ops_test.model.applications[REPLICATION_APP_NAME].remove_relation(
f"{REPLICATION_APP_NAME}:{SHARD_REL_NAME}",
f"{CONFIG_SERVER_ONE_APP_NAME}:{CONFIG_SERVER_REL_NAME}",
)


async def test_replication_shard_relation(ops_test: OpsTest):
"""Verifies that using a replica as a config-server fails."""
# attempt to add a shard to a replication deployment as a config server.
await ops_test.model.integrate(
f"{SHARD_ONE_APP_NAME}:{SHARD_REL_NAME}",
f"{REPLICATION_APP_NAME}:{CONFIG_SERVER_REL_NAME}",
)

await ops_test.model.wait_for_idle(
apps=[REPLICATION_APP_NAME],
idle_period=20,
raise_on_blocked=False,
timeout=TIMEOUT,
)

replication_unit = ops_test.model.applications[REPLICATION_APP_NAME].units[0]
assert (
replication_unit.workload_status_message == "sharding interface cannot be used by replicas"
), "replication cannot be related to config server."

# clean up relation
await ops_test.model.applications[REPLICATION_APP_NAME].remove_relation(
f"{SHARD_ONE_APP_NAME}:{SHARD_REL_NAME}",
f"{REPLICATION_APP_NAME}:{CONFIG_SERVER_REL_NAME}",
)

await ops_test.model.wait_for_idle(
apps=[REPLICATION_APP_NAME],
idle_period=20,
raise_on_blocked=False,
timeout=TIMEOUT,
)
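Both new replica-relation tests repeat the same integrate → settle → assert-status → clean-up cycle. A hypothetical helper (not part of this PR) that condenses the flow, using only the pytest-operator and libjuju calls the tests above already rely on:

from pytest_operator.plugin import OpsTest

BLOCKED_MESSAGE = "sharding interface cannot be used by replicas"
TIMEOUT = 15 * 60  # illustrative value; the suite defines its own TIMEOUT above


async def assert_sharding_relation_blocked(
    ops_test: OpsTest, requirer: str, provider: str, watched_app: str
) -> None:
    # relate the two endpoints and wait for the model to settle
    await ops_test.model.integrate(requirer, provider)
    await ops_test.model.wait_for_idle(
        apps=[watched_app], idle_period=20, raise_on_blocked=False, timeout=TIMEOUT
    )

    # the replica unit should report the blocked message from this PR
    unit = ops_test.model.applications[watched_app].units[0]
    assert unit.workload_status_message == BLOCKED_MESSAGE

    # tear the relation back down so later tests start from a clean model
    requirer_app = requirer.split(":")[0]
    await ops_test.model.applications[requirer_app].remove_relation(requirer, provider)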