Skip to content

Commit

Permalink
feat(transformer): Add CSV to YAML with empty writer (#48)
Browse files Browse the repository at this point in the history
Bootstraps component definitions with a rules view
and a template. Also adds class for CSV to YAML
transformations.

Related PSCE-238
Majority of code generated by chatGPT
Signed-off-by: Alex Flom <[email protected]>
  • Loading branch information
Alex Flom committed Oct 3, 2023
1 parent ec5a932 commit fb1ad0b
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 1 deletion.
12 changes: 11 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 88 additions & 0 deletions tests/trestlebot/transformers/test_csv_to_yaml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/python

# Copyright 2023 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import csv
import pathlib
from dataclasses import fields

import pytest
import ruamel.yaml as yaml

from trestlebot.transformers.csv_to_yaml import YAMLBuilder
from trestlebot.transformers.trestle_rule import TrestleRule


@pytest.fixture(scope="function")
def setup_yaml_builder() -> YAMLBuilder:
return YAMLBuilder()


def write_sample_csv(csv_file: pathlib.Path) -> None:
with open(csv_file, "w", newline="") as csvfile:
fieldnames = [
"RULE_ID",
"RULE_DESCRIPTION",
"PROFILE_DESCRIPTION",
"PROFILE_SOURCE",
"CONTROL_ID_LIST",
"COMPONENT_TITLE",
"COMPONENT_DESCRIPTION",
"COMPONENT_TYPE",
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerow(
{
"RULE_ID": "Rule1",
"RULE_DESCRIPTION": "Description1",
"PROFILE_DESCRIPTION": "ProfileDesc1",
"PROFILE_SOURCE": "http://example.com",
"CONTROL_ID_LIST": "C1, C2",
"COMPONENT_TITLE": "Component1",
"COMPONENT_DESCRIPTION": "ComponentDesc1",
"COMPONENT_TYPE": "Type1",
}
)


def test_read_from_csv(setup_yaml_builder: YAMLBuilder, tmp_trestle_dir: str) -> None:
csv_file = pathlib.Path(tmp_trestle_dir) / "test.csv"
write_sample_csv(csv_file)
setup_yaml_builder.read_from_csv(csv_file)
assert len(setup_yaml_builder._rules) == 1


def test_write_to_yaml(setup_yaml_builder: YAMLBuilder, tmp_trestle_dir: str) -> None:
csv_file = pathlib.Path(tmp_trestle_dir) / "test.csv"
yaml_file = pathlib.Path(tmp_trestle_dir) / "test.yaml"
write_sample_csv(csv_file)
setup_yaml_builder.read_from_csv(csv_file)
setup_yaml_builder.write_to_yaml(yaml_file)
with open(yaml_file, "r") as f:
data = yaml.safe_load(f)
assert len(data) == 1


def test_write_empty_trestle_rule_keys(
setup_yaml_builder: YAMLBuilder, tmp_trestle_dir: str
) -> None:
yaml_file = pathlib.Path(tmp_trestle_dir) / "test.yaml"
setup_yaml_builder.write_empty_trestle_rule_keys(yaml_file)
with open(yaml_file, "r") as f:
data = yaml.safe_load(f)
assert all(value == "" for value in data.values())
expected_keys = {field.name for field in fields(TrestleRule)}
assert expected_keys == set(data.keys())
2 changes: 2 additions & 0 deletions trestlebot/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@
COMPONENT_INFO_TAG = trestle_const.TRESTLE_TAG + "component-info"

YAML_EXTENSION = ".yaml"

RULES_VIEW_DIR = "rules"
14 changes: 14 additions & 0 deletions trestlebot/tasks/authored/compdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
from trestle.core.profile_resolver import ProfileResolver
from trestle.core.repository import AgileAuthoring

from trestlebot.const import RULES_VIEW_DIR
from trestlebot.tasks.authored.base_authored import (
AuthoredObjectException,
AuthorObjectBase,
)
from trestlebot.transformers.csv_to_yaml import YAMLBuilder


class AuthoredComponentsDefinition(AuthorObjectBase):
Expand Down Expand Up @@ -170,6 +172,18 @@ def create_new_default(
cd_path.parent.mkdir(parents=True, exist_ok=True)
comp_data.oscal_write(path=cd_path) # type: ignore

for component in comp_data.components:
ruledir: pathlib.Path = trestle_root.joinpath(
RULES_VIEW_DIR,
compdef_name,
component.title,
"rule_template.yaml",
)
ruledir.parent.mkdir(parents=True, exist_ok=True)

empty_yaml = YAMLBuilder()
empty_yaml.write_empty_trestle_rule_keys(ruledir)


def get_control_implementation(
component: comp.DefinedComponent, source: str, description: str, controls: List[str]
Expand Down
137 changes: 137 additions & 0 deletions trestlebot/transformers/csv_to_yaml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#!/usr/bin/python

# Copyright 2023 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import csv
import json
import pathlib
from dataclasses import asdict, fields
from typing import Dict, List, Optional

import ruamel.yaml as yaml
import trestle.tasks.csv_to_oscal_cd as csv_to_oscal_cd

from trestlebot import const
from trestlebot.transformers.trestle_rule import (
ComponentInfo,
Control,
Parameter,
Profile,
TrestleRule,
)


class YAMLBuilder:
def __init__(self) -> None:
"""Initialize."""
self._rules: List[TrestleRule] = []

def read_from_csv(self, filepath: pathlib.Path) -> None:
"""Read from a CSV file and populate self._rules."""
try:
with open(filepath, mode="r", newline="") as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
self._rules.append(self._csv_to_rule(row))
except Exception as e:
raise CSVReadError(f"Failed to read from CSV file: {e}")

def _csv_to_rule(self, row: Dict[str, str]) -> TrestleRule:
"""Transform a CSV row to a TrestleRule object."""
rule_info = self._extract_rule_info(row)
profile = self._extract_profile(row)
component_info = self._extract_component_info(row)
parameter = self._extract_parameter(row)

return TrestleRule(
name=rule_info[const.NAME],
description=rule_info[const.DESCRIPTION],
component=component_info,
parameter=parameter,
profile=profile,
)

def _extract_rule_info(self, row: Dict[str, str]) -> Dict[str, str]:
"""Extract rule information from a CSV row."""
return {
"name": row.get(csv_to_oscal_cd.RULE_ID, ""),
"description": row.get(csv_to_oscal_cd.RULE_DESCRIPTION, ""),
}

def _extract_profile(self, row: Dict[str, str]) -> Profile:
"""Extract profile information from a CSV row."""
controls_list = row.get(csv_to_oscal_cd.CONTROL_ID_LIST, "").split(", ")
return Profile(
description=row.get(csv_to_oscal_cd.PROFILE_DESCRIPTION, ""),
href=row.get(csv_to_oscal_cd.PROFILE_SOURCE, ""),
include_controls=[
Control(id=control_id.strip()) for control_id in controls_list
],
)

def _extract_parameter(self, row: Dict[str, str]) -> Optional[Parameter]:
"""Extract parameter information from a CSV row."""
parameter_name = row.get(csv_to_oscal_cd.PARAMETER_ID, None)
if parameter_name:
return Parameter(
name=parameter_name,
description=row.get(csv_to_oscal_cd.PARAMETER_DESCRIPTION, ""),
alternative_values=json.loads(
row.get(csv_to_oscal_cd.PARAMETER_VALUE_ALTERNATIVES, "{}")
),
default_value=row.get(csv_to_oscal_cd.PARAMETER_VALUE_DEFAULT, ""),
)
return None

def _extract_component_info(self, row: Dict[str, str]) -> ComponentInfo:
"""Extract component information from a CSV row."""
return ComponentInfo(
name=row.get(csv_to_oscal_cd.COMPONENT_TITLE, ""),
type=row.get(csv_to_oscal_cd.COMPONENT_TYPE, ""),
description=row.get(csv_to_oscal_cd.COMPONENT_DESCRIPTION, ""),
)

def write_to_yaml(self, filepath: pathlib.Path) -> None:
"""Write the rules to a YAML file."""
try:
with open(filepath, "w") as yaml_file:
yaml.dump(
[asdict(rule) for rule in self._rules], yaml_file
) # Use Python's built-in asdict
except Exception as e:
raise YAMLWriteError(f"Failed to write rules to YAML file: {e}")

def write_empty_trestle_rule_keys(self, filepath: pathlib.Path) -> None:
"""Write empty TrestleRule keys to a YAML file."""
try:
empty_dict = {f.name: "" for f in fields(TrestleRule)}
with open(filepath, "w") as yaml_file:
yaml.dump(empty_dict, yaml_file)
except Exception as e:
raise YAMLWriteError(
f"Failed to write empty TrestleRule keys to YAML file: {e}"
)


class YAMLWriteError(Exception):
"""Exception raised for errors during YAML writing."""

pass


class CSVReadError(Exception):
"""Exception raised for errors during CSV reading."""

pass

0 comments on commit fb1ad0b

Please sign in to comment.