Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: improves readability of the SSP end to end tests #198

Merged
merged 4 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions tests/e2e/e2e_testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,19 @@ def build_test_command(

def invoke_command(
self, command: List[str], working_dir: Optional[pathlib.Path] = None
) -> Tuple[int, str]:
) -> Tuple[int, str, str]:
"""
Invoke a command in the e2e test.

Args:
command (str): Command to run in the shell

Returns:
Tuple[int, str]: Return code and stdout of the command
Tuple[int, str]: Return code, stdout, stderr of the command
"""
result = subprocess.run(command, cwd=working_dir, capture_output=True)
return result.returncode, result.stdout.decode("utf-8")
return (
result.returncode,
result.stdout.decode("utf-8"),
result.stderr.decode("utf-8"),
)
25 changes: 12 additions & 13 deletions tests/e2e/test_e2e_compdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,20 @@ def test_rules_transform_e2e(
command: List[str] = e2e_runner.build_test_command(
tmp_repo_str, "rules-transform", command_args
)
exit_code, response_stdout = e2e_runner.invoke_command(command)
exit_code, response_stdout, _ = e2e_runner.invoke_command(command)
assert exit_code == SUCCESS_EXIT_CODE

# Check that the component definition was created
if exit_code == SUCCESS_EXIT_CODE:
if "skip-items" in command_args:
assert f"input: {test_comp_name}.csv" not in response_stdout
else:
comp_path: pathlib.Path = ModelUtils.get_model_path_for_name_and_class(
tmp_repo_path, test_comp_name, ComponentDefinition, FileContentType.JSON
)
assert comp_path.exists()
assert f"input: {test_comp_name}.csv" in response_stdout
branch = command_args["branch"]
assert f"Changes pushed to {branch} successfully." in response_stdout
if "skip-items" in command_args:
assert f"input: {test_comp_name}.csv" not in response_stdout
else:
comp_path: pathlib.Path = ModelUtils.get_model_path_for_name_and_class(
tmp_repo_path, test_comp_name, ComponentDefinition, FileContentType.JSON
)
assert comp_path.exists()
assert f"input: {test_comp_name}.csv" in response_stdout
branch = command_args["branch"]
assert f"Changes pushed to {branch} successfully." in response_stdout


@pytest.mark.slow
Expand Down Expand Up @@ -138,7 +137,7 @@ def test_create_cd_e2e(
load_from_json(tmp_repo_path, test_filter_prof, test_filter_prof, Profile)

command = e2e_runner.build_test_command(tmp_repo_str, "create-cd", command_args)
exit_code, _ = e2e_runner.invoke_command(command, tmp_repo_path)
exit_code, _, _ = e2e_runner.invoke_command(command, tmp_repo_path)
assert exit_code == SUCCESS_EXIT_CODE

# Check that all expected files were created
Expand Down
269 changes: 140 additions & 129 deletions tests/e2e/test_e2e_ssp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import logging
import os
import pathlib
import subprocess
from typing import Dict, Tuple

import pytest
Expand All @@ -44,142 +43,154 @@
test_ssp_name = "test_ssp"


@pytest.fixture
def valid_args_dict() -> Dict[str, str]:
return {
"branch": "test",
"markdown-path": test_ssp_md,
"oscal-model": "ssp",
"committer-name": "test",
"committer-email": "[email protected]",
"ssp-index": "ssp-index.json",
}


def replace_line_in_file_after_tag(
file_path: pathlib.Path, tag: str, new_line: str
) -> bool:
"""Replace the line after tag with new string."""
with file_path.open("r") as f:
lines = f.readlines()

for i, line in enumerate(lines):
if tag in line:
lines[i + 1] = new_line

with file_path.open("w") as f:
f.writelines(lines)
return True
return False


@pytest.mark.slow
@pytest.mark.parametrize(
"test_name, command_args, response, skip_create",
[
(
"success/happy path",
{
"branch": "test",
"markdown-path": test_ssp_md,
"oscal-model": "ssp",
"committer-name": "test",
"committer-email": "[email protected]",
"ssp-index": "ssp-index.json",
},
SUCCESS_EXIT_CODE,
False,
),
(
"failure/missing-ssp-index",
{
"branch": "test",
"markdown-path": test_ssp_md,
"oscal-model": "ssp",
"committer-name": "test",
"committer-email": "[email protected]",
},
ERROR_EXIT_CODE,
True,
),
],
)
def test_ssp_editing_e2e(
tmp_repo: Tuple[str, Repo],
e2e_runner: E2ETestRunner,
test_name: str,
command_args: Dict[str, str],
response: int,
skip_create: bool,
valid_args_dict: Dict[str, str],
) -> None:
"""Test the trestlebot autosync command with SSPs."""
logger.info(f"Running test: {test_name}")
tmp_repo_str, _ = tmp_repo
tmp_repo_path = pathlib.Path(tmp_repo_str)

ssp_md_path = pathlib.Path(test_ssp_md) / test_ssp_name
_ = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], str(ssp_md_path))

# Get command arguments for the test
branch = valid_args_dict["branch"]
markdown_path = valid_args_dict["markdown-path"]
committer_name = valid_args_dict["committer-name"]
committer_email = valid_args_dict["committer-email"]

create_args: Dict[str, str] = {
"markdown-path": markdown_path,
"branch": branch,
"committer-name": committer_name,
"committer-email": committer_email,
"ssp-name": test_ssp_name,
"profile-name": test_prof,
"compdefs": test_comp_name,
}
create_command = e2e_runner.build_test_command(
tmp_repo_str,
"create-ssp",
create_args,
)
exit_code, _, _ = e2e_runner.invoke_command(create_command)
assert exit_code == SUCCESS_EXIT_CODE
assert (tmp_repo_path / markdown_path).exists()

# Check that the correct files are present with the correct content
ssp_path = ModelUtils.get_model_path_for_name_and_class(
tmp_repo_path, test_ssp_name, SystemSecurityPlan, FileContentType.JSON
)
index_path = os.path.join(tmp_repo_str, "ssp-index.json")
ssp_index = SSPIndex(index_path)
assert ssp_index.get_profile_by_ssp(test_ssp_name) == test_prof
assert ssp_index.get_comps_by_ssp(test_ssp_name) == [test_comp_name]
assert ssp_index.get_leveraged_by_ssp(test_ssp_name) is None
assert ssp_path.exists()

# Make a change to the SSP
ac_1_path = tmp_repo_path / ssp_md_path / "ac" / "ac-1.md"
assert replace_line_in_file_after_tag(
ac_1_path, "ac-1_prm_6:", " values:\n ssp-values:\n - my ssp val\n"
)

autosync_command = e2e_runner.build_test_command(
tmp_repo_str, "autosync", valid_args_dict
)
exit_code, response_stdout, _ = e2e_runner.invoke_command(autosync_command)
assert exit_code == SUCCESS_EXIT_CODE
# Check that the ssp was pushed to the remote
assert f"Changes pushed to {branch} successfully." in response_stdout

# Check that if run again, the ssp is not pushed again
exit_code, response_stdout, _ = e2e_runner.invoke_command(autosync_command)
assert exit_code == SUCCESS_EXIT_CODE
assert "Nothing to commit" in response_stdout

# Check that if the upstream profile is updated, the ssp is updated
local_upstream_path = prepare_upstream_repo()
upstream_repos_arg = f"{e2e_runner.UPSTREAM_REPO}@main"
upstream_command_args = {
"branch": branch,
"committer-name": committer_name,
"committer-email": committer_email,
"sources": upstream_repos_arg,
}
sync_upstreams_command = e2e_runner.build_test_command(
tmp_repo_str,
"sync-upstreams",
upstream_command_args,
local_upstream_path,
)
exit_code, response_stdout, _ = e2e_runner.invoke_command(sync_upstreams_command)
assert exit_code == SUCCESS_EXIT_CODE
assert f"Changes pushed to {branch} successfully." in response_stdout

# Autosync again to check that the ssp is updated
exit_code, response_stdout, _ = e2e_runner.invoke_command(autosync_command)
assert exit_code == SUCCESS_EXIT_CODE
assert f"Changes pushed to {branch} successfully." in response_stdout

# Clean up the upstream repo
clean(local_upstream_path, None)


@pytest.mark.slow
def test_ssp_e2e_editing_failure(
tmp_repo: Tuple[str, Repo],
e2e_runner: E2ETestRunner,
valid_args_dict: Dict[str, str],
) -> None:
"""
Test the trestlebot autosync command with SSPs with failure.

Notes: The test should fail because of the missing entry in the ssp-index.
This simulates the use case if an SSP is created outside of the tool.
"""
tmp_repo_str, _ = tmp_repo
tmp_repo_path = pathlib.Path(tmp_repo_str)

args = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], test_ssp_md)

# Create or generate the SSP
if not skip_create:
create_args: Dict[str, str] = {
"markdown-path": command_args["markdown-path"],
"branch": command_args["branch"],
"committer-name": command_args["committer-name"],
"committer-email": command_args["committer-email"],
"ssp-name": test_ssp_name,
"profile-name": test_prof,
"compdefs": test_comp_name,
}
command = e2e_runner.build_test_command(
tmp_repo_str,
"create-ssp",
create_args,
)
exit_code, _ = e2e_runner.invoke_command(command)
assert exit_code == response
assert (tmp_repo_path / command_args["markdown-path"]).exists()

# Make a change to the SSP
ssp, ssp_path = ModelUtils.load_model_for_class(
tmp_repo_path,
test_ssp_name,
SystemSecurityPlan,
FileContentType.JSON,
)
ssp.metadata.title = "New Title"
ssp.oscal_write(ssp_path)
else:
ssp_generate = SSPGenerate()
assert ssp_generate._run(args) == 0

command = e2e_runner.build_test_command(tmp_repo_str, "autosync", command_args)
run_response = subprocess.run(command, capture_output=True)
assert run_response.returncode == response
ssp_md_path = pathlib.Path(test_ssp_md) / test_ssp_name
args = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], str(ssp_md_path))

# Check that the ssp was pushed to the remote
if response == SUCCESS_EXIT_CODE:
branch = command_args["branch"]
assert (
f"Changes pushed to {branch} successfully."
in run_response.stdout.decode("utf-8")
)

# Check that the correct files are present with the correct content
index_path = os.path.join(tmp_repo_str, "ssp-index.json")
ssp_index = SSPIndex(index_path)
assert ssp_index.get_profile_by_ssp(test_ssp_name) == test_prof
assert ssp_index.get_comps_by_ssp(test_ssp_name) == [test_comp_name]
assert ssp_index.get_leveraged_by_ssp(test_ssp_name) is None
assert ssp_path.exists()

# Check that if run again, the ssp is not pushed again
command = e2e_runner.build_test_command(tmp_repo_str, "autosync", command_args)
exit_code, response_stdout = e2e_runner.invoke_command(command)
assert exit_code == SUCCESS_EXIT_CODE
assert "Nothing to commit" in response_stdout

# Check that if the upstream profile is updated, the ssp is updated
local_upstream_path = prepare_upstream_repo()
upstream_repos_arg = f"{e2e_runner.UPSTREAM_REPO}@main"
upstream_command_args = {
"branch": command_args["branch"],
"committer-name": command_args["committer-name"],
"committer-email": command_args["committer-email"],
"sources": upstream_repos_arg,
}
command = e2e_runner.build_test_command(
tmp_repo_str,
"sync-upstreams",
upstream_command_args,
local_upstream_path,
)
exit_code, response_stdout = e2e_runner.invoke_command(command)
assert exit_code == SUCCESS_EXIT_CODE
assert (
f"Changes pushed to {command_args['branch']} successfully."
in run_response.stdout.decode("utf-8")
)

# Autosync again to check that the ssp is updated
command = e2e_runner.build_test_command(tmp_repo_str, "autosync", command_args)
exit_code, response_stdout = e2e_runner.invoke_command(command)
assert exit_code == SUCCESS_EXIT_CODE
assert (
f"Changes pushed to {command_args['branch']} successfully."
in response_stdout
)

# Clean up the upstream repo
clean(local_upstream_path, None)
ssp_generate = SSPGenerate()
assert ssp_generate._run(args) == 0

autosync_command = e2e_runner.build_test_command(
tmp_repo_str, "autosync", valid_args_dict
)
exit_code, _, response_stderr = e2e_runner.invoke_command(autosync_command)
assert exit_code == ERROR_EXIT_CODE
assert "SSP test_ssp does not exists in the index" in response_stderr
Loading