Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-2561] POC - Start with provided config option: config-server, shard #246

Merged
merged 17 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,9 @@ options:
When a relation is removed, auto-delete ensures that any relevant databases
associated with the relation are also removed
default: false
role:
description: |
role config option exists to deploy the charmed-mongodb application as a shard,
config-server, or as a replica set.
type: string
default: replication
59 changes: 37 additions & 22 deletions lib/charms/mongodb/v0/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import secrets
import string
import subprocess
from typing import List, Optional, Union
from typing import List

from charms.mongodb.v0.mongodb import MongoDBConfiguration, MongoDBConnection
from ops.model import (
Expand All @@ -19,6 +19,8 @@
)
from pymongo.errors import AutoReconnect, ServerSelectionTimeoutError

from config import Config

# The unique Charmhub library identifier, never change it
LIBID = "b9a7fe0c38d8486a9d1ce94c27d4758e"

Expand All @@ -27,8 +29,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 7

LIBPATCH = 8

# path to store mongodb ketFile
KEY_FILE = "keyFile"
Expand Down Expand Up @@ -80,10 +81,37 @@ def get_create_user_cmd(
]


def get_mongos_args(config: MongoDBConfiguration) -> str:
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
"""Returns the arguments used for starting mongos on a config-server side application.

Returns:
A string representing the arguments to be passed to mongos.
"""
# mongos running on the config server communicates through localhost
config_server_uri = f"{config.replset}/localhost"

# no need to add TLS since no network calls are used, since mongos is configured to listen
# on local host
cmd = [
# mongos on config server side should run on 0.0.0.0 so it can be accessed by other units
# in the sharded cluster
"--bind_ip 0.0.0.0",
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
# todo figure out this one
f"--configdb {config_server_uri}",
# config server is already using 27017
f"--port {Config.MONGOS_PORT}",
# todo followup PR add keyfile and auth
"\n",
]

return " ".join(cmd)


def get_mongod_args(
config: MongoDBConfiguration,
auth: bool = True,
snap_install: bool = False,
role: str = "replication",
) -> str:
"""Construct the MongoDB startup command line.

Expand Down Expand Up @@ -137,6 +165,12 @@ def get_mongod_args(
]
)

if role == "config-server":
cmd.append("--configsvr")

if role == "shard":
cmd.append("--shardsvr")

cmd.append("\n")
return " ".join(cmd)

Expand Down Expand Up @@ -202,25 +236,6 @@ def copy_licenses_to_unit():
)


_StrOrBytes = Union[str, bytes]


def process_pbm_error(error_string: Optional[_StrOrBytes]) -> str:
"""Parses pbm error string and returns a user friendly message."""
message = "couldn't configure s3 backup option"
if not error_string:
return message
if type(error_string) == bytes:
error_string = error_string.decode("utf-8")
if "status code: 403" in error_string: # type: ignore
message = "s3 credentials are incorrect."
elif "status code: 404" in error_string: # type: ignore
message = "s3 configurations are incompatible."
elif "status code: 301" in error_string: # type: ignore
message = "s3 configurations are incompatible."
return message


def current_pbm_op(pbm_status: str) -> str:
"""Parses pbm status for the operation that pbm is running."""
pbm_status = json.loads(pbm_status)
Expand Down
77 changes: 61 additions & 16 deletions lib/charms/mongodb/v0/mongodb_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,10 @@
import re
import subprocess
import time
from typing import Dict, List
from typing import Dict, List, Optional, Union

from charms.data_platform_libs.v0.s3 import CredentialsChangedEvent, S3Requirer
from charms.mongodb.v0.helpers import (
current_pbm_op,
process_pbm_error,
process_pbm_status,
)
from charms.mongodb.v0.helpers import current_pbm_op, process_pbm_status
from charms.operator_libs_linux.v1 import snap
from ops.framework import Object
from ops.model import BlockedStatus, MaintenanceStatus, StatusBase, WaitingStatus
Expand All @@ -34,6 +30,8 @@
wait_fixed,
)

from config import Config

# The unique Charmhub library identifier, never change it
LIBID = "9f2b91c6128d48d6ba22724bf365da3b"

Expand All @@ -42,7 +40,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 6
LIBPATCH = 7

logger = logging.getLogger(__name__)

Expand All @@ -63,6 +61,9 @@
BACKUP_RESTORE_ATTEMPT_COOLDOWN = 15


_StrOrBytes = Union[str, bytes]


class ResyncError(Exception):
"""Raised when pbm is resyncing configurations and is not ready to be used."""

Expand Down Expand Up @@ -316,7 +317,7 @@ def _configure_pbm_options(self, event) -> None:
),
return
except ExecError as e:
self.charm.unit.status = BlockedStatus(process_pbm_error(e.stdout))
self.charm.unit.status = BlockedStatus(self.process_pbm_error(e.stdout))
return
except subprocess.CalledProcessError as e:
logger.error("Syncing configurations failed: %s", str(e))
Expand Down Expand Up @@ -418,7 +419,7 @@ def _wait_pbm_status(self) -> None:
)
raise ResyncError
except ExecError as e:
self.charm.unit.status = BlockedStatus(process_pbm_error(e.stdout))
self.charm.unit.status = BlockedStatus(self.process_pbm_error(e.stdout))

def _get_pbm_status(self) -> StatusBase:
"""Retrieve pbm status."""
Expand All @@ -428,15 +429,14 @@ def _get_pbm_status(self) -> StatusBase:
try:
previous_pbm_status = self.charm.unit.status
pbm_status = self.charm.run_pbm_command(PBM_STATUS_CMD)

# pbm errors are outputted in json and do not raise CLI errors
pbm_error = self.process_pbm_error(pbm_status)
if pbm_error:
return BlockedStatus(pbm_error)

self._log_backup_restore_result(pbm_status, previous_pbm_status)
return process_pbm_status(pbm_status)
except ExecError as e:
logger.error(f"Failed to get pbm status. {e}")
return BlockedStatus(process_pbm_error(e.stdout))
except subprocess.CalledProcessError as e:
# pbm pipes a return code of 1, but its output shows the true error code so it is
# necessary to parse the output
return BlockedStatus(process_pbm_error(e.output))
except Exception as e:
# pbm pipes a return code of 1, but its output shows the true error code so it is
# necessary to parse the output
Expand Down Expand Up @@ -652,3 +652,48 @@ def _get_backup_restore_operation_result(self, current_pbm_status, previous_pbm_
return f"Backup {backup_id} completed successfully"

return "Unknown operation result"

def retrieve_error_message(self, pbm_status: Dict) -> str:
"""Parses pbm status for an error message from the current unit.

If pbm_agent is in the error state, the command `pbm status` does not raise an error.
Instead, it is in the log messages. pbm_agent also shows all the error messages for other
replicas in the set.
"""
try:
clusters = pbm_status["cluster"]
for cluster in clusters:
if cluster["rs"] == self.charm.app.name:
break

for host_info in cluster["nodes"]:
replica_info = (
f"mongodb/{self.charm._unit_ip(self.charm.unit)}:{Config.MONGOS_PORT}"
)
if host_info["host"] == replica_info:
break

return str(host_info["errors"])
except KeyError:
return ""

def process_pbm_error(self, pbm_status: Optional[_StrOrBytes]) -> str:
"""Returns errors found in PBM status."""
if type(pbm_status) == bytes:
pbm_status = pbm_status.decode("utf-8")

try:
error_message = self.retrieve_error_message(json.loads(pbm_status))
except json.decoder.JSONDecodeError:
# if pbm status doesn't return a parsable dictionary it is an error message
# represented as a string
error_message = pbm_status

message = None
if "status code: 403" in error_message:
message = "s3 credentials are incorrect."
elif "status code: 404" in error_message:
message = "s3 configurations are incompatible."
elif "status code: 301" in error_message:
message = "s3 configurations are incompatible."
return message
63 changes: 52 additions & 11 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ class MongodbOperatorCharm(CharmBase):
def __init__(self, *args):
super().__init__(*args)
self._port = Config.MONGODB_PORT

self.framework.observe(self.on.install, self._on_install)
self.framework.observe(self.on.start, self._on_start)
self.framework.observe(self.on.update_status, self._on_update_status)
Expand Down Expand Up @@ -234,6 +233,15 @@ def db_initialised(self) -> bool:
"""Check if MongoDB is initialised."""
return "db_initialised" in self.app_peer_data

@property
def role(self) -> str:
"""Returns role of MongoDB deployment."""
return self.model.config["role"]

def is_role(self, role_name: str) -> bool:
"""Checks if application is running in provided role."""
return self.role == role_name

@db_initialised.setter
def db_initialised(self, value):
"""Set the db_initialised flag."""
Expand Down Expand Up @@ -278,7 +286,10 @@ def _on_install(self, event: InstallEvent) -> None:

# Construct the mongod startup commandline args for systemd and reload the daemon.
update_mongod_service(
auth=auth, machine_ip=self._unit_ip(self.unit), config=self.mongodb_config
auth=auth,
machine_ip=self._unit_ip(self.unit),
config=self.mongodb_config,
role=self.role,
)

# add licenses
Expand All @@ -297,9 +308,7 @@ def _on_start(self, event: StartEvent) -> None:
try:
logger.debug("starting MongoDB.")
self.unit.status = MaintenanceStatus("starting MongoDB")
snap_cache = snap.SnapCache()
mongodb_snap = snap_cache["charmed-mongodb"]
mongodb_snap.start(services=["mongod"])
self.start_mongod_service()
self.unit.status = ActiveStatus()
except snap.SnapError as e:
logger.error("An exception occurred when starting mongod agent, error: %s.", str(e))
Expand Down Expand Up @@ -950,12 +959,17 @@ def _initialise_replica_set(self, event: StartEvent) -> None:
self._peers.data[self.app]["replica_set_hosts"] = json.dumps(
[self._unit_ip(self.unit)]
)

logger.info("User initialization")
self._init_operator_user()
self._init_backup_user()
self._init_monitor_user()
logger.info("Manage relations")
self.client_relations.oversee_users(None, None)

# in sharding, user management is handled by mongos subordinate charm
if self.is_role(Config.Role.REPLICATION):
logger.info("Manage user")
self.client_relations.oversee_users(None, None)

except subprocess.CalledProcessError as e:
logger.error(
"Deferring on_start: exit code: %i, stderr: %s", e.exit_code, e.stderr
Expand Down Expand Up @@ -1021,21 +1035,48 @@ def set_secret(self, scope: str, key: str, value: Optional[str]) -> Optional[str
else:
raise RuntimeError("Unknown secret scope.")

def start_mongod_service(self):
"""Starts the mongod service and if necessary starts mongos.

Raises:
snap.SnapError
"""
snap_cache = snap.SnapCache()
mongodb_snap = snap_cache["charmed-mongodb"]
mongodb_snap.start(services=["mongod"])

# charms running as config server are responsible for maintaining a server side mongos
if self.is_role(Config.Role.CONFIG_SERVER):
mongodb_snap.start(services=["mongos"])

def stop_mongod_service(self):
"""Stops the mongod service and if necessary stops mongos.

Raises:
snap.SnapError
"""
snap_cache = snap.SnapCache()
mongodb_snap = snap_cache["charmed-mongodb"]
mongodb_snap.stop(services=["mongod"])

# charms running as config server are responsible for maintaining a server side mongos
if self.is_role(Config.Role.CONFIG_SERVER):
mongodb_snap.stop(services=["mongos"])

def restart_mongod_service(self, auth=None):
"""Restarts the mongod service with its associated configuration."""
if auth is None:
auth = self.auth_enabled()

try:
snap_cache = snap.SnapCache()
mongodb_snap = snap_cache["charmed-mongodb"]
mongodb_snap.stop(services=["mongod"])
self.stop_mongod_service()
update_mongod_service(
auth,
self._unit_ip(self.unit),
config=self.mongodb_config,
role=self.role,
)
mongodb_snap.start(services=["mongod"])
self.start_mongod_service()
except snap.SnapError as e:
logger.error("An exception occurred when starting mongod agent, error: %s.", str(e))
self.unit.status = BlockedStatus("couldn't start MongoDB")
Expand Down
13 changes: 10 additions & 3 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,21 @@
class Config:
"""Configuration for MongoDB Charm."""

SUBSTRATE = "vm"
# We expect the MongoDB container to use the default ports
MONGOS_PORT = 27018
MONGODB_PORT = 27017
SUBSTRATE = "vm"
ENV_VAR_PATH = "/etc/environment"
delgod marked this conversation as resolved.
Show resolved Hide resolved
MONGODB_SNAP_DATA_DIR = "/var/snap/charmed-mongodb/current"
MONGOD_CONF_DIR = f"{MONGODB_SNAP_DATA_DIR}/etc/mongod"
MONGOD_CONF_FILE_PATH = f"{MONGOD_CONF_DIR}/mongod.conf"
SNAP_PACKAGES = [("charmed-mongodb", "5/edge", 82)]
SNAP_PACKAGES = [("charmed-mongodb", "5/edge", 84)]

class Role:
"""Role config names for MongoDB Charm."""

CONFIG_SERVER = "config-server"
REPLICATION = "replication"
SHARD = "shard"

class Actions:
"""Actions related config for MongoDB Charm."""
Expand Down
Loading
Loading