diff --git a/.flake8 b/.flake8 index bebbaeb3..0438b81d 100644 --- a/.flake8 +++ b/.flake8 @@ -1,2 +1,2 @@ [flake8] -max-line-length=100 \ No newline at end of file +max-line-length=105 \ No newline at end of file diff --git a/action.yml b/action.yml index f89b75a2..9a003e82 100644 --- a/action.yml +++ b/action.yml @@ -6,13 +6,29 @@ inputs: markdown_path: description: Path relative to the repository path where the Trestle markdown files are located. See project README.md for more information. required: true - assemble_model: + oscal_model: description: OSCAL Model type to assemble. Values can be catalog, profile, compdef, or ssp. required: true + check_only: + description: "Runs tasks and exits with an error if there is a diff. Defaults to false" + required: false + default: false + skip_assemble: + description: "Skip assembly task. Defaults to false" + required: false + default: false + skip_regenerate: + description: "Skip regenerate task. Defaults to false." + required: false + default: false + skip_items: + description: "Comma-separated list of content by Trestle name to skip during task execution. For example `profile_x,profile_y`." + required: false + default: "" ssp_index_path: description: Path relative to the repository path where the ssp index is located. See project README.md for information about the ssp index. required: false - default: "ssp-index.txt" + default: "ssp-index.json" commit_message: description: Commit message required: false @@ -22,7 +38,7 @@ inputs: required: false default: ${{ github.ref_name }} file_pattern: - description: File pattern used for `git add`. For example `component-definitions/*`. Defaults to (`.`) + description: Comma separated file pattern list used for `git add`. For example `component-definitions/*,*json`. Defaults to (`.`) required: false default: '.' repository: diff --git a/entrypoint.sh b/entrypoint.sh index 281ad4ef..2b845bcf 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -23,18 +23,37 @@ exec 3>&1 trap exec 3>&- EXIT -output=$(python3.8 -m trestlebot \ - --markdown-path="${INPUT_MARKDOWN_PATH}" \ - --assemble-model="${INPUT_ASSEMBLE_MODEL}" \ - --ssp-index-path="${INPUT_SSP_INDEX_PATH}" \ - --commit-message="${INPUT_COMMIT_MESSAGE}" \ - --branch="${INPUT_BRANCH}" \ - --patterns="${INPUT_FILE_PATTERN}" \ - --committer-name="${INPUT_COMMIT_USER_NAME}" \ - --committer-email="${INPUT_COMMIT_USER_EMAIL}" \ - --author-name="${INPUT_COMMIT_AUTHOR_NAME}" \ - --author-email="${INPUT_COMMIT_AUTHOR_EMAIL}" \ - --working-dir="${INPUT_REPOSITORY}" | tee /dev/fd/3) + +# Initialize the command variable +command="python3.8 -m trestlebot \ + --markdown-path=\"${INPUT_MARKDOWN_PATH}\" \ + --oscal-model=\"${INPUT_OSCAL_MODEL}\" \ + --ssp-index-path=\"${INPUT_SSP_INDEX_PATH}\" \ + --commit-message=\"${INPUT_COMMIT_MESSAGE}\" \ + --branch=\"${INPUT_BRANCH}\" \ + --file-patterns=\"${INPUT_FILE_PATTERN}\" \ + --committer-name=\"${INPUT_COMMIT_USER_NAME}\" \ + --committer-email=\"${INPUT_COMMIT_USER_EMAIL}\" \ + --author-name=\"${INPUT_COMMIT_AUTHOR_NAME}\" \ + --author-email=\"${INPUT_COMMIT_AUTHOR_EMAIL}\" \ + --working-dir=\"${INPUT_REPOSITORY}\" \ + --skip-items=\"${INPUT_SKIP_ITEMS}\"" + +# Conditionally include flags +if [[ ${INPUT_SKIP_ASSEMBLE} == true ]]; then + command+=" --skip-assemble" +fi + +if [[ ${INPUT_SKIP_REGENERATE} == true ]]; then + command+=" --skip-regenerate" +fi + +if [[ ${INPUT_CHECK_ONLY} == true ]]; then + command+=" --check-only" +fi + +output=$(eval "$command" 2>&1 > >(tee /dev/fd/3)) + commit=$(echo "$output" | grep "Commit Hash:" | sed 's/.*: //') diff --git a/tests/testutils.py b/tests/testutils.py index 07e8f670..65c9887d 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -17,9 +17,10 @@ """Helper functions for unit test setup and teardown.""" import argparse +import json import pathlib import shutil -from typing import Optional +from typing import List, Optional from git.repo import Repo from trestle.common.model_utils import ModelUtils @@ -157,3 +158,15 @@ def setup_for_compdef( ) return args + + +def write_index_json( + file_path: str, ssp_name: str, profile: str, component_definitions: List[str] +) -> None: + """Write out ssp index JSON for tests""" + data = { + ssp_name: {"profile": profile, "component_definitions": component_definitions} + } + + with open(file_path, "w") as file: + json.dump(data, file, indent=4) diff --git a/tests/trestlebot/tasks/authored/test_ssp.py b/tests/trestlebot/tasks/authored/test_ssp.py index a064ba44..6c543210 100644 --- a/tests/trestlebot/tasks/authored/test_ssp.py +++ b/tests/trestlebot/tasks/authored/test_ssp.py @@ -16,17 +16,19 @@ """Test for Trestle Bot Authored SSP.""" +import os import pathlib -from typing import Dict import pytest +from trestle.common import const from trestle.common.model_utils import ModelUtils from trestle.core.commands.author.ssp import SSPGenerate from trestle.core.models.file_content_type import FileContentType from trestle.oscal import ssp as ossp from tests import testutils -from trestlebot.tasks.authored.ssp import AuthoredSSP +from trestlebot.tasks.authored.base_authored import AuthoredObjectException +from trestlebot.tasks.authored.ssp import AuthoredSSP, SSPIndex test_prof = "simplified_nist_profile" @@ -44,10 +46,11 @@ def test_assemble(tmp_trestle_dir: str) -> None: ssp_generate = SSPGenerate() assert ssp_generate._run(args) == 0 - comps_by_ssp: Dict[str, str] = {} - comps_by_ssp[test_ssp_output] = test_comp + ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.json") + testutils.write_index_json(ssp_index_path, test_ssp_output, test_prof, [test_comp]) + ssp_index: SSPIndex = SSPIndex(ssp_index_path) - authored_ssp = AuthoredSSP(tmp_trestle_dir, comps_by_ssp) + authored_ssp = AuthoredSSP(tmp_trestle_dir, ssp_index) # Run to ensure no exceptions are raised authored_ssp.assemble(md_path) @@ -68,10 +71,77 @@ def test_assemble_no_ssp_entry(tmp_trestle_dir: str) -> None: ssp_generate = SSPGenerate() assert ssp_generate._run(args) == 0 - comps_by_ssp: Dict[str, str] = {} - comps_by_ssp["fake"] = test_comp + ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.json") + testutils.write_index_json(ssp_index_path, "fake", test_prof, [test_comp]) + ssp_index: SSPIndex = SSPIndex(ssp_index_path) - authored_ssp = AuthoredSSP(tmp_trestle_dir, comps_by_ssp) + authored_ssp = AuthoredSSP(tmp_trestle_dir, ssp_index) - with pytest.raises(ValueError): + with pytest.raises( + AuthoredObjectException, match="SSP test-ssp does not exists in the index" + ): authored_ssp.assemble(md_path) + + +def test_regenerate(tmp_trestle_dir: str) -> None: + """Test to test regenerate functionality for SSPs""" + # Prepare the workspace and generate the markdown + trestle_root = pathlib.Path(tmp_trestle_dir) + md_path = os.path.join(markdown_dir, test_ssp_output) + _ = testutils.setup_for_ssp(trestle_root, test_prof, test_comp, md_path) + + ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.json") + testutils.write_index_json(ssp_index_path, test_ssp_output, test_prof, [test_comp]) + ssp_index: SSPIndex = SSPIndex(ssp_index_path) + + authored_ssp = AuthoredSSP(tmp_trestle_dir, ssp_index) + + # Run to ensure no exceptions are raised + model_path = os.path.join(const.MODEL_DIR_SSP, test_ssp_output) + authored_ssp.regenerate(model_path, md_path) + + assert os.path.exists(os.path.join(tmp_trestle_dir, markdown_dir, test_ssp_output)) + + +def test_regenerate_no_ssp_entry(tmp_trestle_dir: str) -> None: + """Test to trigger failure for missing SSP index""" + # Prepare the workspace and generate the markdown + trestle_root = pathlib.Path(tmp_trestle_dir) + md_path = os.path.join(markdown_dir, test_ssp_output) + _ = testutils.setup_for_ssp(trestle_root, test_prof, test_comp, md_path) + + ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.json") + testutils.write_index_json(ssp_index_path, "fake", test_prof, [test_comp]) + ssp_index: SSPIndex = SSPIndex(ssp_index_path) + + authored_ssp = AuthoredSSP(tmp_trestle_dir, ssp_index) + + model_path = os.path.join(const.MODEL_DIR_SSP, test_ssp_output) + with pytest.raises( + AuthoredObjectException, match="SSP test-ssp does not exists in the index" + ): + authored_ssp.regenerate(model_path, md_path) + + +# SSPIndex tests + + +def test_get_comps_by_ssp(tmp_trestle_dir: str) -> None: + """Test to get component definition list from index""" + ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.json") + testutils.write_index_json( + ssp_index_path, test_ssp_output, test_prof, [test_comp, "another_comp"] + ) + ssp_index: SSPIndex = SSPIndex(ssp_index_path) + + assert test_comp in ssp_index.get_comps_by_ssp(test_ssp_output) + assert "another_comp" in ssp_index.get_comps_by_ssp(test_ssp_output) + + +def test_get_profile_by_ssp(tmp_trestle_dir: str) -> None: + """Test to get profile from index""" + ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.json") + testutils.write_index_json(ssp_index_path, test_ssp_output, test_prof, [test_comp]) + ssp_index: SSPIndex = SSPIndex(ssp_index_path) + + assert ssp_index.get_profile_by_ssp(test_ssp_output) == test_prof diff --git a/tests/trestlebot/tasks/authored/test_types.py b/tests/trestlebot/tasks/authored/test_types.py new file mode 100644 index 00000000..07522116 --- /dev/null +++ b/tests/trestlebot/tasks/authored/test_types.py @@ -0,0 +1,100 @@ +#!/usr/bin/python + +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Test author types for Trestlebot""" + +import os + +import pytest + +from tests import testutils +from trestlebot.tasks.authored import types +from trestlebot.tasks.authored.base_authored import ( + AuthoredObjectException, + AuthorObjectBase, +) +from trestlebot.tasks.authored.catalog import AuthoredCatalog +from trestlebot.tasks.authored.compdef import AuthoredComponentsDefinition +from trestlebot.tasks.authored.profile import AuthoredProfile +from trestlebot.tasks.authored.ssp import AuthoredSSP + + +test_prof = "simplified_nist_profile" +test_comp = "test_comp" +test_ssp_output = "test-ssp" +markdown_dir = "md_ssp" + + +def test_get_authored_catalog(tmp_trestle_dir: str) -> None: + """Test get authored type for catalogs""" + + authored_object: AuthorObjectBase = types.get_authored_object( + types.AuthoredType.CATALOG.value, tmp_trestle_dir, "" + ) + + assert authored_object.get_trestle_root() == tmp_trestle_dir + assert isinstance(authored_object, AuthoredCatalog) + + +def test_get_authored_profile(tmp_trestle_dir: str) -> None: + """Test get authored type for catalogs""" + + authored_object: AuthorObjectBase = types.get_authored_object( + types.AuthoredType.PROFILE.value, tmp_trestle_dir, "" + ) + + assert authored_object.get_trestle_root() == tmp_trestle_dir + assert isinstance(authored_object, AuthoredProfile) + + +def test_get_authored_compdef(tmp_trestle_dir: str) -> None: + """Test get authored type for catalogs""" + + # Test with profile + authored_object: AuthorObjectBase = types.get_authored_object( + types.AuthoredType.COMPDEF.value, tmp_trestle_dir, "" + ) + + assert authored_object.get_trestle_root() == tmp_trestle_dir + assert isinstance(authored_object, AuthoredComponentsDefinition) + + +def test_get_authored_ssp(tmp_trestle_dir: str) -> None: + """Test get authored type for catalogs""" + ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.json") + testutils.write_index_json(ssp_index_path, test_ssp_output, test_prof, [test_comp]) + + with pytest.raises( + FileNotFoundError, + ): + _ = types.get_authored_object(types.AuthoredType.SSP.value, tmp_trestle_dir, "") + + # Test with profile + authored_object: AuthorObjectBase = types.get_authored_object( + types.AuthoredType.SSP.value, tmp_trestle_dir, ssp_index_path + ) + + assert authored_object.get_trestle_root() == tmp_trestle_dir + assert isinstance(authored_object, AuthoredSSP) + + +def test_invalid_authored_type(tmp_trestle_dir: str) -> None: + """Test triggering an error with an invalid type""" + with pytest.raises( + AuthoredObjectException, + match="Invalid authored type fake", + ): + _ = types.get_authored_object("fake", tmp_trestle_dir, "") diff --git a/tests/trestlebot/tasks/test_assemble_task.py b/tests/trestlebot/tasks/test_assemble_task.py index ab53bb50..ce314851 100644 --- a/tests/trestlebot/tasks/test_assemble_task.py +++ b/tests/trestlebot/tasks/test_assemble_task.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -"""Test for Trestle Bot assemble""" +"""Test for Trestle Bot assemble task""" import os import pathlib @@ -51,7 +51,7 @@ def test_catalog_assemble_task(tmp_trestle_dir: str) -> None: assemble_task = AssembleTask( tmp_trestle_dir, - AuthoredType.CATALOG, + AuthoredType.CATALOG.value, cat_md_dir, "", ) @@ -67,7 +67,7 @@ def test_profile_assemble_task(tmp_trestle_dir: str) -> None: assert profile_generate._run(args) == 0 assemble_task = AssembleTask( tmp_trestle_dir, - AuthoredType.PROFILE, + AuthoredType.PROFILE.value, prof_md_dir, "", ) @@ -83,7 +83,7 @@ def test_compdef_assemble_task(tmp_trestle_dir: str) -> None: assert comp_generate._run(args) == 0 assemble_task = AssembleTask( tmp_trestle_dir, - AuthoredType.COMPDEF, + AuthoredType.COMPDEF.value, compdef_md_dir, "", ) @@ -92,9 +92,8 @@ def test_compdef_assemble_task(tmp_trestle_dir: str) -> None: def test_ssp_assemble_task(tmp_trestle_dir: str) -> None: """Test ssp assemble at the task level""" - ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.txt") - with open(ssp_index_path, "w") as f: - f.write(f"{test_ssp_output}:{test_comp}") + ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.json") + testutils.write_index_json(ssp_index_path, test_ssp_output, test_prof, [test_comp]) trestle_root = pathlib.Path(tmp_trestle_dir) md_path = os.path.join(ssp_md_dir, test_ssp_output) @@ -104,7 +103,7 @@ def test_ssp_assemble_task(tmp_trestle_dir: str) -> None: assemble_task = AssembleTask( tmp_trestle_dir, - AuthoredType.SSP, + AuthoredType.SSP.value, ssp_md_dir, ssp_index_path, ) @@ -113,10 +112,6 @@ def test_ssp_assemble_task(tmp_trestle_dir: str) -> None: def test_ssp_assemble_task_no_index_path(tmp_trestle_dir: str) -> None: """Test ssp assemble at the task level with failure""" - ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.txt") - with open(ssp_index_path, "w") as f: - f.write(f"{test_ssp_output}:{test_comp}") - trestle_root = pathlib.Path(tmp_trestle_dir) md_path = os.path.join(ssp_md_dir, test_ssp_output) args = testutils.setup_for_ssp(trestle_root, test_prof, test_comp, md_path) @@ -125,7 +120,7 @@ def test_ssp_assemble_task_no_index_path(tmp_trestle_dir: str) -> None: assemble_task = AssembleTask( tmp_trestle_dir, - AuthoredType.SSP, + AuthoredType.SSP.value, ssp_md_dir, "", ) diff --git a/tests/trestlebot/tasks/test_regenerate_task .py b/tests/trestlebot/tasks/test_regenerate_task .py new file mode 100644 index 00000000..b6983df7 --- /dev/null +++ b/tests/trestlebot/tasks/test_regenerate_task .py @@ -0,0 +1,171 @@ +#!/usr/bin/python + +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Test for Trestle Bot regenerate task""" + +import argparse +import os +import pathlib + +import pytest +from trestle.core.commands.author.ssp import SSPAssemble, SSPGenerate + +from tests import testutils +from trestlebot.tasks.authored.types import AuthoredType +from trestlebot.tasks.regenerate_task import RegenerateTask + + +test_prof = "simplified_nist_profile" +test_comp = "test_comp" +test_cat = "simplified_nist_catalog" +test_ssp_output = "my-ssp" + +cat_md_dir = "md_cat" +prof_md_dir = "md_prof" +compdef_md_dir = "md_comp" +ssp_md_dir = "md_ssp" + + +def test_catalog_regenerate_task(tmp_trestle_dir: str) -> None: + """Test catalog regenerate at the task level""" + trestle_root = pathlib.Path(tmp_trestle_dir) + md_path = os.path.join(cat_md_dir, test_cat) + _ = testutils.setup_for_catalog(trestle_root, test_cat, md_path) + + regenerate_task = RegenerateTask( + tmp_trestle_dir, + AuthoredType.CATALOG.value, + cat_md_dir, + "", + ) + assert regenerate_task.execute() == 0 + assert os.path.exists(os.path.join(tmp_trestle_dir, md_path)) + + +def test_catalog_regenerate_task_with_skip(tmp_trestle_dir: str) -> None: + """Test catalog regenerate at the task level""" + trestle_root = pathlib.Path(tmp_trestle_dir) + md_path = os.path.join(cat_md_dir, test_cat) + _ = testutils.setup_for_catalog(trestle_root, test_cat, md_path) + + regenerate_task = RegenerateTask( + tmp_trestle_dir, AuthoredType.CATALOG.value, cat_md_dir, "", [test_cat] + ) + assert regenerate_task.execute() == 0 + assert not os.path.exists(os.path.join(tmp_trestle_dir, md_path)) + + +def test_profile_regenerate_task(tmp_trestle_dir: str) -> None: + """Test profile regenerate at the task level""" + trestle_root = pathlib.Path(tmp_trestle_dir) + md_path = os.path.join(prof_md_dir, test_prof) + _ = testutils.setup_for_profile(trestle_root, test_prof, md_path) + + regenerate_task = RegenerateTask( + tmp_trestle_dir, + AuthoredType.PROFILE.value, + prof_md_dir, + "", + ) + assert regenerate_task.execute() == 0 + assert os.path.exists(os.path.join(tmp_trestle_dir, md_path)) + + +def test_compdef_regenerate_task(tmp_trestle_dir: str) -> None: + """Test compdef regenerate at the task level""" + trestle_root = pathlib.Path(tmp_trestle_dir) + md_path = os.path.join(compdef_md_dir, test_comp) + _ = testutils.setup_for_compdef(trestle_root, test_comp, md_path) + + regenerate_task = RegenerateTask( + tmp_trestle_dir, + AuthoredType.COMPDEF.value, + compdef_md_dir, + "", + ) + assert regenerate_task.execute() == 0 + assert os.path.exists(os.path.join(tmp_trestle_dir, md_path)) + + +def test_ssp_regenerate_task(tmp_trestle_dir: str) -> None: + """Test ssp regenerate at the task level""" + ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.json") + testutils.write_index_json(ssp_index_path, test_ssp_output, test_prof, [test_comp]) + + trestle_root = pathlib.Path(tmp_trestle_dir) + md_path = os.path.join(ssp_md_dir, test_ssp_output) + + # Create initial SSP for testing + args = testutils.setup_for_ssp(trestle_root, test_prof, test_comp, md_path) + ssp_generate = SSPGenerate() + assert ssp_generate._run(args) == 0 + + # create ssp from the markdown + ssp_assemble = SSPAssemble() + args = argparse.Namespace( + trestle_root=tmp_trestle_dir, + markdown=md_path, + output=test_ssp_output, + verbose=0, + name=None, + version=None, + regenerate=False, + compdefs=args.compdefs, + ) + assert ssp_assemble._run(args) == 0 + + regenerate_task = RegenerateTask( + tmp_trestle_dir, + AuthoredType.SSP.value, + ssp_md_dir, + ssp_index_path, + ) + assert regenerate_task.execute() == 0 + assert os.path.exists(os.path.join(tmp_trestle_dir, md_path)) + + +def test_ssp_regenerate_task_no_index_path(tmp_trestle_dir: str) -> None: + """Test ssp regenerate at the task level with failure""" + trestle_root = pathlib.Path(tmp_trestle_dir) + md_path = os.path.join(ssp_md_dir, test_ssp_output) + + # Create initial SSP for testing + args = testutils.setup_for_ssp(trestle_root, test_prof, test_comp, md_path) + ssp_generate = SSPGenerate() + assert ssp_generate._run(args) == 0 + + # create ssp from the markdown + ssp_assemble = SSPAssemble() + args = argparse.Namespace( + trestle_root=tmp_trestle_dir, + markdown=md_path, + output=test_ssp_output, + verbose=0, + name=None, + version=None, + regenerate=False, + compdefs=args.compdefs, + ) + assert ssp_assemble._run(args) == 0 + regenerate_task = RegenerateTask( + tmp_trestle_dir, + AuthoredType.SSP.value, + ssp_md_dir, + "", + ) + + with pytest.raises(FileNotFoundError): + regenerate_task.execute() diff --git a/tests/trestlebot/test_bot.py b/tests/trestlebot/test_bot.py index 2d89550d..5c7e35a3 100644 --- a/tests/trestlebot/test_bot.py +++ b/tests/trestlebot/test_bot.py @@ -19,6 +19,7 @@ import os from typing import Tuple +import pytest from git.repo import Repo import trestlebot.bot as bot @@ -201,3 +202,30 @@ def test_empty_commit(tmp_repo: Tuple[str, Repo]) -> None: assert commit_sha == "" clean(repo_path, repo) + + +def test_run_check_only(tmp_repo: Tuple[str, Repo]) -> None: + """Test bot run with check_only""" + repo_path, repo = tmp_repo + + # Create a test file + test_file_path = os.path.join(repo_path, "test.txt") + with open(test_file_path, "w") as f: + f.write("Test content") + + with pytest.raises( + bot.RepoException, + match="Check only mode is enabled and diff detected. Manual intervention on main is required.", + ): + _ = bot.run( + working_dir=repo_path, + branch="main", + commit_name="Test User", + commit_email="test@example.com", + commit_message="Test commit message", + author_name="The Author", + author_email="author@test.com", + patterns=["*.txt"], + dry_run=True, + check_only=True, + ) diff --git a/tests/trestlebot/test_cli.py b/tests/trestlebot/test_cli.py index 6a13ad8d..4891419e 100644 --- a/tests/trestlebot/test_cli.py +++ b/tests/trestlebot/test_cli.py @@ -30,11 +30,11 @@ def valid_args_dict() -> dict: return { "branch": "main", "markdown-path": "/my/path", - "assemble-model": "profile", + "oscal-model": "profile", "committer-name": "test", "committer-email": "test@email.com", "working-dir": "tmp", - "patterns": ".", + "file-patterns": ".", } @@ -47,10 +47,10 @@ def args_dict_to_list(args_dict: dict) -> List[str]: return args -def test_invalid_assemble_model(monkeypatch, valid_args_dict, caplog): - """Test invalid assemble model""" +def test_invalid_oscal_model(monkeypatch, valid_args_dict, caplog): + """Test invalid oscal model""" args_dict = valid_args_dict - args_dict["assemble-model"] = "fake" + args_dict["oscal-model"] = "fake" monkeypatch.setattr(sys, "argv", ["trestlebot", *args_dict_to_list(args_dict)]) with pytest.raises(SystemExit): @@ -59,15 +59,15 @@ def test_invalid_assemble_model(monkeypatch, valid_args_dict, caplog): assert any( record.levelno == logging.ERROR and record.message - == "Invalid value fake for assemble model. Please use catalog, profile, compdef, or ssp." + == "Invalid value fake for oscal model. Please use catalog, profile, compdef, or ssp." for record in caplog.records ) def test_no_ssp_index(monkeypatch, valid_args_dict, caplog): - """Test invalid assemble model""" + """Test missing index file for ssp""" args_dict = valid_args_dict - args_dict["assemble-model"] = "ssp" + args_dict["oscal-model"] = "ssp" args_dict["ssp-index-path"] = "" monkeypatch.setattr(sys, "argv", ["trestlebot", *args_dict_to_list(args_dict)]) @@ -76,7 +76,6 @@ def test_no_ssp_index(monkeypatch, valid_args_dict, caplog): assert any( record.levelno == logging.ERROR - and record.message - == "Must set ssp_index_path when using SSP as assemble model." + and record.message == "Must set ssp_index_path when using SSP as oscal model." for record in caplog.records ) diff --git a/trestlebot/bot.py b/trestlebot/bot.py index 84f57a03..56fc3bd5 100644 --- a/trestlebot/bot.py +++ b/trestlebot/bot.py @@ -90,6 +90,7 @@ def run( author_email: str, patterns: List[str], pre_tasks: Optional[List[TaskBase]] = None, + check_only: bool = False, dry_run: bool = False, ) -> str: """Run Trestle Bot and return exit code @@ -124,6 +125,12 @@ def run( # Check if there are any unstaged files if repo.is_dirty(untracked_files=True): + if check_only: + raise RepoException( + "Check only mode is enabled and diff detected. " + f"Manual intervention on {branch} is required." + ) + _stage_files(repo, patterns) if repo.is_dirty(): diff --git a/trestlebot/cli.py b/trestlebot/cli.py index ff26c33a..f2fcb877 100644 --- a/trestlebot/cli.py +++ b/trestlebot/cli.py @@ -26,6 +26,7 @@ from trestlebot.tasks.assemble_task import AssembleTask from trestlebot.tasks.authored import types from trestlebot.tasks.base_task import TaskBase +from trestlebot.tasks.regenerate_task import RegenerateTask logging.basicConfig( @@ -50,10 +51,41 @@ def _parse_cli_arguments() -> argparse.Namespace: help="Path to Trestle markdown files", ) parser.add_argument( - "--assemble-model", + "--oscal-model", required=True, type=str, - help="OSCAL Model type to assemble. Values can be catalog, profile, compdef, or ssp", + help="OSCAL model type to run tasks on. Values can be catalog, profile, compdef, or ssp", + ) + parser.add_argument( + "--file-patterns", + required=True, + type=str, + help="Comma-separated list of file patterns to be used with `git add` in repository updates", + ) + parser.add_argument( + "--skip-items", + type=str, + required=False, + default="", + help="Comma-separated list of items of the chosen model type to skip when running tasks", + ) + parser.add_argument( + "--skip-assemble", + required=False, + action="store_true", + help="Skip assembly task. Defaults to false", + ) + parser.add_argument( + "--skip-regenerate", + required=False, + action="store_true", + help="Skip regenerate task. Defaults to false.", + ) + parser.add_argument( + "--check-only", + required=False, + action="store_true", + help="Runs tasks and exits with an error if there is a diff", ) parser.add_argument( "--working-dir", @@ -99,16 +131,9 @@ def _parse_cli_arguments() -> argparse.Namespace: "--ssp-index-path", required=False, type=str, - default="ssp-index.txt", + default="ssp-index.json", help="Path to ssp index file", ) - parser.add_argument( - "--patterns", - nargs="+", - type=str, - required=True, - help="List of file patterns to include in repository updates", - ) return parser.parse_args() @@ -126,14 +151,14 @@ def run() -> None: args = _parse_cli_arguments() pre_tasks: List[TaskBase] = [] + authored_list: List[str] = [model.value for model in types.AuthoredType] + # Pre-process flags - if args.assemble_model: - assembled_type: types.AuthoredType - try: - assembled_type = types.check_authored_type(args.assemble_model) - except ValueError: + + if args.oscal_model: + if args.oscal_model not in authored_list: logging.error( - f"Invalid value {args.assemble_model} for assemble model. " + f"Invalid value {args.oscal_model} for oscal model. " f"Please use catalog, profile, compdef, or ssp." ) sys.exit(1) @@ -142,17 +167,35 @@ def run() -> None: logging.error("Must set markdown path with assemble model.") sys.exit(1) - if args.assemble_model == "ssp" and args.ssp_index_path == "": - logging.error("Must set ssp_index_path when using SSP as assemble model.") + if args.oscal_model == "ssp" and args.ssp_index_path == "": + logging.error("Must set ssp_index_path when using SSP as oscal model.") sys.exit(1) - assemble_task = AssembleTask( - args.working_dir, - assembled_type, - args.markdown_path, - args.ssp_index_path, - ) - pre_tasks.append(assemble_task) + # Assuming an edit has occurred assemble would be run before regenerate. + # Adding this to the list first + if not args.skip_assemble: + assemble_task = AssembleTask( + args.working_dir, + args.oscal_model, + args.markdown_path, + args.ssp_index_path, + comma_sep_to_list(args.skip_items), + ) + pre_tasks.append(assemble_task) + else: + logging.info("Assemble task skipped") + + if not args.skip_regenerate: + regenerate_task = RegenerateTask( + args.working_dir, + args.oscal_model, + args.markdown_path, + args.ssp_index_path, + comma_sep_to_list(args.skip_items), + ) + pre_tasks.append(regenerate_task) + else: + logging.info("Regeneration task skipped") exit_code: int = 0 @@ -168,7 +211,8 @@ def run() -> None: author_name=args.author_name, author_email=args.author_email, pre_tasks=pre_tasks, - patterns=args.patterns, + patterns=comma_sep_to_list(args.file_patterns), + check_only=args.check_only, ) # Print the full commit sha @@ -179,3 +223,9 @@ def run() -> None: exit_code = handle_exception(e) sys.exit(exit_code) + + +def comma_sep_to_list(string: str) -> List[str]: + """Convert comma-sep string to list of strings and strip.""" + string = string.strip() if string else "" + return list(map(str.strip, string.split(","))) if string else [] diff --git a/trestlebot/tasks/assemble_task.py b/trestlebot/tasks/assemble_task.py index 014b0b4e..900c7450 100644 --- a/trestlebot/tasks/assemble_task.py +++ b/trestlebot/tasks/assemble_task.py @@ -17,17 +17,13 @@ """Trestle Bot Assembly Tasks""" import os -from typing import Dict +from typing import List from trestlebot.tasks.authored import types from trestlebot.tasks.authored.base_authored import ( AuthoredObjectException, AuthorObjectBase, ) -from trestlebot.tasks.authored.catalog import AuthoredCatalog -from trestlebot.tasks.authored.compdef import AuthoredComponentsDefinition -from trestlebot.tasks.authored.profile import AuthoredProfile -from trestlebot.tasks.authored.ssp import AuthoredSSP from trestlebot.tasks.base_task import TaskBase, TaskException @@ -39,24 +35,26 @@ class AssembleTask(TaskBase): def __init__( self, working_dir: str, - authored_model: types.AuthoredType, + authored_model: str, markdown_dir: str, ssp_index_path: str = "", + skip_model_list: List[str] = [], ) -> None: """ Initialize assemble task. Args: - working_dir: Working directory to complete operations in. - authored_model: Model type . - markdown_dir: Working directory to complete operations in. - ssp_index_path: - + working_dir: Working directory to complete operations in + authored_model: String representation of model type + markdown_dir: Location of directory to write Markdown in + ssp_index_path: Path of ssp index JSON in the workspace + skip_model_list: List of model names to be skipped during processing """ self._authored_model = authored_model self._markdown_dir = markdown_dir self._ssp_index_path = ssp_index_path + self._skip_model_list = skip_model_list super().__init__(working_dir) def execute(self) -> int: @@ -65,35 +63,20 @@ def execute(self) -> int: def _assemble(self) -> int: """Assemble all objects in markdown directory""" - authored_object: AuthorObjectBase = self._get_authored_object() - search_path = os.path.join(self._working_dir, self._markdown_dir) + authored_object: AuthorObjectBase = types.get_authored_object( + self._authored_model, self.get_working_dir(), self._ssp_index_path + ) + search_path = os.path.join(self.get_working_dir(), self._markdown_dir) for model in os.listdir(search_path): - # Construct model path from markdown. AuthoredObject already - # have the working dir data. + # Construct model path from markdown path. AuthoredObject already has + # the working dir data as part of object construction. + if model in self._skip_model_list or model == ".keep": + continue model_path = os.path.join(self._markdown_dir, model) + try: - authored_object.assemble(model_path=model_path) + authored_object.assemble(markdown_path=model_path) except AuthoredObjectException as e: raise TaskException(f"Assemble task failed for model {model_path}: {e}") - return 0 - def _get_authored_object(self) -> AuthorObjectBase: - """Determine and configure author object context""" - if self._authored_model is types.AuthoredType.CATALOG: # noqa: E721 - return AuthoredCatalog(self._working_dir) - elif self._authored_model is types.AuthoredType.PROFILE: # noqa: E721 - return AuthoredProfile(self._working_dir) - elif self._authored_model is types.AuthoredType.COMPDEF: # noqa: E721 - return AuthoredComponentsDefinition(self._working_dir) - elif self._authored_model is types.AuthoredType.SSP: # noqa: E721 - comps_by_ssp: Dict[str, str] = {} - - with open(self._ssp_index_path, "r") as file: - for line in file: - line = line.strip() - if line: - key, value = line.split(":", 1) - comps_by_ssp[key] = value - return AuthoredSSP(self._working_dir, comps_by_ssp) - else: - raise TaskException(f"Invalid authored type {self._authored_model}") + return 0 diff --git a/trestlebot/tasks/authored/base_authored.py b/trestlebot/tasks/authored/base_authored.py index 7892d561..5f2bef5d 100644 --- a/trestlebot/tasks/authored/base_authored.py +++ b/trestlebot/tasks/authored/base_authored.py @@ -32,6 +32,14 @@ def __init__(self, trestle_root: str) -> None: """Initialize task base and store trestle root path""" self._trestle_root = trestle_root + def get_trestle_root(self) -> str: + """Return the trestle root directory""" + return self._trestle_root + @abstractmethod - def assemble(self, model_path: str, version_tag: str = "") -> None: + def assemble(self, markdown_path: str, version_tag: str = "") -> None: """Execute assemble for model path""" + + @abstractmethod + def regenerate(self, model_path: str, markdown_path: str) -> None: + """Execute regeneration for model path""" diff --git a/trestlebot/tasks/authored/catalog.py b/trestlebot/tasks/authored/catalog.py index 70311b88..d4e13883 100644 --- a/trestlebot/tasks/authored/catalog.py +++ b/trestlebot/tasks/authored/catalog.py @@ -22,7 +22,7 @@ import sys from trestle.common.err import TrestleError -from trestle.core.commands.author.catalog import CatalogAssemble +from trestle.core.commands.author.catalog import CatalogAssemble, CatalogGenerate from trestle.core.commands.common.return_codes import CmdReturnCodes from trestlebot.tasks.authored.base_authored import ( @@ -38,7 +38,7 @@ class AuthoredCatalog(AuthorObjectBase): """ - Functions for authoring OSCAL catalogs in automation + Class for authoring OSCAL catalogs in automation """ def __init__(self, trestle_root: str) -> None: @@ -47,13 +47,13 @@ def __init__(self, trestle_root: str) -> None: """ super().__init__(trestle_root) - def assemble(self, model_path: str, version_tag: str = "") -> None: - trestle_root = pathlib.Path(self._trestle_root) - catalog = os.path.basename(model_path) + def assemble(self, markdown_path: str, version_tag: str = "") -> None: + trestle_root = pathlib.Path(self.get_trestle_root()) + catalog = os.path.basename(markdown_path) try: exit_code = CatalogAssemble.assemble_catalog( trestle_root=trestle_root, - md_name=model_path, + md_name=markdown_path, assem_cat_name=catalog, parent_cat_name="", set_parameters_flag=True, @@ -66,3 +66,24 @@ def assemble(self, model_path: str, version_tag: str = "") -> None: ) except TrestleError as e: raise AuthoredObjectException(f"Trestle assemble failed for {catalog}: {e}") + + def regenerate(self, model_path: str, markdown_path: str) -> None: + trestle_root = self.get_trestle_root() + trestle_path = pathlib.Path(trestle_root) + catalog_generate: CatalogGenerate = CatalogGenerate() + + catalog = os.path.basename(model_path) + try: + exit_code = catalog_generate.generate_markdown( + trestle_root=trestle_path, + catalog_path=pathlib.Path(trestle_root, model_path, "catalog.json"), + markdown_path=pathlib.Path(trestle_root, markdown_path, catalog), + yaml_header={}, + overwrite_header_values=False, + ) + if exit_code != CmdReturnCodes.SUCCESS.value: + raise AuthoredObjectException( + f"Unknown error occurred while regenerating {catalog}" + ) + except TrestleError as e: + raise AuthoredObjectException(f"Trestle generate failed for {catalog}: {e}") diff --git a/trestlebot/tasks/authored/compdef.py b/trestlebot/tasks/authored/compdef.py index 3284ba40..7f287d16 100644 --- a/trestlebot/tasks/authored/compdef.py +++ b/trestlebot/tasks/authored/compdef.py @@ -22,7 +22,7 @@ import sys from trestle.common.err import TrestleError -from trestle.core.commands.author.component import ComponentAssemble +from trestle.core.commands.author.component import ComponentAssemble, ComponentGenerate from trestle.core.commands.common.return_codes import CmdReturnCodes from trestlebot.tasks.authored.base_authored import ( @@ -38,7 +38,7 @@ class AuthoredComponentsDefinition(AuthorObjectBase): """ - Functions for authoring OSCAL Component Definitions in automation + Class for authoring OSCAL Component Definitions in automation """ def __init__(self, trestle_root: str) -> None: @@ -47,13 +47,13 @@ def __init__(self, trestle_root: str) -> None: """ super().__init__(trestle_root) - def assemble(self, model_path: str, version_tag: str = "") -> None: - trestle_root = pathlib.Path(self._trestle_root) - compdef = os.path.basename(model_path) + def assemble(self, markdown_path: str, version_tag: str = "") -> None: + trestle_root = pathlib.Path(self.get_trestle_root()) + compdef = os.path.basename(markdown_path) try: exit_code = ComponentAssemble.assemble_component( trestle_root=trestle_root, - md_name=model_path, + md_name=markdown_path, assem_comp_name=compdef, parent_comp_name="", regenerate=False, @@ -65,3 +65,24 @@ def assemble(self, model_path: str, version_tag: str = "") -> None: ) except TrestleError as e: raise AuthoredObjectException(f"Trestle assemble failed for {compdef}: {e}") + + def regenerate(self, model_path: str, markdown_path: str) -> None: + trestle_root = self.get_trestle_root() + trestle_path = pathlib.Path(trestle_root) + comp_generate: ComponentGenerate = ComponentGenerate() + + comp_name = os.path.basename(model_path) + try: + exit_code = comp_generate.component_generate_all( + trestle_root=trestle_path, + comp_def_name=comp_name, + markdown_dir_name=os.path.join(trestle_root, markdown_path, comp_name), + ) + if exit_code != CmdReturnCodes.SUCCESS.value: + raise AuthoredObjectException( + f"Unknown error occurred while regenerating {comp_name}" + ) + except TrestleError as e: + raise AuthoredObjectException( + f"Trestle generate failed for {comp_name}: {e}" + ) diff --git a/trestlebot/tasks/authored/profile.py b/trestlebot/tasks/authored/profile.py index fad96f74..5111c57f 100644 --- a/trestlebot/tasks/authored/profile.py +++ b/trestlebot/tasks/authored/profile.py @@ -22,7 +22,7 @@ import sys from trestle.common.err import TrestleError -from trestle.core.commands.author.profile import ProfileAssemble +from trestle.core.commands.author.profile import ProfileAssemble, ProfileGenerate from trestle.core.commands.common.return_codes import CmdReturnCodes from trestlebot.tasks.authored.base_authored import ( @@ -38,7 +38,7 @@ class AuthoredProfile(AuthorObjectBase): """ - Functions for authoring OSCAL Profiles in automation + Class for authoring OSCAL Profiles in automation """ def __init__(self, trestle_root: str) -> None: @@ -47,13 +47,13 @@ def __init__(self, trestle_root: str) -> None: """ super().__init__(trestle_root) - def assemble(self, model_path: str, version_tag: str = "") -> None: - trestle_root = pathlib.Path(self._trestle_root) - profile = os.path.basename(model_path) + def assemble(self, markdown_path: str, version_tag: str = "") -> None: + trestle_root = pathlib.Path(self.get_trestle_root()) + profile = os.path.basename(markdown_path) try: exit_code = ProfileAssemble.assemble_profile( trestle_root=trestle_root, - md_name=model_path, + md_name=markdown_path, assem_prof_name=profile, parent_prof_name="", set_parameters_flag=True, @@ -69,3 +69,26 @@ def assemble(self, model_path: str, version_tag: str = "") -> None: ) except TrestleError as e: raise AuthoredObjectException(f"Trestle assemble failed for {profile}: {e}") + + def regenerate(self, model_path: str, markdown_path: str) -> None: + trestle_root = self.get_trestle_root() + trestle_path = pathlib.Path(trestle_root) + profile_generate: ProfileGenerate = ProfileGenerate() + + profile = os.path.basename(model_path) + try: + exit_code = profile_generate.generate_markdown( + trestle_root=trestle_path, + profile_path=pathlib.Path(trestle_root, model_path, "profile.json"), + markdown_path=pathlib.Path(trestle_root, markdown_path, profile), + yaml_header={}, + overwrite_header_values=False, + sections_dict={}, + required_sections=[], + ) + if exit_code != CmdReturnCodes.SUCCESS.value: + raise AuthoredObjectException( + f"Unknown error occurred while regenerating {profile}" + ) + except TrestleError as e: + raise AuthoredObjectException(f"Trestle generate failed for {profile}: {e}") diff --git a/trestlebot/tasks/authored/ssp.py b/trestlebot/tasks/authored/ssp.py index 3406c145..59b62cff 100644 --- a/trestlebot/tasks/authored/ssp.py +++ b/trestlebot/tasks/authored/ssp.py @@ -17,13 +17,15 @@ """Trestle Bot functions for SSP authoring""" import argparse +import json import logging import os +import pathlib import sys -from typing import Dict +from typing import Dict, List from trestle.common.err import TrestleError -from trestle.core.commands.author.ssp import SSPAssemble +from trestle.core.commands.author.ssp import SSPAssemble, SSPGenerate from trestle.core.commands.common.return_codes import CmdReturnCodes from trestlebot.tasks.authored.base_authored import ( @@ -37,37 +39,88 @@ ) +class SSPIndex: + """ + Class for managing the SSP index that stores relationship data by Trestle name + for SSPs. + """ + + def __init__(self, index_path: str) -> None: + """ + Initialize ssp index. + """ + self.profile_by_ssp: Dict[str, str] = {} + self.comps_by_ssp: Dict[str, List[str]] = {} + + with open(index_path, "r") as file: + json_data = json.load(file) + + for ssp_name, ssp_info in json_data.items(): + try: + profile = ssp_info["profile"] + component_definitions = ssp_info["component_definitions"] + except KeyError: + raise AuthoredObjectException( + f"SSP {ssp_name} entry is missing profile or component data" + ) + + if profile is not None and component_definitions is not None: + self.profile_by_ssp[ssp_name] = profile + self.comps_by_ssp[ssp_name] = component_definitions + + def get_comps_by_ssp(self, ssp_name: str) -> List[str]: + """Returns list of compdefs associated with the SSP""" + try: + return self.comps_by_ssp[ssp_name] + except KeyError: + raise AuthoredObjectException( + f"SSP {ssp_name} does not exists in the index" + ) + + def get_profile_by_ssp(self, ssp_name: str) -> str: + """Returns the profile associated with the SSP""" + try: + return self.profile_by_ssp[ssp_name] + except KeyError: + raise AuthoredObjectException( + f"SSP {ssp_name} does not exists in the index" + ) + + +# TODO: Move away from using private run to a public function. +# Done initially because a lot of required high level logic for SSP is private. + + class AuthoredSSP(AuthorObjectBase): """ - Functions for authoring OSCAL SSPs in automation + Class for authoring OSCAL SSPs in automation """ - def __init__(self, trestle_root: str, comps_by_ssp: Dict[str, str]) -> None: + def __init__(self, trestle_root: str, ssp_index: SSPIndex) -> None: """ Initialize authored ssps object. """ + self.ssp_index = ssp_index super().__init__(trestle_root) - self.comps_by_ssp = comps_by_ssp - def assemble(self, model_path: str, version_tag: str = "") -> None: + def assemble(self, markdown_path: str, version_tag: str = "") -> None: + """Run assemble actions for ssp type at the provided path""" ssp_assemble: SSPAssemble = SSPAssemble() - ssp = os.path.basename(model_path) + ssp = os.path.basename(markdown_path) - try: - comps = self.comps_by_ssp[ssp] - except KeyError: - raise ValueError(f"{ssp} not in ssp index") + comps = self.ssp_index.get_comps_by_ssp(ssp) + component_str = ",".join(comps) try: args = argparse.Namespace( - trestle_root=self._trestle_root, - markdown=model_path, + trestle_root=self.get_trestle_root(), + markdown=markdown_path, output=ssp, verbose=0, regenerate=False, version=version_tag, name=None, - compdefs=comps, + compdefs=component_str, ) exit_code = ssp_assemble._run(args) @@ -77,3 +130,30 @@ def assemble(self, model_path: str, version_tag: str = "") -> None: ) except TrestleError as e: raise AuthoredObjectException(f"Trestle assemble failed for {ssp}: {e}") + + def regenerate(self, model_path: str, markdown_path: str) -> None: + """Run regenerate actions for ssp type at the provided path""" + trestle_root = self.get_trestle_root() + trestle_path = pathlib.Path(trestle_root) + ssp_generate: SSPGenerate = SSPGenerate() + + ssp = os.path.basename(model_path) + comps = self.ssp_index.get_comps_by_ssp(ssp) + profile = self.ssp_index.get_profile_by_ssp(ssp) + + try: + exit_code = ssp_generate._generate_ssp_markdown( + trestle_root=trestle_path, + profile_name_or_href=profile, + compdef_name_list=comps, + md_path=pathlib.Path(trestle_root, markdown_path, ssp), + yaml_header={}, + overwrite_header_values=False, + force_overwrite=False, + ) + if exit_code != CmdReturnCodes.SUCCESS.value: + raise AuthoredObjectException( + f"Unknown error occurred while regenerating {ssp}" + ) + except TrestleError as e: + raise AuthoredObjectException(f"Trestle generate failed for {ssp}: {e}") diff --git a/trestlebot/tasks/authored/types.py b/trestlebot/tasks/authored/types.py index 0d6eddb7..9a411669 100644 --- a/trestlebot/tasks/authored/types.py +++ b/trestlebot/tasks/authored/types.py @@ -14,10 +14,21 @@ # License for the specific language governing permissions and limitations # under the License. -"""Trestle Bot constants""" +"""Trestle Bot authoring type information""" from enum import Enum +from trestle.common import const + +from trestlebot.tasks.authored.base_authored import ( + AuthoredObjectException, + AuthorObjectBase, +) +from trestlebot.tasks.authored.catalog import AuthoredCatalog +from trestlebot.tasks.authored.compdef import AuthoredComponentsDefinition +from trestlebot.tasks.authored.profile import AuthoredProfile +from trestlebot.tasks.authored.ssp import AuthoredSSP, SSPIndex + class AuthoredType(Enum): """Top-level OSCAL models that have authoring support""" @@ -28,10 +39,32 @@ class AuthoredType(Enum): COMPDEF = "compdef" -def check_authored_type(input_type: str) -> AuthoredType: - """Check if the string has a matching AuthoredType, if not raise an error""" - try: - item_type = AuthoredType[input_type.upper()] - return item_type - except KeyError: - raise ValueError(f"Invalid item type: {input_type}") +def get_authored_object( + input_type: str, working_dir: str, ssp_index_path: str = "" +) -> AuthorObjectBase: + """Determine and configure author object context""" + if input_type == AuthoredType.CATALOG.value: + return AuthoredCatalog(working_dir) + elif input_type == AuthoredType.PROFILE.value: + return AuthoredProfile(working_dir) + elif input_type == AuthoredType.COMPDEF.value: + return AuthoredComponentsDefinition(working_dir) + elif input_type == AuthoredType.SSP.value: + ssp_index: SSPIndex = SSPIndex(ssp_index_path) + return AuthoredSSP(working_dir, ssp_index) + else: + raise AuthoredObjectException(f"Invalid authored type {input_type}") + + +def get_trestle_model_dir(input_type: str) -> str: + """Determine directory for JSON content in trestle""" + if input_type == AuthoredType.CATALOG.value: + return const.MODEL_DIR_CATALOG + elif input_type == AuthoredType.PROFILE.value: + return const.MODEL_DIR_PROFILE + elif input_type == AuthoredType.COMPDEF.value: + return const.MODEL_DIR_COMPDEF + elif input_type is AuthoredType.SSP.value: + return const.MODEL_DIR_SSP + else: + raise AuthoredObjectException(f"Invalid authored type {input_type}") diff --git a/trestlebot/tasks/regenerate_task.py b/trestlebot/tasks/regenerate_task.py new file mode 100644 index 00000000..9477d05b --- /dev/null +++ b/trestlebot/tasks/regenerate_task.py @@ -0,0 +1,85 @@ +#!/usr/bin/python + +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Trestle Bot Regenerate Tasks""" + +import os +from typing import List + +from trestlebot.tasks.authored import types +from trestlebot.tasks.authored.base_authored import ( + AuthoredObjectException, + AuthorObjectBase, +) +from trestlebot.tasks.base_task import TaskBase, TaskException + + +class RegenerateTask(TaskBase): + """ + Regenerate Trestle Markdown from OSCAL JSON content changes + """ + + def __init__( + self, + working_dir: str, + authored_model: str, + markdown_dir: str, + ssp_index_path: str = "", + skip_model_list: List[str] = [], + ) -> None: + """ + Initialize regenerate task. + + Args: + working_dir: Working directory to complete operations in + authored_model: String representation of model type + markdown_dir: Location of directory to write Markdown in + ssp_index_path: Path of ssp index JSON in the workspace + skip_model_list: List of model names to be skipped during processing + """ + + self._authored_model = authored_model + self._markdown_dir = markdown_dir + self._ssp_index_path = ssp_index_path + self._skip_model_list = skip_model_list + super().__init__(working_dir) + + def execute(self) -> int: + """Execute task""" + return self._regenerate() + + def _regenerate(self) -> int: + """Regenerate all objects in model JSON directory""" + authored_object: AuthorObjectBase = types.get_authored_object( + self._authored_model, self.get_working_dir(), self._ssp_index_path + ) + + model_dir = types.get_trestle_model_dir(self._authored_model) + + search_path = os.path.join(self.get_working_dir(), model_dir) + for model in os.listdir(search_path): + if model in self._skip_model_list or model == ".keep": + continue + model_path = os.path.join(model_dir, model) + + try: + authored_object.regenerate( + model_path=model_path, markdown_path=self._markdown_dir + ) + except AuthoredObjectException as e: + raise TaskException(f"Regenerate task failed for model {model}: {e}") + + return 0