diff --git a/tests/e2e/e2e_testutils.py b/tests/e2e/e2e_testutils.py index d5ac7f52..77c0efb9 100644 --- a/tests/e2e/e2e_testutils.py +++ b/tests/e2e/e2e_testutils.py @@ -166,7 +166,7 @@ def build_test_command( def invoke_command( self, command: List[str], working_dir: Optional[pathlib.Path] = None - ) -> Tuple[int, str]: + ) -> Tuple[int, str, str]: """ Invoke a command in the e2e test. @@ -174,7 +174,11 @@ def invoke_command( command (str): Command to run in the shell Returns: - Tuple[int, str]: Return code and stdout of the command + Tuple[int, str]: Return code, stdout, stderr of the command """ result = subprocess.run(command, cwd=working_dir, capture_output=True) - return result.returncode, result.stdout.decode("utf-8") + return ( + result.returncode, + result.stdout.decode("utf-8"), + result.stderr.decode("utf-8"), + ) diff --git a/tests/e2e/test_e2e_compdef.py b/tests/e2e/test_e2e_compdef.py index b25622be..c3630f27 100644 --- a/tests/e2e/test_e2e_compdef.py +++ b/tests/e2e/test_e2e_compdef.py @@ -71,21 +71,20 @@ def test_rules_transform_e2e( command: List[str] = e2e_runner.build_test_command( tmp_repo_str, "rules-transform", command_args ) - exit_code, response_stdout = e2e_runner.invoke_command(command) + exit_code, response_stdout, _ = e2e_runner.invoke_command(command) assert exit_code == SUCCESS_EXIT_CODE # Check that the component definition was created - if exit_code == SUCCESS_EXIT_CODE: - if "skip-items" in command_args: - assert f"input: {test_comp_name}.csv" not in response_stdout - else: - comp_path: pathlib.Path = ModelUtils.get_model_path_for_name_and_class( - tmp_repo_path, test_comp_name, ComponentDefinition, FileContentType.JSON - ) - assert comp_path.exists() - assert f"input: {test_comp_name}.csv" in response_stdout - branch = command_args["branch"] - assert f"Changes pushed to {branch} successfully." in response_stdout + if "skip-items" in command_args: + assert f"input: {test_comp_name}.csv" not in response_stdout + else: + comp_path: pathlib.Path = ModelUtils.get_model_path_for_name_and_class( + tmp_repo_path, test_comp_name, ComponentDefinition, FileContentType.JSON + ) + assert comp_path.exists() + assert f"input: {test_comp_name}.csv" in response_stdout + branch = command_args["branch"] + assert f"Changes pushed to {branch} successfully." in response_stdout @pytest.mark.slow @@ -138,7 +137,7 @@ def test_create_cd_e2e( load_from_json(tmp_repo_path, test_filter_prof, test_filter_prof, Profile) command = e2e_runner.build_test_command(tmp_repo_str, "create-cd", command_args) - exit_code, _ = e2e_runner.invoke_command(command, tmp_repo_path) + exit_code, _, _ = e2e_runner.invoke_command(command, tmp_repo_path) assert exit_code == SUCCESS_EXIT_CODE # Check that all expected files were created diff --git a/tests/e2e/test_e2e_ssp.py b/tests/e2e/test_e2e_ssp.py index 06695cb9..6406e9b6 100644 --- a/tests/e2e/test_e2e_ssp.py +++ b/tests/e2e/test_e2e_ssp.py @@ -19,7 +19,6 @@ import logging import os import pathlib -import subprocess from typing import Dict, Tuple import pytest @@ -44,142 +43,154 @@ test_ssp_name = "test_ssp" +@pytest.fixture +def valid_args_dict() -> Dict[str, str]: + return { + "branch": "test", + "markdown-path": test_ssp_md, + "oscal-model": "ssp", + "committer-name": "test", + "committer-email": "test@email.com", + "ssp-index": "ssp-index.json", + } + + +def replace_line_in_file_after_tag( + file_path: pathlib.Path, tag: str, new_line: str +) -> bool: + """Replace the line after tag with new string.""" + with file_path.open("r") as f: + lines = f.readlines() + + for i, line in enumerate(lines): + if tag in line: + lines[i + 1] = new_line + + with file_path.open("w") as f: + f.writelines(lines) + return True + return False + + @pytest.mark.slow -@pytest.mark.parametrize( - "test_name, command_args, response, skip_create", - [ - ( - "success/happy path", - { - "branch": "test", - "markdown-path": test_ssp_md, - "oscal-model": "ssp", - "committer-name": "test", - "committer-email": "test@email.com", - "ssp-index": "ssp-index.json", - }, - SUCCESS_EXIT_CODE, - False, - ), - ( - "failure/missing-ssp-index", - { - "branch": "test", - "markdown-path": test_ssp_md, - "oscal-model": "ssp", - "committer-name": "test", - "committer-email": "test@email.com", - }, - ERROR_EXIT_CODE, - True, - ), - ], -) def test_ssp_editing_e2e( tmp_repo: Tuple[str, Repo], e2e_runner: E2ETestRunner, - test_name: str, - command_args: Dict[str, str], - response: int, - skip_create: bool, + valid_args_dict: Dict[str, str], ) -> None: """Test the trestlebot autosync command with SSPs.""" - logger.info(f"Running test: {test_name}") + tmp_repo_str, _ = tmp_repo + tmp_repo_path = pathlib.Path(tmp_repo_str) + + ssp_md_path = pathlib.Path(test_ssp_md) / test_ssp_name + _ = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], str(ssp_md_path)) + + # Get command arguments for the test + branch = valid_args_dict["branch"] + markdown_path = valid_args_dict["markdown-path"] + committer_name = valid_args_dict["committer-name"] + committer_email = valid_args_dict["committer-email"] + + create_args: Dict[str, str] = { + "markdown-path": markdown_path, + "branch": branch, + "committer-name": committer_name, + "committer-email": committer_email, + "ssp-name": test_ssp_name, + "profile-name": test_prof, + "compdefs": test_comp_name, + } + create_command = e2e_runner.build_test_command( + tmp_repo_str, + "create-ssp", + create_args, + ) + exit_code, _, _ = e2e_runner.invoke_command(create_command) + assert exit_code == SUCCESS_EXIT_CODE + assert (tmp_repo_path / markdown_path).exists() + + # Check that the correct files are present with the correct content + ssp_path = ModelUtils.get_model_path_for_name_and_class( + tmp_repo_path, test_ssp_name, SystemSecurityPlan, FileContentType.JSON + ) + index_path = os.path.join(tmp_repo_str, "ssp-index.json") + ssp_index = SSPIndex(index_path) + assert ssp_index.get_profile_by_ssp(test_ssp_name) == test_prof + assert ssp_index.get_comps_by_ssp(test_ssp_name) == [test_comp_name] + assert ssp_index.get_leveraged_by_ssp(test_ssp_name) is None + assert ssp_path.exists() + + # Make a change to the SSP + ac_1_path = tmp_repo_path / ssp_md_path / "ac" / "ac-1.md" + assert replace_line_in_file_after_tag( + ac_1_path, "ac-1_prm_6:", " values:\n ssp-values:\n - my ssp val\n" + ) + + autosync_command = e2e_runner.build_test_command( + tmp_repo_str, "autosync", valid_args_dict + ) + exit_code, response_stdout, _ = e2e_runner.invoke_command(autosync_command) + assert exit_code == SUCCESS_EXIT_CODE + # Check that the ssp was pushed to the remote + assert f"Changes pushed to {branch} successfully." in response_stdout + + # Check that if run again, the ssp is not pushed again + exit_code, response_stdout, _ = e2e_runner.invoke_command(autosync_command) + assert exit_code == SUCCESS_EXIT_CODE + assert "Nothing to commit" in response_stdout + + # Check that if the upstream profile is updated, the ssp is updated + local_upstream_path = prepare_upstream_repo() + upstream_repos_arg = f"{e2e_runner.UPSTREAM_REPO}@main" + upstream_command_args = { + "branch": branch, + "committer-name": committer_name, + "committer-email": committer_email, + "sources": upstream_repos_arg, + } + sync_upstreams_command = e2e_runner.build_test_command( + tmp_repo_str, + "sync-upstreams", + upstream_command_args, + local_upstream_path, + ) + exit_code, response_stdout, _ = e2e_runner.invoke_command(sync_upstreams_command) + assert exit_code == SUCCESS_EXIT_CODE + assert f"Changes pushed to {branch} successfully." in response_stdout + + # Autosync again to check that the ssp is updated + exit_code, response_stdout, _ = e2e_runner.invoke_command(autosync_command) + assert exit_code == SUCCESS_EXIT_CODE + assert f"Changes pushed to {branch} successfully." in response_stdout + + # Clean up the upstream repo + clean(local_upstream_path, None) + +@pytest.mark.slow +def test_ssp_e2e_editing_failure( + tmp_repo: Tuple[str, Repo], + e2e_runner: E2ETestRunner, + valid_args_dict: Dict[str, str], +) -> None: + """ + Test the trestlebot autosync command with SSPs with failure. + + Notes: The test should fail because of the missing entry in the ssp-index. + This simulates the use case if an SSP is created outside of the tool. + """ tmp_repo_str, _ = tmp_repo tmp_repo_path = pathlib.Path(tmp_repo_str) - args = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], test_ssp_md) - - # Create or generate the SSP - if not skip_create: - create_args: Dict[str, str] = { - "markdown-path": command_args["markdown-path"], - "branch": command_args["branch"], - "committer-name": command_args["committer-name"], - "committer-email": command_args["committer-email"], - "ssp-name": test_ssp_name, - "profile-name": test_prof, - "compdefs": test_comp_name, - } - command = e2e_runner.build_test_command( - tmp_repo_str, - "create-ssp", - create_args, - ) - exit_code, _ = e2e_runner.invoke_command(command) - assert exit_code == response - assert (tmp_repo_path / command_args["markdown-path"]).exists() - - # Make a change to the SSP - ssp, ssp_path = ModelUtils.load_model_for_class( - tmp_repo_path, - test_ssp_name, - SystemSecurityPlan, - FileContentType.JSON, - ) - ssp.metadata.title = "New Title" - ssp.oscal_write(ssp_path) - else: - ssp_generate = SSPGenerate() - assert ssp_generate._run(args) == 0 - - command = e2e_runner.build_test_command(tmp_repo_str, "autosync", command_args) - run_response = subprocess.run(command, capture_output=True) - assert run_response.returncode == response + ssp_md_path = pathlib.Path(test_ssp_md) / test_ssp_name + args = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], str(ssp_md_path)) - # Check that the ssp was pushed to the remote - if response == SUCCESS_EXIT_CODE: - branch = command_args["branch"] - assert ( - f"Changes pushed to {branch} successfully." - in run_response.stdout.decode("utf-8") - ) - - # Check that the correct files are present with the correct content - index_path = os.path.join(tmp_repo_str, "ssp-index.json") - ssp_index = SSPIndex(index_path) - assert ssp_index.get_profile_by_ssp(test_ssp_name) == test_prof - assert ssp_index.get_comps_by_ssp(test_ssp_name) == [test_comp_name] - assert ssp_index.get_leveraged_by_ssp(test_ssp_name) is None - assert ssp_path.exists() - - # Check that if run again, the ssp is not pushed again - command = e2e_runner.build_test_command(tmp_repo_str, "autosync", command_args) - exit_code, response_stdout = e2e_runner.invoke_command(command) - assert exit_code == SUCCESS_EXIT_CODE - assert "Nothing to commit" in response_stdout - - # Check that if the upstream profile is updated, the ssp is updated - local_upstream_path = prepare_upstream_repo() - upstream_repos_arg = f"{e2e_runner.UPSTREAM_REPO}@main" - upstream_command_args = { - "branch": command_args["branch"], - "committer-name": command_args["committer-name"], - "committer-email": command_args["committer-email"], - "sources": upstream_repos_arg, - } - command = e2e_runner.build_test_command( - tmp_repo_str, - "sync-upstreams", - upstream_command_args, - local_upstream_path, - ) - exit_code, response_stdout = e2e_runner.invoke_command(command) - assert exit_code == SUCCESS_EXIT_CODE - assert ( - f"Changes pushed to {command_args['branch']} successfully." - in run_response.stdout.decode("utf-8") - ) - - # Autosync again to check that the ssp is updated - command = e2e_runner.build_test_command(tmp_repo_str, "autosync", command_args) - exit_code, response_stdout = e2e_runner.invoke_command(command) - assert exit_code == SUCCESS_EXIT_CODE - assert ( - f"Changes pushed to {command_args['branch']} successfully." - in response_stdout - ) - - # Clean up the upstream repo - clean(local_upstream_path, None) + ssp_generate = SSPGenerate() + assert ssp_generate._run(args) == 0 + + autosync_command = e2e_runner.build_test_command( + tmp_repo_str, "autosync", valid_args_dict + ) + exit_code, _, response_stderr = e2e_runner.invoke_command(autosync_command) + assert exit_code == ERROR_EXIT_CODE + assert "SSP test_ssp does not exists in the index" in response_stderr