Skip to content

Commit

Permalink
[DPE-2790] Extended TLS testing (6/edge) (#281)
Browse files Browse the repository at this point in the history
## Issue

As already done for K8s:
canonical/mongodb-k8s-operator#206

## Solution
  • Loading branch information
juditnovak authored Oct 25, 2023
1 parent 6f06a48 commit b62aba9
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 57 deletions.
2 changes: 2 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
89 changes: 89 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import json
from pathlib import Path
from typing import Dict, Optional

import ops
import yaml
Expand Down Expand Up @@ -100,3 +102,90 @@ async def set_password(
)
action = await action.wait()
return action.results


async def get_application_relation_data(
ops_test: OpsTest,
application_name: str,
relation_name: str,
key: str,
relation_id: str = None,
relation_alias: str = None,
) -> Optional[str]:
"""Get relation data for an application.
Args:
ops_test: The ops test framework instance
application_name: The name of the application
relation_name: name of the relation to get connection data from
key: key of data to be retrieved
relation_id: id of the relation to get connection data from
relation_alias: alias of the relation (like a connection name)
to get connection data from
Returns:
the that that was requested or None
if no data in the relation
Raises:
ValueError if it's not possible to get application unit data
or if there is no data for the particular relation endpoint
and/or alias.
"""
unit_name = f"{application_name}/0"
raw_data = (await ops_test.juju("show-unit", unit_name))[1]

if not raw_data:
raise ValueError(f"no unit info could be grabbed for {unit_name}")
data = yaml.safe_load(raw_data)

# Filter the data based on the relation name.
relation_data = [v for v in data[unit_name]["relation-info"] if v["endpoint"] == relation_name]

if relation_id:
# Filter the data based on the relation id.
relation_data = [v for v in relation_data if v["relation-id"] == relation_id]

if relation_alias:
# Filter the data based on the cluster/relation alias.
relation_data = [
v
for v in relation_data
if json.loads(v["application-data"]["data"])["alias"] == relation_alias
]

if len(relation_data) == 0:
raise ValueError(
f"no relation data could be grabbed on relation with endpoint {relation_name} and alias {relation_alias}"
)

return relation_data[0]["application-data"].get(key)


async def get_secret_id(ops_test, app_or_unit: Optional[str] = None) -> str:
"""Retrieve secert ID for an app or unit."""
complete_command = "list-secrets"

prefix = ""
if app_or_unit:
if app_or_unit[-1].isdigit():
# it's a unit
app_or_unit = "-".join(app_or_unit.split("/"))
prefix = "unit-"
else:
prefix = "application-"
complete_command += f" --owner {prefix}{app_or_unit}"

_, stdout, _ = await ops_test.juju(*complete_command.split())
output_lines_split = [line.split() for line in stdout.split("\n")]
if app_or_unit:
return [line[0] for line in output_lines_split if app_or_unit in line][0]
else:
return output_lines_split[1][0]


async def get_secret_content(ops_test, secret_id) -> Dict[str, str]:
"""Retrieve contents of a Juju Secret."""
secret_id = secret_id.split("/")[-1]
complete_command = f"show-secret {secret_id} --reveal --format=json"
_, stdout, _ = await ops_test.juju(*complete_command.split())
data = json.loads(stdout)
return data[secret_id]["content"]["Data"]
59 changes: 2 additions & 57 deletions tests/integration/relation_tests/new_relations/helpers.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,13 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import json
from typing import Optional

import yaml
from pytest_operator.plugin import OpsTest
from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed


async def get_application_relation_data(
ops_test: OpsTest,
application_name: str,
relation_name: str,
key: str,
relation_id: str = None,
relation_alias: str = None,
) -> Optional[str]:
"""Get relation data for an application.
Args:
ops_test: The ops test framework instance
application_name: The name of the application
relation_name: name of the relation to get connection data from
key: key of data to be retrieved
relation_id: id of the relation to get connection data from
relation_alias: alias of the relation (like a connection name)
to get connection data from
Returns:
the that that was requested or None
if no data in the relation
Raises:
ValueError if it's not possible to get application unit data
or if there is no data for the particular relation endpoint
and/or alias.
"""
unit_name = f"{application_name}/0"
raw_data = (await ops_test.juju("show-unit", unit_name))[1]

if not raw_data:
raise ValueError(f"no unit info could be grabbed for {unit_name}")
data = yaml.safe_load(raw_data)

# Filter the data based on the relation name.
relation_data = [v for v in data[unit_name]["relation-info"] if v["endpoint"] == relation_name]

if relation_id:
# Filter the data based on the relation id.
relation_data = [v for v in relation_data if v["relation-id"] == relation_id]

if relation_alias:
# Filter the data based on the cluster/relation alias.
relation_data = [
v
for v in relation_data
if json.loads(v["application-data"]["data"])["alias"] == relation_alias
]

if len(relation_data) == 0:
raise ValueError(
f"no relation data could be grabbed on relation with endpoint {relation_name} and alias {relation_alias}"
)

return relation_data[0]["application-data"].get(key)
from tests.integration.helpers import get_application_relation_data


async def verify_application_data(
Expand Down
22 changes: 22 additions & 0 deletions tests/integration/tls_tests/helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import logging
from datetime import datetime

import ops
Expand All @@ -20,6 +21,8 @@
INTERNAL_CERT_PATH = f"{MONGOD_CONF_DIR}/internal-ca.crt"
EXTERNAL_PEM_PATH = f"{MONGOD_CONF_DIR}/external-cert.pem"

logger = logging.getLogger(__name__)


class ProcessError(Exception):
"""Raised when a process fails."""
Expand Down Expand Up @@ -110,3 +113,22 @@ def process_systemctl_time(systemctl_output):
time_as_str = "T".join(systemctl_output.split("=")[1].split(" ")[1:3])
d = datetime.strptime(time_as_str, "%Y-%m-%dT%H:%M:%S")
return d


async def scp_file_preserve_ctime(ops_test: OpsTest, unit_name: str, path: str) -> int:
"""Returns the unix timestamp of when a file was created on a specified unit."""
# Retrieving the file
filename = path.split("/")[-1]
complete_command = f"scp --container mongod {unit_name}:{path} {filename}"
return_code, scp_output, stderr = await ops_test.juju(*complete_command.split())

if return_code != 0:
logger.error(stderr)
raise ProcessError(
"Expected command %s to succeed instead it failed: %s; %s",
complete_command,
return_code,
stderr,
)

return f"{filename}"
63 changes: 63 additions & 0 deletions tests/integration/tls_tests/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,82 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import json

import pytest
from ops import Unit
from pytest_operator.plugin import OpsTest

from ..helpers import get_application_relation_data, get_secret_content, get_secret_id
from .helpers import (
EXTERNAL_CERT_PATH,
INTERNAL_CERT_PATH,
check_tls,
scp_file_preserve_ctime,
time_file_created,
time_process_started,
)

MONGO_COMMON_DIR = "/var/snap/charmed-mongodb/common"
TLS_CERTIFICATES_APP_NAME = "tls-certificates-operator"
TLS_RELATION_NAME = "certificates"
DATABASE_APP_NAME = "mongodb"
TLS_TEST_DATA = "tests/integration/tls_tests/data"
DB_SERVICE = "snap.charmed-mongodb.mongod.service"


async def check_certs_correctly_distributed(ops_test: OpsTest, unit: Unit) -> None:
"""Comparing expected vs distributed certificates.
Verifying certificates downloaded on the charm against the ones distributed by the TLS operator
"""
app_secret_id = await get_secret_id(ops_test, DATABASE_APP_NAME)
unit_secret_id = await get_secret_id(ops_test, unit.name)
app_secret_content = await get_secret_content(ops_test, app_secret_id)
unit_secret_content = await get_secret_content(ops_test, unit_secret_id)
app_current_crt = app_secret_content["csr-secret"]
unit_current_crt = unit_secret_content["csr-secret"]

# Get the values for certs from the relation, as provided by TLS Charm
certificates_raw_data = await get_application_relation_data(
ops_test, DATABASE_APP_NAME, TLS_RELATION_NAME, "certificates"
)
certificates_data = json.loads(certificates_raw_data)

external_item = [
data
for data in certificates_data
if data["certificate_signing_request"].rstrip() == unit_current_crt.rstrip()
][0]
internal_item = [
data
for data in certificates_data
if data["certificate_signing_request"].rstrip() == app_current_crt.rstrip()
][0]

# Get a local copy of the external cert
external_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, EXTERNAL_CERT_PATH)

# Get the external cert value from the relation
relation_external_cert = "\n".join(external_item["chain"])

# CHECK: Compare if they are the same
with open(external_copy_path) as f:
external_contents_file = f.read()
assert relation_external_cert == external_contents_file

# Get a local copy of the internal cert
internal_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, INTERNAL_CERT_PATH)

# Get the external cert value from the relation
relation_internal_cert = "\n".join(internal_item["chain"])

# CHECK: Compare if they are the same
with open(internal_copy_path) as f:
internal_contents_file = f.read()
assert relation_internal_cert == internal_contents_file


@pytest.mark.abort_on_fail
async def test_build_and_deploy(ops_test: OpsTest) -> None:
"""Build and deploy one unit of MongoDB and one unit of TLS."""
Expand Down Expand Up @@ -67,6 +125,7 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None:
original_tls_times[unit.name]["mongod_service"] = await time_process_started(
ops_test, unit.name, DB_SERVICE
)
check_certs_correctly_distributed(ops_test, unit)

# set external and internal key using auto-generated key for each unit
for unit in ops_test.model.applications[DATABASE_APP_NAME].units:
Expand All @@ -87,6 +146,8 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None:
new_internal_cert_time = await time_file_created(ops_test, unit.name, INTERNAL_CERT_PATH)
new_mongod_service_time = await time_process_started(ops_test, unit.name, DB_SERVICE)

check_certs_correctly_distributed(ops_test, unit)

assert (
new_external_cert_time > original_tls_times[unit.name]["external_cert"]
), f"external cert for {unit.name} was not updated."
Expand Down Expand Up @@ -165,6 +226,8 @@ async def test_set_tls_key(ops_test: OpsTest) -> None:
new_internal_cert_time = await time_file_created(ops_test, unit.name, INTERNAL_CERT_PATH)
new_mongod_service_time = await time_process_started(ops_test, unit.name, DB_SERVICE)

check_certs_correctly_distributed(ops_test, unit)

assert (
new_external_cert_time > original_tls_times[unit.name]["external_cert"]
), f"external cert for {unit.name} was not updated."
Expand Down

0 comments on commit b62aba9

Please sign in to comment.