Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-3501] - add restore test for sharded cluster #359

Merged
merged 8 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 103 additions & 1 deletion tests/integration/sharding_tests/test_sharding_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,41 @@

import secrets
import string
import time

import pytest
from pytest_operator.plugin import OpsTest
from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed

from ..backup_tests import helpers as backup_helpers

# from .writes_helpers import writes_helpers
from ..helpers import get_leader_id, get_password, set_password
from . import writes_helpers

S3_APP_NAME = "s3-integrator"
SHARD_ONE_APP_NAME = "shard-one"
SHARD_TWO_APP_NAME = "shard-two"
SHARD_APPS = [SHARD_ONE_APP_NAME, SHARD_TWO_APP_NAME]
CONFIG_SERVER_APP_NAME = "config-server-one"
CONFIG_SERVER_APP_NAME = "config-server"
SHARD_REL_NAME = "sharding"
CONFIG_SERVER_REL_NAME = "config-server"
S3_REL_NAME = "s3-credentials"
TIMEOUT = 10 * 60


@pytest.fixture()
async def add_writes_to_db(ops_test: OpsTest):
"""Adds writes to DB before test starts and clears writes at the end of the test."""
await writes_helpers.start_continous_writes(
ops_test, 1, config_server_name=CONFIG_SERVER_APP_NAME
)
time.sleep(20)
await writes_helpers.stop_continous_writes(ops_test, config_server_name=CONFIG_SERVER_APP_NAME)
yield
await writes_helpers.clear_db_writes(ops_test)


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_build_and_deploy(ops_test: OpsTest) -> None:
Expand Down Expand Up @@ -200,3 +216,89 @@ async def test_rotate_backup_password(ops_test: OpsTest) -> None:
assert backups == 2
except RetryError:
assert backups == 2, "Backup not created after password rotation."


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_restore_backup(ops_test: OpsTest, add_writes_to_db) -> None:
phvalguima marked this conversation as resolved.
Show resolved Hide resolved
"""Tests that sharded Charmed MongoDB cluster supports restores."""
# count total writes
cluster_writes = await writes_helpers.get_cluster_writes_count(
ops_test, shard_app_names=SHARD_APPS
)
assert cluster_writes["total_writes"] > 0, "no writes to backup"

leader_unit = await backup_helpers.get_leader_unit(
ops_test, db_app_name=CONFIG_SERVER_APP_NAME
)
prev_backups = await backup_helpers.count_logical_backups(leader_unit)
await ops_test.model.wait_for_idle(
apps=[CONFIG_SERVER_APP_NAME], status="active", idle_period=20
),
action = await leader_unit.run_action(action_name="create-backup")
first_backup = await action.wait()
assert first_backup.status == "completed", "First backup not started."

# verify that backup was made on the bucket
try:
for attempt in Retrying(stop=stop_after_delay(4), wait=wait_fixed(5)):
with attempt:
backups = await backup_helpers.count_logical_backups(leader_unit)
assert backups == prev_backups + 1, "Backup not created."
except RetryError:
assert backups == prev_backups + 1, "Backup not created."
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved

await ops_test.model.wait_for_idle(
apps=[CONFIG_SERVER_APP_NAME], status="active", idle_period=20
),

# add writes to be cleared after restoring the backup. Note these are written to the same
# collection that was backed up.
await writes_helpers.insert_unwanted_data(ops_test)
new_total_writes = await writes_helpers.get_cluster_writes_count(
ops_test, shard_app_names=SHARD_APPS
)
assert (
new_total_writes["total_writes"] > cluster_writes["total_writes"]
), "No writes to be cleared after restoring."

# find most recent backup id and restore
action = await leader_unit.run_action(action_name="list-backups")
list_result = await action.wait()
list_result = list_result.results["backups"]
most_recent_backup = list_result.split("\n")[-1]
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
backup_id = most_recent_backup.split()[0]
action = await leader_unit.run_action(action_name="restore", **{"backup-id": backup_id})
restore = await action.wait()
assert restore.results["restore-status"] == "restore started", "restore not successful"

await ops_test.model.wait_for_idle(
apps=[CONFIG_SERVER_APP_NAME], status="active", idle_period=20
),

# verify all writes are present
try:
for attempt in Retrying(stop=stop_after_delay(4), wait=wait_fixed(20)):
with attempt:
restored_total_writes = await writes_helpers.get_cluster_writes_count(
ops_test, shard_app_names=SHARD_APPS
)
assert (
restored_total_writes["total_writes"] == cluster_writes["total_writes"]
), "writes not correctly restored to whole cluster"
assert (
restored_total_writes[SHARD_ONE_APP_NAME] == cluster_writes[SHARD_ONE_APP_NAME]
), f"writes not correctly restored to {SHARD_ONE_APP_NAME}"
assert (
restored_total_writes[SHARD_TWO_APP_NAME] == cluster_writes[SHARD_TWO_APP_NAME]
), f"writes not correctly restored to {SHARD_TWO_APP_NAME}"
except RetryError:
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
assert (
restored_total_writes["total_writes"] == cluster_writes["total_writes"]
), "writes not correctly restored to whole cluster"
assert (
restored_total_writes[SHARD_ONE_APP_NAME] == cluster_writes[SHARD_ONE_APP_NAME]
), f"writes not correctly restored to {SHARD_ONE_APP_NAME}"
assert (
restored_total_writes[SHARD_TWO_APP_NAME] == cluster_writes[SHARD_TWO_APP_NAME]
), f"writes not correctly restored to {SHARD_TWO_APP_NAME}"
140 changes: 140 additions & 0 deletions tests/integration/sharding_tests/writes_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import logging
import subprocess
from pathlib import Path
from typing import Dict, List

import yaml
from pymongo import MongoClient
from pytest_operator.plugin import OpsTest

from ..helpers import get_password

# TODO move these to a separate file for constants \ config
METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
MONGOS_PORT = 27018
MONGOD_PORT = 27017
APP_NAME = "config-server"

logger = logging.getLogger(__name__)


class ProcessError(Exception):
"""Raised when a process fails."""


class ProcessRunningError(Exception):
"""Raised when a process is running when it is not expected to be."""


async def mongos_uri(ops_test: OpsTest, config_server_name=APP_NAME) -> str:
"""Returns a uri for connecting to mongos."""
password = await get_password(ops_test, app_name=config_server_name)
hosts = [
f"{unit.public_address}:{MONGOS_PORT}"
for unit in ops_test.model.applications[config_server_name].units
]
hosts = ",".join(hosts)
return f"mongodb://operator:{password}@{hosts}/admin"


async def clear_db_writes(ops_test: OpsTest, config_server_name=APP_NAME) -> bool:
"""Stop the DB process and remove any writes to the test collection."""
await stop_continous_writes(ops_test)

# remove collection from database
connection_string = await mongos_uri(ops_test, config_server_name)

client = MongoClient(connection_string)
db = client["new-db"]

# collection for continuous writes
test_collection = db["test_collection"]
test_collection.drop()

client.close()


async def start_continous_writes(
ops_test: OpsTest, starting_number: int, config_server_name=APP_NAME
) -> None:
"""Starts continuous writes to MongoDB."""
connection_string = await mongos_uri(ops_test, config_server_name)

# run continuous writes in the background.
subprocess.Popen(
[
"python3",
"tests/integration/ha_tests/continuous_writes.py",
connection_string,
str(starting_number),
]
)


async def stop_continous_writes(ops_test: OpsTest, config_server_name=APP_NAME) -> int:
"""Stops continuous writes to MongoDB and returns the last written value."""
# stop the process
proc = subprocess.Popen(["pkill", "-9", "-f", "continuous_writes.py"])

# wait for process to be killed
proc.communicate()

connection_string = await mongos_uri(ops_test, config_server_name)

client = MongoClient(connection_string)
db = client["new-db"]
test_collection = db["test_collection"]
client.admin.command("enableSharding", "new-db")

# last written value should be the highest number in the database.
last_written_value = test_collection.find_one(sort=[("number", -1)])
client.close()
return last_written_value


async def count_shard_writes(ops_test: OpsTest, shard_app_name=APP_NAME) -> int:
"""New versions of pymongo no longer support the count operation, instead find is used."""
connection_string = await mongos_uri(ops_test, shard_app_name)
password = await get_password(ops_test, app_name=shard_app_name)
hosts = [
f"{unit.public_address}:{MONGOD_PORT}"
for unit in ops_test.model.applications[shard_app_name].units
]
hosts = ",".join(hosts)
connection_string = f"mongodb://operator:{password}@{hosts}/admin"

client = MongoClient(connection_string)
db = client["new-db"]
test_collection = db["test_collection"]
count = test_collection.count_documents({})
client.close()
return count


async def get_cluster_writes_count(ops_test, shard_app_names=List[str]) -> Dict:
"""Returns a dictionary of the writes for each cluster_component and the total writes."""
cluster_write_count = {}
total_writes = 0
for app_name in shard_app_names:
component_writes = await count_shard_writes(ops_test, app_name)
cluster_write_count[app_name] = component_writes
total_writes += component_writes

cluster_write_count["total_writes"] = total_writes
return cluster_write_count


async def insert_unwanted_data(ops_test: OpsTest, config_server_name=APP_NAME) -> None:
"""Inserts the data into the MongoDB cluster via primary replica."""
connection_string = await mongos_uri(ops_test, config_server_name)

client = MongoClient(connection_string)
db = client["new-db"]
test_collection = db["test_collection"]
test_collection.insert_one({"unwanted_data": "bad data 1"})
test_collection.insert_one({"unwanted_data": "bad data 2"})
test_collection.insert_one({"unwanted_data": "bad data 3"})
client.close()
Loading