Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-2790] Extended TLS testing (6/edge) #281

Merged
merged 1 commit into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
89 changes: 89 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import json
from pathlib import Path
from typing import Dict, Optional

import ops
import yaml
Expand Down Expand Up @@ -100,3 +102,90 @@ async def set_password(
)
action = await action.wait()
return action.results


async def get_application_relation_data(
ops_test: OpsTest,
application_name: str,
relation_name: str,
key: str,
relation_id: str = None,
relation_alias: str = None,
) -> Optional[str]:
"""Get relation data for an application.
Args:
ops_test: The ops test framework instance
application_name: The name of the application
relation_name: name of the relation to get connection data from
key: key of data to be retrieved
relation_id: id of the relation to get connection data from
relation_alias: alias of the relation (like a connection name)
to get connection data from
Returns:
the that that was requested or None
if no data in the relation
Raises:
ValueError if it's not possible to get application unit data
or if there is no data for the particular relation endpoint
and/or alias.
"""
unit_name = f"{application_name}/0"
raw_data = (await ops_test.juju("show-unit", unit_name))[1]

if not raw_data:
raise ValueError(f"no unit info could be grabbed for {unit_name}")
data = yaml.safe_load(raw_data)

# Filter the data based on the relation name.
relation_data = [v for v in data[unit_name]["relation-info"] if v["endpoint"] == relation_name]

if relation_id:
# Filter the data based on the relation id.
relation_data = [v for v in relation_data if v["relation-id"] == relation_id]

if relation_alias:
# Filter the data based on the cluster/relation alias.
relation_data = [
v
for v in relation_data
if json.loads(v["application-data"]["data"])["alias"] == relation_alias
]

if len(relation_data) == 0:
raise ValueError(
f"no relation data could be grabbed on relation with endpoint {relation_name} and alias {relation_alias}"
)

return relation_data[0]["application-data"].get(key)


async def get_secret_id(ops_test, app_or_unit: Optional[str] = None) -> str:
"""Retrieve secert ID for an app or unit."""
complete_command = "list-secrets"

prefix = ""
if app_or_unit:
if app_or_unit[-1].isdigit():
# it's a unit
app_or_unit = "-".join(app_or_unit.split("/"))
prefix = "unit-"
else:
prefix = "application-"
complete_command += f" --owner {prefix}{app_or_unit}"

_, stdout, _ = await ops_test.juju(*complete_command.split())
output_lines_split = [line.split() for line in stdout.split("\n")]
if app_or_unit:
return [line[0] for line in output_lines_split if app_or_unit in line][0]
else:
return output_lines_split[1][0]


async def get_secret_content(ops_test, secret_id) -> Dict[str, str]:
"""Retrieve contents of a Juju Secret."""
secret_id = secret_id.split("/")[-1]
complete_command = f"show-secret {secret_id} --reveal --format=json"
_, stdout, _ = await ops_test.juju(*complete_command.split())
data = json.loads(stdout)
return data[secret_id]["content"]["Data"]
59 changes: 2 additions & 57 deletions tests/integration/relation_tests/new_relations/helpers.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,13 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import json
from typing import Optional

import yaml
from pytest_operator.plugin import OpsTest
from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed


async def get_application_relation_data(
ops_test: OpsTest,
application_name: str,
relation_name: str,
key: str,
relation_id: str = None,
relation_alias: str = None,
) -> Optional[str]:
"""Get relation data for an application.
Args:
ops_test: The ops test framework instance
application_name: The name of the application
relation_name: name of the relation to get connection data from
key: key of data to be retrieved
relation_id: id of the relation to get connection data from
relation_alias: alias of the relation (like a connection name)
to get connection data from
Returns:
the that that was requested or None
if no data in the relation
Raises:
ValueError if it's not possible to get application unit data
or if there is no data for the particular relation endpoint
and/or alias.
"""
unit_name = f"{application_name}/0"
raw_data = (await ops_test.juju("show-unit", unit_name))[1]

if not raw_data:
raise ValueError(f"no unit info could be grabbed for {unit_name}")
data = yaml.safe_load(raw_data)

# Filter the data based on the relation name.
relation_data = [v for v in data[unit_name]["relation-info"] if v["endpoint"] == relation_name]

if relation_id:
# Filter the data based on the relation id.
relation_data = [v for v in relation_data if v["relation-id"] == relation_id]

if relation_alias:
# Filter the data based on the cluster/relation alias.
relation_data = [
v
for v in relation_data
if json.loads(v["application-data"]["data"])["alias"] == relation_alias
]

if len(relation_data) == 0:
raise ValueError(
f"no relation data could be grabbed on relation with endpoint {relation_name} and alias {relation_alias}"
)

return relation_data[0]["application-data"].get(key)
from tests.integration.helpers import get_application_relation_data


async def verify_application_data(
Expand Down
22 changes: 22 additions & 0 deletions tests/integration/tls_tests/helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import logging
from datetime import datetime

import ops
Expand All @@ -20,6 +21,8 @@
INTERNAL_CERT_PATH = f"{MONGOD_CONF_DIR}/internal-ca.crt"
EXTERNAL_PEM_PATH = f"{MONGOD_CONF_DIR}/external-cert.pem"

logger = logging.getLogger(__name__)


class ProcessError(Exception):
"""Raised when a process fails."""
Expand Down Expand Up @@ -110,3 +113,22 @@ def process_systemctl_time(systemctl_output):
time_as_str = "T".join(systemctl_output.split("=")[1].split(" ")[1:3])
d = datetime.strptime(time_as_str, "%Y-%m-%dT%H:%M:%S")
return d


async def scp_file_preserve_ctime(ops_test: OpsTest, unit_name: str, path: str) -> int:
"""Returns the unix timestamp of when a file was created on a specified unit."""
# Retrieving the file
filename = path.split("/")[-1]
complete_command = f"scp --container mongod {unit_name}:{path} {filename}"
return_code, scp_output, stderr = await ops_test.juju(*complete_command.split())

if return_code != 0:
logger.error(stderr)
raise ProcessError(
"Expected command %s to succeed instead it failed: %s; %s",
complete_command,
return_code,
stderr,
)

return f"{filename}"
63 changes: 63 additions & 0 deletions tests/integration/tls_tests/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,82 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import json

import pytest
from ops import Unit
from pytest_operator.plugin import OpsTest

from ..helpers import get_application_relation_data, get_secret_content, get_secret_id
from .helpers import (
EXTERNAL_CERT_PATH,
INTERNAL_CERT_PATH,
check_tls,
scp_file_preserve_ctime,
time_file_created,
time_process_started,
)

MONGO_COMMON_DIR = "/var/snap/charmed-mongodb/common"
TLS_CERTIFICATES_APP_NAME = "tls-certificates-operator"
TLS_RELATION_NAME = "certificates"
DATABASE_APP_NAME = "mongodb"
TLS_TEST_DATA = "tests/integration/tls_tests/data"
DB_SERVICE = "snap.charmed-mongodb.mongod.service"


async def check_certs_correctly_distributed(ops_test: OpsTest, unit: Unit) -> None:
"""Comparing expected vs distributed certificates.
Verifying certificates downloaded on the charm against the ones distributed by the TLS operator
"""
app_secret_id = await get_secret_id(ops_test, DATABASE_APP_NAME)
unit_secret_id = await get_secret_id(ops_test, unit.name)
app_secret_content = await get_secret_content(ops_test, app_secret_id)
unit_secret_content = await get_secret_content(ops_test, unit_secret_id)
app_current_crt = app_secret_content["csr-secret"]
unit_current_crt = unit_secret_content["csr-secret"]

# Get the values for certs from the relation, as provided by TLS Charm
certificates_raw_data = await get_application_relation_data(
ops_test, DATABASE_APP_NAME, TLS_RELATION_NAME, "certificates"
)
certificates_data = json.loads(certificates_raw_data)

external_item = [
data
for data in certificates_data
if data["certificate_signing_request"].rstrip() == unit_current_crt.rstrip()
][0]
internal_item = [
data
for data in certificates_data
if data["certificate_signing_request"].rstrip() == app_current_crt.rstrip()
][0]

# Get a local copy of the external cert
external_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, EXTERNAL_CERT_PATH)

# Get the external cert value from the relation
relation_external_cert = "\n".join(external_item["chain"])

# CHECK: Compare if they are the same
with open(external_copy_path) as f:
external_contents_file = f.read()
assert relation_external_cert == external_contents_file

# Get a local copy of the internal cert
internal_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, INTERNAL_CERT_PATH)

# Get the external cert value from the relation
relation_internal_cert = "\n".join(internal_item["chain"])

# CHECK: Compare if they are the same
with open(internal_copy_path) as f:
internal_contents_file = f.read()
assert relation_internal_cert == internal_contents_file


@pytest.mark.abort_on_fail
async def test_build_and_deploy(ops_test: OpsTest) -> None:
"""Build and deploy one unit of MongoDB and one unit of TLS."""
Expand Down Expand Up @@ -67,6 +125,7 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None:
original_tls_times[unit.name]["mongod_service"] = await time_process_started(
ops_test, unit.name, DB_SERVICE
)
check_certs_correctly_distributed(ops_test, unit)

# set external and internal key using auto-generated key for each unit
for unit in ops_test.model.applications[DATABASE_APP_NAME].units:
Expand All @@ -87,6 +146,8 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None:
new_internal_cert_time = await time_file_created(ops_test, unit.name, INTERNAL_CERT_PATH)
new_mongod_service_time = await time_process_started(ops_test, unit.name, DB_SERVICE)

check_certs_correctly_distributed(ops_test, unit)

assert (
new_external_cert_time > original_tls_times[unit.name]["external_cert"]
), f"external cert for {unit.name} was not updated."
Expand Down Expand Up @@ -165,6 +226,8 @@ async def test_set_tls_key(ops_test: OpsTest) -> None:
new_internal_cert_time = await time_file_created(ops_test, unit.name, INTERNAL_CERT_PATH)
new_mongod_service_time = await time_process_started(ops_test, unit.name, DB_SERVICE)

check_certs_correctly_distributed(ops_test, unit)

assert (
new_external_cert_time > original_tls_times[unit.name]["external_cert"]
), f"external cert for {unit.name} was not updated."
Expand Down