diff --git a/anta/models.py b/anta/models.py index 4bc9e4043..8ec23dfd4 100644 --- a/anta/models.py +++ b/anta/models.py @@ -8,6 +8,7 @@ import hashlib import logging +import re import time from abc import ABC, abstractmethod from copy import deepcopy @@ -34,6 +35,9 @@ DEFAULT_TAG = "all" +# TODO - make this configurable - with an env var maybe? +BLACKLIST_REGEX = [r"^reload.*", r"^conf\w*\s*(terminal|session)*", r"^wr\w*\s*\w+"] + logger = logging.getLogger(__name__) @@ -412,12 +416,25 @@ def render(self, template: AntaTemplate) -> list[AntaCommand]: no AntaTemplate for this test.""" raise NotImplementedError(f"AntaTemplate are provided but render() method has not been implemented for {self.__module__}.{self.name}") + @property + def blocked(self) -> bool: + """Check if CLI commands contain a blocked keyword.""" + state = False + for command in self.instance_commands: + for pattern in BLACKLIST_REGEX: + if re.match(pattern, command.command): + self.logger.error(f"Command <{command.command}> is blocked for security reason matching {BLACKLIST_REGEX}") + self.result.is_error(f"<{command.command}> is blocked for security reason") + state = True + return state + async def collect(self) -> None: """ Method used to collect outputs of all commands of this test class from the device of this test instance. """ try: - await self.device.collect_commands(self.instance_commands) + if self.blocked is False: + await self.device.collect_commands(self.instance_commands) except Exception as e: # pylint: disable=broad-exception-caught message = f"Exception raised while collecting commands for test {self.name} (on device {self.device.name})" anta_log_exception(e, message, self.logger) diff --git a/docs/api/models.md b/docs/api/models.md index 662eed657..c0f449486 100644 --- a/docs/api/models.md +++ b/docs/api/models.md @@ -21,6 +21,13 @@ ![](../imgs/uml/anta.models.AntaCommand.jpeg) ### ::: anta.models.AntaCommand +!!! warning + CLI commands are protected to avoid execution of critical commands such as `reload` or `write erase`. + + - Reload command: `^reload\s*\w*` + - Configure mode: `^conf\w*\s*(terminal|session)*` + - Write: `^wr\w*\s*\w+` + # Template definition ## UML Diagram diff --git a/tests/units/test_models.py b/tests/units/test_models.py index 96af6e7c2..9837aaf29 100644 --- a/tests/units/test_models.py +++ b/tests/units/test_models.py @@ -266,3 +266,29 @@ def test_test(self, mocked_device: MagicMock, data: dict[str, Any]) -> None: # Test that the test() code works as expected if "message" in data["expected"]["test"]: assert data["expected"]["test"]["message"] in test.result.messages + + +ANTATEST_BLACKLIST_DATA = ["reload", "reload --force", "write", "wr mem"] + + +@pytest.mark.parametrize("data", ANTATEST_BLACKLIST_DATA) +def test_blacklist(mocked_device: MagicMock, data: str) -> None: + """Test for blacklisting function.""" + + class FakeTestWithBlacklist(AntaTest): + """Fake Test for blacklist""" + + name = "FakeTestWithBlacklist" + description = "ANTA test that has blacklisted command" + categories = [] + commands = [AntaCommand(command=data)] + + @AntaTest.anta_test + def test(self) -> None: + self.result.is_success() + + test_instance = FakeTestWithBlacklist(mocked_device, inputs=None) + + # Run the test() method + asyncio.run(test_instance.test()) + assert test_instance.result.result == "error"