Skip to content

Commit

Permalink
feat(entrypoint): adds sync-upstreams entrypoint
Browse files Browse the repository at this point in the history
E2E tests are updated to include a step with sync_upstreams
to ssp-authoring workflow

Signed-off-by: Jennifer Power <[email protected]>
  • Loading branch information
jpower432 committed Jan 11, 2024
1 parent ddc4d3c commit f0aec98
Show file tree
Hide file tree
Showing 7 changed files with 532 additions and 33 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ repository = 'https://github.com/RedHatProductSecurity/trestle-bot'
trestlebot-autosync = "trestlebot.entrypoints.autosync:main"
trestlebot-rules-transform = "trestlebot.entrypoints.rule_transform:main"
trestlebot-create-cd = "trestlebot.entrypoints.create_cd:main"
trestlebot-sync-upstreams = "trestlebot.entrypoints.sync_upstreams:main"

[tool.poetry.dependencies]
python = '^3.8.1'
Expand Down
24 changes: 2 additions & 22 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
TRESTLEBOT_TEST_IMAGE_NAME,
build_test_image,
clean,
repo_setup,
)
from trestlebot import const
from trestlebot.transformers.trestle_rule import (
Expand All @@ -58,28 +59,7 @@ def tmp_repo() -> YieldFixture[Tuple[str, Repo]]:
"""Create a temporary git repository with an initialized trestle workspace root"""
with TemporaryDirectory(prefix=_TEST_PREFIX) as tmpdir:
tmp_path = pathlib.Path(tmpdir)
try:
args = argparse.Namespace(
verbose=0,
trestle_root=tmp_path,
full=True,
local=False,
govdocs=False,
)
init = InitCmd()
init._run(args)
except Exception as e:
raise TrestleError(
f"Initialization failed for temporary trestle directory: {e}."
)
repo = Repo.init(tmpdir)
with repo.config_writer() as config:
config.set_value("user", "email", "[email protected]")
config.set_value("user", "name", "Test User")
repo.git.add(all=True)
repo.index.commit("Initial commit")
# Create a default branch (main)
repo.git.checkout("-b", "main")
repo: Repo = repo_setup(tmp_path)
yield tmpdir, repo

try:
Expand Down
143 changes: 143 additions & 0 deletions tests/data/json/simplified_nist_profile_upstream.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
{
"profile": {
"uuid": "1019f424-1556-4aa3-9df3-337b97c2c856",
"metadata": {
"title": "NIST Special Publication 800-53 Revision 5 MODERATE IMPACT BASELINE",
"last-modified": "2021-06-08T13:57:34.337491-04:00",
"version": "Final",
"oscal-version": "1.0.0",
"roles": [
{
"id": "creator",
"title": "Document Creator"
},
{
"id": "contact",
"title": "Contact"
}
],
"parties": [
{
"uuid": "cde369ce-57f8-4ec1-847f-2681a9a881e7",
"type": "organization",
"name": "Joint Task Force, Transformation Initiative",
"email-addresses": [
"[email protected]"
],
"addresses": [
{
"addr-lines": [
"National Institute of Standards and Technology",
"Attn: Computer Security Division",
"Information Technology Laboratory",
"100 Bureau Drive (Mail Stop 8930)"
],
"city": "Gaithersburg",
"state": "MD",
"postal-code": "20899-8930"
}
]
}
],
"responsible-parties": [
{
"role-id": "creator",
"party-uuids": [
"cde369ce-57f8-4ec1-847f-2681a9a881e7"
]
},
{
"role-id": "contact",
"party-uuids": [
"cde369ce-57f8-4ec1-847f-2681a9a881e7"
]
}
]
},
"imports": [
{
"href": "trestle://catalogs/simplified_nist_catalog/catalog.json",
"include-controls": [
{
"with-ids": [
"ac-1",
"ac-2",
"ac-2.1",
"ac-2.2",
"ac-2.3",
"ac-2.4",
"ac-2.5",
"ac-2.13",
"ac-3",
"ac-4",
"ac-4.4",
"ac-5",
"ac-6"
]
}
]
}
],
"merge": {
"as-is": true
},
"modify": {
"set-parameters": [
{
"param_id": "ac-1_prm_1",
"class": "newclassfromprof",
"depends-on": "newdependsonfromprof",
"usage": "new usage from prof",
"props": [
{
"name": "param_1_prop",
"value": "prop value from prof"
},
{
"name": "param_1_prop_2",
"value": "new prop value from prof"
}
],
"links": [
{
"href": "#123456789",
"text": "new text from prof"
},
{
"href": "#new_link",
"text": "new link text"
}
],
"constraints": [
{
"description": "new constraint"
}
],
"guidelines": [
{
"prose": "new guideline"
}
]
},
{
"param_id": "ac-4.4_prm_3",
"values": [
"hacking the system"
]
},
{
"param_id": "loose_2",
"values": [
"loose_2_val_from_prof"
]
},
{
"param_id": "bad_param_id",
"values": [
"this will cause warning"
]
}
]
}
}
}
52 changes: 49 additions & 3 deletions tests/e2e/test_e2e_ssp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
E2E tests for SSP creation and autosync workflow.
Notes that this should be the only E2E for auto-syncing since the UX is the same for each model.
Any model specific test should be under workflows.
The SSP model is used here as a stand-in for all models because it is the most complex process.
"""

import logging
Expand All @@ -34,7 +34,13 @@
from trestle.core.models.file_content_type import FileContentType
from trestle.oscal.ssp import SystemSecurityPlan

from tests.testutils import build_test_command, setup_for_ssp
from tests.testutils import (
UPSTREAM_REPO,
build_test_command,
clean,
prepare_upstream_repo,
setup_for_ssp,
)
from trestlebot.const import ERROR_EXIT_CODE, INVALID_ARGS_EXIT_CODE, SUCCESS_EXIT_CODE
from trestlebot.tasks.authored.ssp import AuthoredSSP, SSPIndex

Expand Down Expand Up @@ -112,7 +118,6 @@ def test_ssp_editing_e2e(
args = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], test_ssp_md)

# Create or generate the SSP

if not skip_create:
index_path = os.path.join(tmp_repo_str, "ssp-index.json")
ssp_index = SSPIndex(index_path)
Expand Down Expand Up @@ -157,3 +162,44 @@ def test_ssp_editing_e2e(
assert ssp_index.get_comps_by_ssp(test_ssp_name) == [test_comp_name]
assert ssp_index.get_leveraged_by_ssp(test_ssp_name) is None
assert ssp_path.exists()

# Check that if run again, the ssp is not pushed again
command = build_test_command(tmp_repo_str, "autosync", command_args, image_name)
run_response = subprocess.run(command, capture_output=True)
assert run_response.returncode == SUCCESS_EXIT_CODE
assert "Nothing to commit" in run_response.stdout.decode("utf-8")

# Check that if the upstream profile is updated, the ssp is updated
local_upstream_path = prepare_upstream_repo()
upstream_repos_arg = f"{UPSTREAM_REPO}@main"
upstream_command_args = {
"branch": command_args["branch"],
"committer-name": command_args["committer-name"],
"committer-email": command_args["committer-email"],
"sources": upstream_repos_arg,
}
command = build_test_command(
tmp_repo_str,
"sync-upstreams",
upstream_command_args,
image_name,
local_upstream_path,
)
run_response = subprocess.run(command, capture_output=True)
assert run_response.returncode == SUCCESS_EXIT_CODE
assert (
f"Changes pushed to {command_args['branch']} successfully."
in run_response.stdout.decode("utf-8")
)

# Autosync again to check that the ssp is updated
command = build_test_command(tmp_repo_str, "autosync", command_args, image_name)
run_response = subprocess.run(command, capture_output=True)
assert run_response.returncode == SUCCESS_EXIT_CODE
assert (
f"Changes pushed to {command_args['branch']} successfully."
in run_response.stdout.decode("utf-8")
)

# Clean up the upstream repo
clean(local_upstream_path, None)
91 changes: 83 additions & 8 deletions tests/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import pathlib
import shutil
import subprocess
import tempfile
from typing import Dict, List, Optional

from git.repo import Repo
from trestle.common.err import TrestleError
from trestle.common.model_utils import ModelUtils
from trestle.core.base_model import OscalBaseModel
from trestle.core.commands.init import InitCmd
from trestle.core.models.file_content_type import FileContentType
from trestle.oscal import catalog as cat
from trestle.oscal import component as comp
Expand All @@ -49,6 +52,9 @@
E2E_BUILD_CONTEXT = "tests/e2e"
CONTAINER_FILE_NAME = "Dockerfile"

# Location the upstream repo is mounted to in the container
UPSTREAM_REPO = "/upstream"


def clean(repo_path: str, repo: Optional[Repo]) -> None:
"""Clean up the temporary Git repository."""
Expand All @@ -57,6 +63,33 @@ def clean(repo_path: str, repo: Optional[Repo]) -> None:
shutil.rmtree(repo_path)


def repo_setup(repo_path: pathlib.Path) -> Repo:
"""Create a temporary Git repository."""
try:
args = argparse.Namespace(
verbose=0,
trestle_root=repo_path,
full=True,
local=False,
govdocs=False,
)
init = InitCmd()
init._run(args)
except Exception as e:
raise TrestleError(
f"Initialization failed for temporary trestle directory: {e}."
)
repo = Repo.init(repo_path)
with repo.config_writer() as config:
config.set_value("user", "email", "[email protected]")
config.set_value("user", "name", "Test User")
repo.git.add(all=True)
repo.index.commit("Initial commit")
# Create a default branch (main)
repo.git.checkout("-b", "main")
return repo


def args_dict_to_list(args_dict: Dict[str, str]) -> List[str]:
"""Transform dictionary of args to a list of args."""
args = []
Expand Down Expand Up @@ -321,20 +354,62 @@ def build_test_command(
command_name: str,
command_args: Dict[str, str],
image_name: str = TRESTLEBOT_TEST_IMAGE_NAME,
upstream_repo: str = "",
) -> List[str]:
"""Build a command to be run in the shell for trestlebot"""
return [
"""
Build a command to be run in the shell for trestlebot
Args:
data_path (str): Path to the data directory. This is the working directory/trestle_root.
command_name (str): Name of the command to run. It should be a trestlebot command.
command_args (Dict[str, str]): Arguments to pass to the command
image_name (str, optional): Name of the image to run. Defaults to TRESTLEBOT_TEST_IMAGE_NAME.
upstream_repo (str, optional): Path to the upstream repo. Defaults to "" and is not mounted.
Returns:
List[str]: Command to be run in the shell
"""
command = [
"podman",
"run",
"--pod",
TRESTLEBOT_TEST_POD_NAME,
"--entrypoint",
f"trestlebot-{command_name}",
"--rm",
"-v",
f"{data_path}:/trestle",
"-w",
"/trestle",
image_name,
*args_dict_to_list(command_args),
]

# Add mounts
if upstream_repo:
# Add a volume and mount it to the container
command.extend(["-v", f"{upstream_repo}:{UPSTREAM_REPO}"])
command.extend(
[
"-v",
f"{data_path}:/trestle",
"-w",
"/trestle",
image_name,
*args_dict_to_list(command_args),
]
)
return command


def prepare_upstream_repo() -> str:
"""Prepare a temporary upstream repo for testing."""
tmp_dir = pathlib.Path(tempfile.mkdtemp())
repo: Repo = repo_setup(tmp_dir)
load_from_json(
tmp_dir, "simplified_nist_catalog", "simplified_nist_catalog", cat.Catalog
)
load_from_json(
tmp_dir,
"simplified_nist_profile_upstream",
"simplified_nist_profile",
prof.Profile,
)
repo.git.add(all=True)
repo.index.commit("Add updated profile")
repo.close()
return str(tmp_dir)
Loading

0 comments on commit f0aec98

Please sign in to comment.