Skip to content

Commit

Permalink
feat(anta): Add support for command blacklist to secure tests (#416)
Browse files Browse the repository at this point in the history
* feat(anta): Add support for command blacklist to secure tests

* doc: Update with blacklist support

* Update anta/models.py

Co-authored-by: Guillaume Mulocher <[email protected]>

* code review

* fix(anta): Use rendered commands in blocked

* ci(anta): Add unit tests for blacklist commands

---------

Co-authored-by: Guillaume Mulocher <[email protected]>
  • Loading branch information
titom73 and gmuloc authored Oct 4, 2023
1 parent 032ca14 commit c6b1d2a
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 1 deletion.
19 changes: 18 additions & 1 deletion anta/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import hashlib
import logging
import re
import time
from abc import ABC, abstractmethod
from copy import deepcopy
Expand All @@ -34,6 +35,9 @@

DEFAULT_TAG = "all"

# TODO - make this configurable - with an env var maybe?
BLACKLIST_REGEX = [r"^reload.*", r"^conf\w*\s*(terminal|session)*", r"^wr\w*\s*\w+"]

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -412,12 +416,25 @@ def render(self, template: AntaTemplate) -> list[AntaCommand]:
no AntaTemplate for this test."""
raise NotImplementedError(f"AntaTemplate are provided but render() method has not been implemented for {self.__module__}.{self.name}")

@property
def blocked(self) -> bool:
"""Check if CLI commands contain a blocked keyword."""
state = False
for command in self.instance_commands:
for pattern in BLACKLIST_REGEX:
if re.match(pattern, command.command):
self.logger.error(f"Command <{command.command}> is blocked for security reason matching {BLACKLIST_REGEX}")
self.result.is_error(f"<{command.command}> is blocked for security reason")
state = True
return state

async def collect(self) -> None:
"""
Method used to collect outputs of all commands of this test class from the device of this test instance.
"""
try:
await self.device.collect_commands(self.instance_commands)
if self.blocked is False:
await self.device.collect_commands(self.instance_commands)
except Exception as e: # pylint: disable=broad-exception-caught
message = f"Exception raised while collecting commands for test {self.name} (on device {self.device.name})"
anta_log_exception(e, message, self.logger)
Expand Down
7 changes: 7 additions & 0 deletions docs/api/models.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
![](../imgs/uml/anta.models.AntaCommand.jpeg)
### ::: anta.models.AntaCommand

!!! warning
CLI commands are protected to avoid execution of critical commands such as `reload` or `write erase`.

- Reload command: `^reload\s*\w*`
- Configure mode: `^conf\w*\s*(terminal|session)*`
- Write: `^wr\w*\s*\w+`

# Template definition

## UML Diagram
Expand Down
26 changes: 26 additions & 0 deletions tests/units/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,29 @@ def test_test(self, mocked_device: MagicMock, data: dict[str, Any]) -> None:
# Test that the test() code works as expected
if "message" in data["expected"]["test"]:
assert data["expected"]["test"]["message"] in test.result.messages


ANTATEST_BLACKLIST_DATA = ["reload", "reload --force", "write", "wr mem"]


@pytest.mark.parametrize("data", ANTATEST_BLACKLIST_DATA)
def test_blacklist(mocked_device: MagicMock, data: str) -> None:
"""Test for blacklisting function."""

class FakeTestWithBlacklist(AntaTest):
"""Fake Test for blacklist"""

name = "FakeTestWithBlacklist"
description = "ANTA test that has blacklisted command"
categories = []
commands = [AntaCommand(command=data)]

@AntaTest.anta_test
def test(self) -> None:
self.result.is_success()

test_instance = FakeTestWithBlacklist(mocked_device, inputs=None)

# Run the test() method
asyncio.run(test_instance.test())
assert test_instance.result.result == "error"

0 comments on commit c6b1d2a

Please sign in to comment.