diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 75d4388a1..9da8faaad 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -32,7 +32,7 @@ repos: - name: Check and insert license on Markdown files id: insert-license files: .*\.md$ - # exclude: + exclude: ^tests/data/.*\.md$ args: - --license-filepath - .github/license-short.txt @@ -43,7 +43,7 @@ repos: - '' - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.6.1 + rev: v0.6.4 hooks: - id: ruff name: Run Ruff linter @@ -52,7 +52,7 @@ repos: name: Run Ruff formatter - repo: https://github.com/pycqa/pylint - rev: "v3.2.6" + rev: "v3.2.7" hooks: - id: pylint name: Check code style with pylint @@ -80,7 +80,7 @@ repos: types: [text] - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.11.1 + rev: v1.11.2 hooks: - id: mypy name: Check typing with mypy diff --git a/anta/catalog.py b/anta/catalog.py index 34f67f701..192e17bf1 100644 --- a/anta/catalog.py +++ b/anta/catalog.py @@ -10,9 +10,11 @@ import math from collections import defaultdict from inspect import isclass +from itertools import chain from json import load as json_load from pathlib import Path from typing import TYPE_CHECKING, Any, Literal, Optional, Union +from warnings import warn from pydantic import BaseModel, ConfigDict, RootModel, ValidationError, ValidationInfo, field_validator, model_serializer, model_validator from pydantic.types import ImportString @@ -386,6 +388,21 @@ def from_list(data: ListAntaTestTuples) -> AntaCatalog: raise return AntaCatalog(tests) + @classmethod + def merge_catalogs(cls, catalogs: list[AntaCatalog]) -> AntaCatalog: + """Merge multiple AntaCatalog instances. + + Parameters + ---------- + catalogs: A list of AntaCatalog instances to merge. + + Returns + ------- + A new AntaCatalog instance containing the tests of all the input catalogs. + """ + combined_tests = list(chain(*(catalog.tests for catalog in catalogs))) + return cls(tests=combined_tests) + def merge(self, catalog: AntaCatalog) -> AntaCatalog: """Merge two AntaCatalog instances. @@ -397,7 +414,13 @@ def merge(self, catalog: AntaCatalog) -> AntaCatalog: ------- A new AntaCatalog instance containing the tests of the two instances. """ - return AntaCatalog(tests=self.tests + catalog.tests) + # TODO: Use a decorator to deprecate this method instead. See https://github.com/aristanetworks/anta/issues/754 + warn( + message="AntaCatalog.merge() is deprecated and will be removed in ANTA v2.0. Use AntaCatalog.merge_catalogs() instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.merge_catalogs([self, catalog]) def dump(self) -> AntaCatalogFile: """Return an AntaCatalogFile instance from this AntaCatalog instance. diff --git a/anta/cli/debug/commands.py b/anta/cli/debug/commands.py index 14f168ba4..1304758a4 100644 --- a/anta/cli/debug/commands.py +++ b/anta/cli/debug/commands.py @@ -72,13 +72,16 @@ def run_template( revision: int, ) -> None: # pylint: disable=too-many-arguments + # Using \b for click + # ruff: noqa: D301 """Run arbitrary templated command to an ANTA device. Takes a list of arguments (keys followed by a value) to build a dictionary used as template parameters. 
- Example: + \b + Example ------- - anta debug run-template -d leaf1a -t 'show vlan {vlan_id}' vlan_id 1 + anta debug run-template -d leaf1a -t 'show vlan {vlan_id}' vlan_id 1 """ template_params = dict(zip(params[::2], params[1::2])) diff --git a/anta/cli/debug/utils.py b/anta/cli/debug/utils.py index 04a7a38b1..4e20c5a74 100644 --- a/anta/cli/debug/utils.py +++ b/anta/cli/debug/utils.py @@ -11,7 +11,7 @@ import click -from anta.cli.utils import ExitCode, inventory_options +from anta.cli.utils import ExitCode, core_options if TYPE_CHECKING: from anta.inventory import AntaInventory @@ -22,7 +22,7 @@ def debug_options(f: Callable[..., Any]) -> Callable[..., Any]: """Click common options required to execute a command on a specific device.""" - @inventory_options + @core_options @click.option( "--ofmt", type=click.Choice(["json", "text"]), @@ -44,7 +44,6 @@ def wrapper( ctx: click.Context, *args: tuple[Any], inventory: AntaInventory, - tags: set[str] | None, device: str, **kwargs: Any, ) -> Any: diff --git a/anta/cli/nrfu/__init__.py b/anta/cli/nrfu/__init__.py index a85277102..d573b49c7 100644 --- a/anta/cli/nrfu/__init__.py +++ b/anta/cli/nrfu/__init__.py @@ -5,14 +5,14 @@ from __future__ import annotations -from typing import TYPE_CHECKING, get_args +from typing import TYPE_CHECKING import click from anta.cli.nrfu import commands from anta.cli.utils import AliasedGroup, catalog_options, inventory_options -from anta.custom_types import TestStatus from anta.result_manager import ResultManager +from anta.result_manager.models import AntaTestStatus if TYPE_CHECKING: from anta.catalog import AntaCatalog @@ -49,7 +49,7 @@ def parse_args(self, ctx: click.Context, args: list[str]) -> list[str]: return super().parse_args(ctx, args) -HIDE_STATUS: list[str] = list(get_args(TestStatus)) +HIDE_STATUS: list[str] = list(AntaTestStatus) HIDE_STATUS.remove("unset") @@ -147,3 +147,4 @@ def nrfu( nrfu.add_command(commands.json) nrfu.add_command(commands.text) nrfu.add_command(commands.tpl_report) +nrfu.add_command(commands.md_report) diff --git a/anta/cli/nrfu/commands.py b/anta/cli/nrfu/commands.py index cd750cb85..a5492680b 100644 --- a/anta/cli/nrfu/commands.py +++ b/anta/cli/nrfu/commands.py @@ -13,7 +13,7 @@ from anta.cli.utils import exit_with_code -from .utils import print_jinja, print_json, print_table, print_text, run_tests, save_to_csv +from .utils import print_jinja, print_json, print_table, print_text, run_tests, save_markdown_report, save_to_csv logger = logging.getLogger(__name__) @@ -28,7 +28,7 @@ required=False, ) def table(ctx: click.Context, group_by: Literal["device", "test"] | None) -> None: - """ANTA command to check network states with table result.""" + """ANTA command to check network state with table results.""" run_tests(ctx) print_table(ctx, group_by=group_by) exit_with_code(ctx) @@ -42,10 +42,10 @@ def table(ctx: click.Context, group_by: Literal["device", "test"] | None) -> Non type=click.Path(file_okay=True, dir_okay=False, exists=False, writable=True, path_type=pathlib.Path), show_envvar=True, required=False, - help="Path to save report as a file", + help="Path to save report as a JSON file", ) def json(ctx: click.Context, output: pathlib.Path | None) -> None: - """ANTA command to check network state with JSON result.""" + """ANTA command to check network state with JSON results.""" run_tests(ctx) print_json(ctx, output=output) exit_with_code(ctx) @@ -54,7 +54,7 @@ def json(ctx: click.Context, output: pathlib.Path | None) -> None: @click.command() @click.pass_context def 
text(ctx: click.Context) -> None: - """ANTA command to check network states with text result.""" + """ANTA command to check network state with text results.""" run_tests(ctx) print_text(ctx) exit_with_code(ctx) @@ -105,3 +105,19 @@ def tpl_report(ctx: click.Context, template: pathlib.Path, output: pathlib.Path run_tests(ctx) print_jinja(results=ctx.obj["result_manager"], template=template, output=output) exit_with_code(ctx) + + +@click.command() +@click.pass_context +@click.option( + "--md-output", + type=click.Path(file_okay=True, dir_okay=False, exists=False, writable=True, path_type=pathlib.Path), + show_envvar=True, + required=True, + help="Path to save the report as a Markdown file", +) +def md_report(ctx: click.Context, md_output: pathlib.Path) -> None: + """ANTA command to check network state with Markdown report.""" + run_tests(ctx) + save_markdown_report(ctx, md_output=md_output) + exit_with_code(ctx) diff --git a/anta/cli/nrfu/utils.py b/anta/cli/nrfu/utils.py index 284c9b709..748578dec 100644 --- a/anta/cli/nrfu/utils.py +++ b/anta/cli/nrfu/utils.py @@ -19,6 +19,7 @@ from anta.models import AntaTest from anta.reporter import ReportJinja, ReportTable from anta.reporter.csv_reporter import ReportCsv +from anta.reporter.md_reporter import MDReportGenerator from anta.runner import main if TYPE_CHECKING: @@ -94,14 +95,21 @@ def print_table(ctx: click.Context, group_by: Literal["device", "test"] | None = def print_json(ctx: click.Context, output: pathlib.Path | None = None) -> None: - """Print result in a json format.""" + """Print results as JSON. If output is provided, save to file instead.""" results = _get_result_manager(ctx) - console.print() - console.print(Panel("JSON results", style="cyan")) - rich.print_json(results.json) - if output is not None: - with output.open(mode="w", encoding="utf-8") as fout: - fout.write(results.json) + + if output is None: + console.print() + console.print(Panel("JSON results", style="cyan")) + rich.print_json(results.json) + else: + try: + with output.open(mode="w", encoding="utf-8") as file: + file.write(results.json) + console.print(f"JSON results saved to {output} ✅", style="cyan") + except OSError: + console.print(f"Failed to save JSON results to {output} ❌", style="cyan") + ctx.exit(ExitCode.USAGE_ERROR) def print_text(ctx: click.Context) -> None: @@ -134,6 +142,22 @@ def save_to_csv(ctx: click.Context, csv_file: pathlib.Path) -> None: ctx.exit(ExitCode.USAGE_ERROR) +def save_markdown_report(ctx: click.Context, md_output: pathlib.Path) -> None: + """Save the markdown report to a file. + + Parameters + ---------- + ctx: Click context containing the result manager. + md_output: Path to save the markdown report. 
+ """ + try: + MDReportGenerator.generate(results=_get_result_manager(ctx), md_filename=md_output) + console.print(f"Markdown report saved to {md_output} ✅", style="cyan") + except OSError: + console.print(f"Failed to save Markdown report to {md_output} ❌", style="cyan") + ctx.exit(ExitCode.USAGE_ERROR) + + # Adding our own ANTA spinner - overriding rich SPINNERS for our own # so ignore warning for redefinition rich.spinner.SPINNERS = { # type: ignore[attr-defined] diff --git a/anta/cli/utils.py b/anta/cli/utils.py index 6d31e55ae..2f6e7d302 100644 --- a/anta/cli/utils.py +++ b/anta/cli/utils.py @@ -112,7 +112,7 @@ def resolve_command(self, ctx: click.Context, args: Any) -> Any: return cmd.name, cmd, args -def inventory_options(f: Callable[..., Any]) -> Callable[..., Any]: +def core_options(f: Callable[..., Any]) -> Callable[..., Any]: """Click common options when requiring an inventory to interact with devices.""" @click.option( @@ -190,22 +190,12 @@ def inventory_options(f: Callable[..., Any]) -> Callable[..., Any]: required=True, type=click.Path(file_okay=True, dir_okay=False, exists=True, readable=True, path_type=Path), ) - @click.option( - "--tags", - help="List of tags using comma as separator: tag1,tag2,tag3.", - show_envvar=True, - envvar="ANTA_TAGS", - type=str, - required=False, - callback=parse_tags, - ) @click.pass_context @functools.wraps(f) def wrapper( ctx: click.Context, *args: tuple[Any], inventory: Path, - tags: set[str] | None, username: str, password: str | None, enable_password: str | None, @@ -219,7 +209,7 @@ def wrapper( # pylint: disable=too-many-arguments # If help is invoke somewhere, do not parse inventory if ctx.obj.get("_anta_help"): - return f(*args, inventory=None, tags=tags, **kwargs) + return f(*args, inventory=None, **kwargs) if prompt: # User asked for a password prompt if password is None: @@ -255,7 +245,37 @@ def wrapper( ) except (TypeError, ValueError, YAMLError, OSError, InventoryIncorrectSchemaError, InventoryRootKeyError): ctx.exit(ExitCode.USAGE_ERROR) - return f(*args, inventory=i, tags=tags, **kwargs) + return f(*args, inventory=i, **kwargs) + + return wrapper + + +def inventory_options(f: Callable[..., Any]) -> Callable[..., Any]: + """Click common options when requiring an inventory to interact with devices.""" + + @core_options + @click.option( + "--tags", + help="List of tags using comma as separator: tag1,tag2,tag3.", + show_envvar=True, + envvar="ANTA_TAGS", + type=str, + required=False, + callback=parse_tags, + ) + @click.pass_context + @functools.wraps(f) + def wrapper( + ctx: click.Context, + *args: tuple[Any], + tags: set[str] | None, + **kwargs: dict[str, Any], + ) -> Any: + # pylint: disable=too-many-arguments + # If help is invoke somewhere, do not parse inventory + if ctx.obj.get("_anta_help"): + return f(*args, tags=tags, **kwargs) + return f(*args, tags=tags, **kwargs) return wrapper diff --git a/anta/constants.py b/anta/constants.py new file mode 100644 index 000000000..175a4adcc --- /dev/null +++ b/anta/constants.py @@ -0,0 +1,19 @@ +# Copyright (c) 2023-2024 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. 
+"""Constants used in ANTA.""" + +from __future__ import annotations + +ACRONYM_CATEGORIES: set[str] = {"aaa", "mlag", "snmp", "bgp", "ospf", "vxlan", "stp", "igmp", "ip", "lldp", "ntp", "bfd", "ptp", "lanz", "stun", "vlan"} +"""A set of network protocol or feature acronyms that should be represented in uppercase.""" + +MD_REPORT_TOC = """**Table of Contents:** + +- [ANTA Report](#anta-report) + - [Test Results Summary](#test-results-summary) + - [Summary Totals](#summary-totals) + - [Summary Totals Device Under Test](#summary-totals-device-under-test) + - [Summary Totals Per Category](#summary-totals-per-category) + - [Test Results](#test-results)""" +"""Table of Contents for the Markdown report.""" diff --git a/anta/custom_types.py b/anta/custom_types.py index 56c213977..322fa4aca 100644 --- a/anta/custom_types.py +++ b/anta/custom_types.py @@ -112,9 +112,6 @@ def validate_regex(value: str) -> str: return value -# ANTA framework -TestStatus = Literal["unset", "success", "failure", "error", "skipped"] - # AntaTest.Input types AAAAuthMethod = Annotated[str, AfterValidator(aaa_group_prefix)] Vlan = Annotated[int, Field(ge=0, le=4094)] @@ -198,3 +195,4 @@ def validate_regex(value: str) -> str: "prefixRtMembershipDroppedMaxRouteLimitViolated", ] BgpUpdateError = Literal["inUpdErrWithdraw", "inUpdErrIgnore", "inUpdErrDisableAfiSafi", "disabledAfiSafi", "lastUpdErrTime"] +BfdProtocol = Literal["bgp", "isis", "lag", "ospf", "ospfv3", "pim", "route-input", "static-bfd", "static-route", "vrrp", "vxlan"] diff --git a/anta/reporter/__init__.py b/anta/reporter/__init__.py index 84fe388cc..b5bc381c0 100644 --- a/anta/reporter/__init__.py +++ b/anta/reporter/__init__.py @@ -18,9 +18,8 @@ if TYPE_CHECKING: import pathlib - from anta.custom_types import TestStatus from anta.result_manager import ResultManager - from anta.result_manager.models import TestResult + from anta.result_manager.models import AntaTestStatus, TestResult logger = logging.getLogger(__name__) @@ -80,19 +79,19 @@ def _build_headers(self, headers: list[str], table: Table) -> Table: table.add_column(header, justify="left") return table - def _color_result(self, status: TestStatus) -> str: - """Return a colored string based on the status value. + def _color_result(self, status: AntaTestStatus) -> str: + """Return a colored string based on an AntaTestStatus. Parameters ---------- - status (TestStatus): status value to color. + status: AntaTestStatus enum to color. Returns ------- - str: the colored string + The colored string. 
""" - color = RICH_COLOR_THEME.get(status, "") + color = RICH_COLOR_THEME.get(str(status), "") return f"[{color}]{status}" if color != "" else str(status) def report_all(self, manager: ResultManager, title: str = "All tests results") -> Table: @@ -154,21 +153,15 @@ def report_summary_tests( self.Headers.list_of_error_nodes, ] table = self._build_headers(headers=headers, table=table) - for test in manager.get_tests(): + for test, stats in sorted(manager.test_stats.items()): if tests is None or test in tests: - results = manager.filter_by_tests({test}).results - nb_failure = len([result for result in results if result.result == "failure"]) - nb_error = len([result for result in results if result.result == "error"]) - list_failure = [result.name for result in results if result.result in ["failure", "error"]] - nb_success = len([result for result in results if result.result == "success"]) - nb_skipped = len([result for result in results if result.result == "skipped"]) table.add_row( test, - str(nb_success), - str(nb_skipped), - str(nb_failure), - str(nb_error), - str(list_failure), + str(stats.devices_success_count), + str(stats.devices_skipped_count), + str(stats.devices_failure_count), + str(stats.devices_error_count), + ", ".join(stats.devices_failure), ) return table @@ -202,21 +195,15 @@ def report_summary_devices( self.Headers.list_of_error_tests, ] table = self._build_headers(headers=headers, table=table) - for device in manager.get_devices(): + for device, stats in sorted(manager.device_stats.items()): if devices is None or device in devices: - results = manager.filter_by_devices({device}).results - nb_failure = len([result for result in results if result.result == "failure"]) - nb_error = len([result for result in results if result.result == "error"]) - list_failure = [result.test for result in results if result.result in ["failure", "error"]] - nb_success = len([result for result in results if result.result == "success"]) - nb_skipped = len([result for result in results if result.result == "skipped"]) table.add_row( device, - str(nb_success), - str(nb_skipped), - str(nb_failure), - str(nb_error), - str(list_failure), + str(stats.tests_success_count), + str(stats.tests_skipped_count), + str(stats.tests_failure_count), + str(stats.tests_error_count), + ", ".join(stats.tests_failure), ) return table diff --git a/anta/reporter/md_reporter.py b/anta/reporter/md_reporter.py new file mode 100644 index 000000000..7b97fb176 --- /dev/null +++ b/anta/reporter/md_reporter.py @@ -0,0 +1,288 @@ +# Copyright (c) 2023-2024 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. +"""Markdown report generator for ANTA test results.""" + +from __future__ import annotations + +import logging +import re +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, ClassVar + +from anta.constants import MD_REPORT_TOC +from anta.logger import anta_log_exception +from anta.result_manager.models import AntaTestStatus + +if TYPE_CHECKING: + from collections.abc import Generator + from io import TextIOWrapper + from pathlib import Path + + from anta.result_manager import ResultManager + +logger = logging.getLogger(__name__) + + +# pylint: disable=too-few-public-methods +class MDReportGenerator: + """Class responsible for generating a Markdown report based on the provided `ResultManager` object. 
+ + It aggregates different report sections, each represented by a subclass of `MDReportBase`, + and sequentially generates their content into a markdown file. + + The `generate` class method will loop over all the section subclasses and call their `generate_section` method. + The final report will be generated in the same order as the `sections` list of the method. + """ + + @classmethod + def generate(cls, results: ResultManager, md_filename: Path) -> None: + """Generate and write the various sections of the markdown report. + + Parameters + ---------- + results: The ResultsManager instance containing all test results. + md_filename: The path to the markdown file to write the report into. + """ + try: + with md_filename.open("w", encoding="utf-8") as mdfile: + sections: list[MDReportBase] = [ + ANTAReport(mdfile, results), + TestResultsSummary(mdfile, results), + SummaryTotals(mdfile, results), + SummaryTotalsDeviceUnderTest(mdfile, results), + SummaryTotalsPerCategory(mdfile, results), + TestResults(mdfile, results), + ] + for section in sections: + section.generate_section() + except OSError as exc: + message = f"OSError caught while writing the Markdown file '{md_filename.resolve()}'." + anta_log_exception(exc, message, logger) + raise + + +class MDReportBase(ABC): + """Base class for all sections subclasses. + + Every subclasses must implement the `generate_section` method that uses the `ResultManager` object + to generate and write content to the provided markdown file. + """ + + def __init__(self, mdfile: TextIOWrapper, results: ResultManager) -> None: + """Initialize the MDReportBase with an open markdown file object to write to and a ResultManager instance. + + Parameters + ---------- + mdfile: An open file object to write the markdown data into. + results: The ResultsManager instance containing all test results. + """ + self.mdfile = mdfile + self.results = results + + @abstractmethod + def generate_section(self) -> None: + """Abstract method to generate a specific section of the markdown report. + + Must be implemented by subclasses. + """ + msg = "Must be implemented by subclasses" + raise NotImplementedError(msg) + + def generate_rows(self) -> Generator[str, None, None]: + """Generate the rows of a markdown table for a specific report section. + + Subclasses can implement this method to generate the content of the table rows. + """ + msg = "Subclasses should implement this method" + raise NotImplementedError(msg) + + def generate_heading_name(self) -> str: + """Generate a formatted heading name based on the class name. + + Returns + ------- + str: Formatted header name. + + Example + ------- + - `ANTAReport` will become ANTA Report. + - `TestResultsSummary` will become Test Results Summary. + """ + class_name = self.__class__.__name__ + + # Split the class name into words, keeping acronyms together + words = re.findall(r"[A-Z]?[a-z]+|[A-Z]+(?=[A-Z][a-z]|\d|\W|$)|\d+", class_name) + + # Capitalize each word, but keep acronyms in all caps + formatted_words = [word if word.isupper() else word.capitalize() for word in words] + + return " ".join(formatted_words) + + def write_table(self, table_heading: list[str], *, last_table: bool = False) -> None: + """Write a markdown table with a table heading and multiple rows to the markdown file. + + Parameters + ---------- + table_heading: List of strings to join for the table heading. + last_table: Flag to determine if it's the last table of the markdown file to avoid unnecessary new line. Defaults to False. 
+ """ + self.mdfile.write("\n".join(table_heading) + "\n") + for row in self.generate_rows(): + self.mdfile.write(row) + if not last_table: + self.mdfile.write("\n") + + def write_heading(self, heading_level: int) -> None: + """Write a markdown heading to the markdown file. + + The heading name used is the class name. + + Parameters + ---------- + heading_level: The level of the heading (1-6). + + Example + ------- + ## Test Results Summary + """ + # Ensure the heading level is within the valid range of 1 to 6 + heading_level = max(1, min(heading_level, 6)) + heading_name = self.generate_heading_name() + heading = "#" * heading_level + " " + heading_name + self.mdfile.write(f"{heading}\n\n") + + def safe_markdown(self, text: str | None) -> str: + """Escape markdown characters in the text to prevent markdown rendering issues. + + Parameters + ---------- + text: The text to escape markdown characters from. + + Returns + ------- + str: The text with escaped markdown characters. + """ + # Custom field from a TestResult object can be None + if text is None: + return "" + + # Replace newlines with spaces to keep content on one line + text = text.replace("\n", " ") + + # Replace backticks with single quotes + return text.replace("`", "'") + + +class ANTAReport(MDReportBase): + """Generate the `# ANTA Report` section of the markdown report.""" + + def generate_section(self) -> None: + """Generate the `# ANTA Report` section of the markdown report.""" + self.write_heading(heading_level=1) + toc = MD_REPORT_TOC + self.mdfile.write(toc + "\n\n") + + +class TestResultsSummary(MDReportBase): + """Generate the `## Test Results Summary` section of the markdown report.""" + + def generate_section(self) -> None: + """Generate the `## Test Results Summary` section of the markdown report.""" + self.write_heading(heading_level=2) + + +class SummaryTotals(MDReportBase): + """Generate the `### Summary Totals` section of the markdown report.""" + + TABLE_HEADING: ClassVar[list[str]] = [ + "| Total Tests | Total Tests Success | Total Tests Skipped | Total Tests Failure | Total Tests Error |", + "| ----------- | ------------------- | ------------------- | ------------------- | ------------------|", + ] + + def generate_rows(self) -> Generator[str, None, None]: + """Generate the rows of the summary totals table.""" + yield ( + f"| {self.results.get_total_results()} " + f"| {self.results.get_total_results({AntaTestStatus.SUCCESS})} " + f"| {self.results.get_total_results({AntaTestStatus.SKIPPED})} " + f"| {self.results.get_total_results({AntaTestStatus.FAILURE})} " + f"| {self.results.get_total_results({AntaTestStatus.ERROR})} |\n" + ) + + def generate_section(self) -> None: + """Generate the `### Summary Totals` section of the markdown report.""" + self.write_heading(heading_level=3) + self.write_table(table_heading=self.TABLE_HEADING) + + +class SummaryTotalsDeviceUnderTest(MDReportBase): + """Generate the `### Summary Totals Devices Under Tests` section of the markdown report.""" + + TABLE_HEADING: ClassVar[list[str]] = [ + "| Device Under Test | Total Tests | Tests Success | Tests Skipped | Tests Failure | Tests Error | Categories Skipped | Categories Failed |", + "| ------------------| ----------- | ------------- | ------------- | ------------- | ----------- | -------------------| ------------------|", + ] + + def generate_rows(self) -> Generator[str, None, None]: + """Generate the rows of the summary totals device under test table.""" + for device, stat in self.results.device_stats.items(): + total_tests = 
stat.tests_success_count + stat.tests_skipped_count + stat.tests_failure_count + stat.tests_error_count + categories_skipped = ", ".join(sorted(stat.categories_skipped)) + categories_failed = ", ".join(sorted(stat.categories_failed)) + yield ( + f"| {device} | {total_tests} | {stat.tests_success_count} | {stat.tests_skipped_count} | {stat.tests_failure_count} | {stat.tests_error_count} " + f"| {categories_skipped or '-'} | {categories_failed or '-'} |\n" + ) + + def generate_section(self) -> None: + """Generate the `### Summary Totals Devices Under Tests` section of the markdown report.""" + self.write_heading(heading_level=3) + self.write_table(table_heading=self.TABLE_HEADING) + + +class SummaryTotalsPerCategory(MDReportBase): + """Generate the `### Summary Totals Per Category` section of the markdown report.""" + + TABLE_HEADING: ClassVar[list[str]] = [ + "| Test Category | Total Tests | Tests Success | Tests Skipped | Tests Failure | Tests Error |", + "| ------------- | ----------- | ------------- | ------------- | ------------- | ----------- |", + ] + + def generate_rows(self) -> Generator[str, None, None]: + """Generate the rows of the summary totals per category table.""" + for category, stat in self.results.sorted_category_stats.items(): + total_tests = stat.tests_success_count + stat.tests_skipped_count + stat.tests_failure_count + stat.tests_error_count + yield ( + f"| {category} | {total_tests} | {stat.tests_success_count} | {stat.tests_skipped_count} | {stat.tests_failure_count} " + f"| {stat.tests_error_count} |\n" + ) + + def generate_section(self) -> None: + """Generate the `### Summary Totals Per Category` section of the markdown report.""" + self.write_heading(heading_level=3) + self.write_table(table_heading=self.TABLE_HEADING) + + +class TestResults(MDReportBase): + """Generates the `## Test Results` section of the markdown report.""" + + TABLE_HEADING: ClassVar[list[str]] = [ + "| Device Under Test | Categories | Test | Description | Custom Field | Result | Messages |", + "| ----------------- | ---------- | ---- | ----------- | ------------ | ------ | -------- |", + ] + + def generate_rows(self) -> Generator[str, None, None]: + """Generate the rows of the all test results table.""" + for result in self.results.get_results(sort_by=["name", "test"]): + messages = self.safe_markdown(", ".join(result.messages)) + categories = ", ".join(result.categories) + yield ( + f"| {result.name or '-'} | {categories or '-'} | {result.test or '-'} " + f"| {result.description or '-'} | {self.safe_markdown(result.custom_field) or '-'} | {result.result or '-'} | {messages or '-'} |\n" + ) + + def generate_section(self) -> None: + """Generate the `## Test Results` section of the markdown report.""" + self.write_heading(heading_level=2) + self.write_table(table_heading=self.TABLE_HEADING, last_table=True) diff --git a/anta/result_manager/__init__.py b/anta/result_manager/__init__.py index 906a71aa5..9702689ad 100644 --- a/anta/result_manager/__init__.py +++ b/anta/result_manager/__init__.py @@ -6,14 +6,14 @@ from __future__ import annotations import json -from typing import TYPE_CHECKING +from collections import defaultdict +from functools import cached_property +from itertools import chain -from pydantic import TypeAdapter +from anta.constants import ACRONYM_CATEGORIES +from anta.result_manager.models import AntaTestStatus, TestResult -from anta.custom_types import TestStatus - -if TYPE_CHECKING: - from anta.result_manager.models import TestResult +from .models import CategoryStats, 
DeviceStats, TestStats class ResultManager: @@ -21,52 +21,52 @@ class ResultManager: Examples -------- - Create Inventory: + Create Inventory: + + inventory_anta = AntaInventory.parse( + filename='examples/inventory.yml', + username='ansible', + password='ansible', + ) + + Create Result Manager: + + manager = ResultManager() + + Run tests for all connected devices: - inventory_anta = AntaInventory.parse( - filename='examples/inventory.yml', - username='ansible', - password='ansible', + for device in inventory_anta.get_inventory().devices: + manager.add( + VerifyNTP(device=device).test() + ) + manager.add( + VerifyEOSVersion(device=device).test(version='4.28.3M') ) - Create Result Manager: - - manager = ResultManager() - - Run tests for all connected devices: - - for device in inventory_anta.get_inventory().devices: - manager.add( - VerifyNTP(device=device).test() - ) - manager.add( - VerifyEOSVersion(device=device).test(version='4.28.3M') - ) - - Print result in native format: - - manager.results - [ - TestResult( - name="pf1", - test="VerifyZeroTouch", - categories=["configuration"], - description="Verifies ZeroTouch is disabled", - result="success", - messages=[], - custom_field=None, - ), - TestResult( - name="pf1", - test='VerifyNTP', - categories=["software"], - categories=['system'], - description='Verifies if NTP is synchronised.', - result='failure', - messages=["The device is not synchronized with the configured NTP server(s): 'NTP is disabled.'"], - custom_field=None, - ), - ] + Print result in native format: + + manager.results + [ + TestResult( + name="pf1", + test="VerifyZeroTouch", + categories=["configuration"], + description="Verifies ZeroTouch is disabled", + result="success", + messages=[], + custom_field=None, + ), + TestResult( + name="pf1", + test='VerifyNTP', + categories=["software"], + categories=['system'], + description='Verifies if NTP is synchronised.', + result='failure', + messages=["The device is not synchronized with the configured NTP server(s): 'NTP is disabled.'"], + custom_field=None, + ), + ] """ def __init__(self) -> None: @@ -91,9 +91,13 @@ def __init__(self) -> None: error_status is set to True. 
""" self._result_entries: list[TestResult] = [] - self.status: TestStatus = "unset" + self.status: AntaTestStatus = AntaTestStatus.UNSET self.error_status = False + self.device_stats: defaultdict[str, DeviceStats] = defaultdict(DeviceStats) + self.category_stats: defaultdict[str, CategoryStats] = defaultdict(CategoryStats) + self.test_stats: defaultdict[str, TestStats] = defaultdict(TestStats) + def __len__(self) -> int: """Implement __len__ method to count number of results.""" return len(self._result_entries) @@ -105,56 +109,164 @@ def results(self) -> list[TestResult]: @results.setter def results(self, value: list[TestResult]) -> None: + """Set the list of TestResult.""" + # When setting the results, we need to reset the state of the current instance self._result_entries = [] - self.status = "unset" + self.status = AntaTestStatus.UNSET self.error_status = False - for e in value: - self.add(e) + + # Also reset the stats attributes + self.device_stats = defaultdict(DeviceStats) + self.category_stats = defaultdict(CategoryStats) + self.test_stats = defaultdict(TestStats) + + for result in value: + self.add(result) @property def json(self) -> str: """Get a JSON representation of the results.""" return json.dumps([result.model_dump() for result in self._result_entries], indent=4) + @property + def sorted_category_stats(self) -> dict[str, CategoryStats]: + """A property that returns the category_stats dictionary sorted by key name.""" + return dict(sorted(self.category_stats.items())) + + @cached_property + def results_by_status(self) -> dict[AntaTestStatus, list[TestResult]]: + """A cached property that returns the results grouped by status.""" + return {status: [result for result in self._result_entries if result.result == status] for status in AntaTestStatus} + + def _update_status(self, test_status: AntaTestStatus) -> None: + """Update the status of the ResultManager instance based on the test status. + + Parameters + ---------- + test_status: AntaTestStatus to update the ResultManager status. + """ + if test_status == "error": + self.error_status = True + return + if self.status == "unset" or self.status == "skipped" and test_status in {"success", "failure"}: + self.status = test_status + elif self.status == "success" and test_status == "failure": + self.status = AntaTestStatus.FAILURE + + def _update_stats(self, result: TestResult) -> None: + """Update the statistics based on the test result. + + Parameters + ---------- + result: TestResult to update the statistics. 
+ """ + result.categories = [ + " ".join(word.upper() if word.lower() in ACRONYM_CATEGORIES else word.title() for word in category.split()) for category in result.categories + ] + count_attr = f"tests_{result.result}_count" + + # Update device stats + device_stats: DeviceStats = self.device_stats[result.name] + setattr(device_stats, count_attr, getattr(device_stats, count_attr) + 1) + if result.result in ("failure", "error"): + device_stats.tests_failure.add(result.test) + device_stats.categories_failed.update(result.categories) + elif result.result == "skipped": + device_stats.categories_skipped.update(result.categories) + + # Update category stats + for category in result.categories: + category_stats: CategoryStats = self.category_stats[category] + setattr(category_stats, count_attr, getattr(category_stats, count_attr) + 1) + + # Update test stats + count_attr = f"devices_{result.result}_count" + test_stats: TestStats = self.test_stats[result.test] + setattr(test_stats, count_attr, getattr(test_stats, count_attr) + 1) + if result.result in ("failure", "error"): + test_stats.devices_failure.add(result.name) + def add(self, result: TestResult) -> None: """Add a result to the ResultManager instance. + The result is added to the internal list of results and the overall status + of the ResultManager instance is updated based on the added test status. + Parameters ---------- result: TestResult to add to the ResultManager instance. """ + self._result_entries.append(result) + self._update_status(result.result) + self._update_stats(result) - def _update_status(test_status: TestStatus) -> None: - result_validator: TypeAdapter[TestStatus] = TypeAdapter(TestStatus) - result_validator.validate_python(test_status) - if test_status == "error": - self.error_status = True - return - if self.status == "unset" or self.status == "skipped" and test_status in {"success", "failure"}: - self.status = test_status - elif self.status == "success" and test_status == "failure": - self.status = "failure" + # Every time a new result is added, we need to clear the cached property + self.__dict__.pop("results_by_status", None) - self._result_entries.append(result) - _update_status(result.result) + def get_results(self, status: set[AntaTestStatus] | None = None, sort_by: list[str] | None = None) -> list[TestResult]: + """Get the results, optionally filtered by status and sorted by TestResult fields. + + If no status is provided, all results are returned. + + Parameters + ---------- + status: Optional set of AntaTestStatus enum members to filter the results. + sort_by: Optional list of TestResult fields to sort the results. + + Returns + ------- + List of TestResult. + """ + # Return all results if no status is provided, otherwise return results for multiple statuses + results = self._result_entries if status is None else list(chain.from_iterable(self.results_by_status.get(status, []) for status in status)) + + if sort_by: + accepted_fields = TestResult.model_fields.keys() + if not set(sort_by).issubset(set(accepted_fields)): + msg = f"Invalid sort_by fields: {sort_by}. Accepted fields are: {list(accepted_fields)}" + raise ValueError(msg) + results = sorted(results, key=lambda result: [getattr(result, field) for field in sort_by]) + + return results + + def get_total_results(self, status: set[AntaTestStatus] | None = None) -> int: + """Get the total number of results, optionally filtered by status. + + If no status is provided, the total number of results is returned. 
+ + Parameters + ---------- + status: Optional set of AntaTestStatus enum members to filter the results. + + Returns + ------- + Total number of results. + """ + if status is None: + # Return the total number of results + return sum(len(results) for results in self.results_by_status.values()) + + # Return the total number of results for multiple statuses + return sum(len(self.results_by_status.get(status, [])) for status in status) def get_status(self, *, ignore_error: bool = False) -> str: """Return the current status including error_status if ignore_error is False.""" return "error" if self.error_status and not ignore_error else self.status - def filter(self, hide: set[TestStatus]) -> ResultManager: + def filter(self, hide: set[AntaTestStatus]) -> ResultManager: """Get a filtered ResultManager based on test status. Parameters ---------- - hide: set of TestStatus literals to select tests to hide based on their status. + hide: Set of AntaTestStatus enum members to select tests to hide based on their status. Returns ------- A filtered `ResultManager`. """ + possible_statuses = set(AntaTestStatus) manager = ResultManager() - manager.results = [test for test in self._result_entries if test.result not in hide] + manager.results = self.get_results(possible_statuses - hide) return manager def filter_by_tests(self, tests: set[str]) -> ResultManager: @@ -181,7 +293,7 @@ def filter_by_devices(self, devices: set[str]) -> ResultManager: Returns ------- - A filtered `ResultManager`. + A filtered `ResultManager`. """ manager = ResultManager() manager.results = [result for result in self._result_entries if result.name in devices] @@ -192,7 +304,7 @@ def get_tests(self) -> set[str]: Returns ------- - Set of test names. + Set of test names. """ return {str(result.test) for result in self._result_entries} @@ -201,6 +313,6 @@ def get_devices(self) -> set[str]: Returns ------- - Set of device names. + Set of device names. """ return {str(result.name) for result in self._result_entries} diff --git a/anta/result_manager/models.py b/anta/result_manager/models.py index 832a84073..e94c464ef 100644 --- a/anta/result_manager/models.py +++ b/anta/result_manager/models.py @@ -5,9 +5,27 @@ from __future__ import annotations +from dataclasses import dataclass, field +from enum import Enum + from pydantic import BaseModel -from anta.custom_types import TestStatus + +class AntaTestStatus(str, Enum): + """Test status Enum for the TestResult. + + NOTE: This could be updated to StrEnum when Python 3.11 is the minimum supported version in ANTA. + """ + + UNSET = "unset" + SUCCESS = "success" + FAILURE = "failure" + ERROR = "error" + SKIPPED = "skipped" + + def __str__(self) -> str: + """Override the __str__ method to return the value of the Enum, mimicking the behavior of StrEnum.""" + return self.value class TestResult(BaseModel): @@ -15,13 +33,13 @@ class TestResult(BaseModel): Attributes ---------- - name: Device name where the test has run. - test: Test name runs on the device. - categories: List of categories the TestResult belongs to, by default the AntaTest categories. - description: TestResult description, by default the AntaTest description. - result: Result of the test. Can be one of "unset", "success", "failure", "error" or "skipped". - messages: Message to report after the test if any. - custom_field: Custom field to store a string for flexibility in integrating with ANTA + name: Name of the device where the test was run. + test: Name of the test run on the device. 
+ categories: List of categories the TestResult belongs to. Defaults to the AntaTest categories. + description: Description of the TestResult. Defaults to the AntaTest description. + result: Result of the test. Must be one of the AntaTestStatus Enum values: unset, success, failure, error or skipped. + messages: Messages to report after the test, if any. + custom_field: Custom field to store a string for flexibility in integrating with ANTA. """ @@ -29,7 +47,7 @@ class TestResult(BaseModel): test: str categories: list[str] description: str - result: TestStatus = "unset" + result: AntaTestStatus = AntaTestStatus.UNSET messages: list[str] = [] custom_field: str | None = None @@ -41,7 +59,7 @@ def is_success(self, message: str | None = None) -> None: message: Optional message related to the test """ - self._set_status("success", message) + self._set_status(AntaTestStatus.SUCCESS, message) def is_failure(self, message: str | None = None) -> None: """Set status to failure. @@ -51,7 +69,7 @@ def is_failure(self, message: str | None = None) -> None: message: Optional message related to the test """ - self._set_status("failure", message) + self._set_status(AntaTestStatus.FAILURE, message) def is_skipped(self, message: str | None = None) -> None: """Set status to skipped. @@ -61,7 +79,7 @@ def is_skipped(self, message: str | None = None) -> None: message: Optional message related to the test """ - self._set_status("skipped", message) + self._set_status(AntaTestStatus.SKIPPED, message) def is_error(self, message: str | None = None) -> None: """Set status to error. @@ -71,9 +89,9 @@ def is_error(self, message: str | None = None) -> None: message: Optional message related to the test """ - self._set_status("error", message) + self._set_status(AntaTestStatus.ERROR, message) - def _set_status(self, status: TestStatus, message: str | None = None) -> None: + def _set_status(self, status: AntaTestStatus, message: str | None = None) -> None: """Set status and insert optional message. 
Parameters @@ -89,3 +107,42 @@ def _set_status(self, status: TestStatus, message: str | None = None) -> None: def __str__(self) -> str: """Return a human readable string of this TestResult.""" return f"Test '{self.test}' (on '{self.name}'): Result '{self.result}'\nMessages: {self.messages}" + + +# Pylint does not treat dataclasses differently: https://github.com/pylint-dev/pylint/issues/9058 +# pylint: disable=too-many-instance-attributes +@dataclass +class DeviceStats: + """Device statistics for a run of tests.""" + + tests_success_count: int = 0 + tests_skipped_count: int = 0 + tests_failure_count: int = 0 + tests_error_count: int = 0 + tests_unset_count: int = 0 + tests_failure: set[str] = field(default_factory=set) + categories_failed: set[str] = field(default_factory=set) + categories_skipped: set[str] = field(default_factory=set) + + +@dataclass +class CategoryStats: + """Category statistics for a run of tests.""" + + tests_success_count: int = 0 + tests_skipped_count: int = 0 + tests_failure_count: int = 0 + tests_error_count: int = 0 + tests_unset_count: int = 0 + + +@dataclass +class TestStats: + """Test statistics for a run of tests.""" + + devices_success_count: int = 0 + devices_skipped_count: int = 0 + devices_failure_count: int = 0 + devices_error_count: int = 0 + devices_unset_count: int = 0 + devices_failure: set[str] = field(default_factory=set) diff --git a/anta/tests/bfd.py b/anta/tests/bfd.py index f19e9cc92..f42d80de7 100644 --- a/anta/tests/bfd.py +++ b/anta/tests/bfd.py @@ -13,7 +13,7 @@ from pydantic import BaseModel, Field -from anta.custom_types import BfdInterval, BfdMultiplier +from anta.custom_types import BfdInterval, BfdMultiplier, BfdProtocol from anta.models import AntaCommand, AntaTest from anta.tools import get_value @@ -45,7 +45,7 @@ class VerifyBFDSpecificPeers(AntaTest): name = "VerifyBFDSpecificPeers" description = "Verifies the IPv4 BFD peer's sessions and remote disc in the specified VRF." categories: ClassVar[list[str]] = ["bfd"] - commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers", revision=4)] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers", revision=1)] class Input(AntaTest.Input): """Input model for the VerifyBFDSpecificPeers test.""" @@ -126,7 +126,7 @@ class VerifyBFDPeersIntervals(AntaTest): name = "VerifyBFDPeersIntervals" description = "Verifies the timers of the IPv4 BFD peers in the specified VRF." 
categories: ClassVar[list[str]] = ["bfd"] - commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers detail", revision=4)] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers detail", revision=1)] class Input(AntaTest.Input): """Input model for the VerifyBFDPeersIntervals test.""" @@ -157,34 +157,34 @@ def test(self) -> None: for bfd_peers in self.inputs.bfd_peers: peer = str(bfd_peers.peer_address) vrf = bfd_peers.vrf - - # Converting milliseconds intervals into actual value - tx_interval = bfd_peers.tx_interval * 1000 - rx_interval = bfd_peers.rx_interval * 1000 + tx_interval = bfd_peers.tx_interval + rx_interval = bfd_peers.rx_interval multiplier = bfd_peers.multiplier + + # Check if BFD peer configured bfd_output = get_value( self.instance_commands[0].json_output, f"vrfs..{vrf}..ipv4Neighbors..{peer}..peerStats..", separator="..", ) - - # Check if BFD peer configured if not bfd_output: failures[peer] = {vrf: "Not Configured"} continue + # Convert interval timer(s) into milliseconds to be consistent with the inputs. bfd_details = bfd_output.get("peerStatsDetail", {}) - intervals_ok = ( - bfd_details.get("operTxInterval") == tx_interval and bfd_details.get("operRxInterval") == rx_interval and bfd_details.get("detectMult") == multiplier - ) + op_tx_interval = bfd_details.get("operTxInterval") // 1000 + op_rx_interval = bfd_details.get("operRxInterval") // 1000 + detect_multiplier = bfd_details.get("detectMult") + intervals_ok = op_tx_interval == tx_interval and op_rx_interval == rx_interval and detect_multiplier == multiplier # Check timers of BFD peer if not intervals_ok: failures[peer] = { vrf: { - "tx_interval": bfd_details.get("operTxInterval"), - "rx_interval": bfd_details.get("operRxInterval"), - "multiplier": bfd_details.get("detectMult"), + "tx_interval": op_tx_interval, + "rx_interval": op_rx_interval, + "multiplier": detect_multiplier, } } @@ -285,3 +285,79 @@ def test(self) -> None: if up_failures: up_failures_str = "\n".join(up_failures) self.result.is_failure(f"\nFollowing BFD peers were down:\n{up_failures_str}") + + +class VerifyBFDPeersRegProtocols(AntaTest): + """Verifies that IPv4 BFD peer(s) have the specified protocol(s) registered. + + Expected Results + ---------------- + * Success: The test will pass if IPv4 BFD peers are registered with the specified protocol(s). + * Failure: The test will fail if IPv4 BFD peers are not found or the specified protocol(s) are not registered for the BFD peer(s). + + Examples + -------- + ```yaml + anta.tests.bfd: + - VerifyBFDPeersRegProtocols: + bfd_peers: + - peer_address: 192.0.255.7 + vrf: default + protocols: + - bgp + ``` + """ + + name = "VerifyBFDPeersRegProtocols" + description = "Verifies that IPv4 BFD peer(s) have the specified protocol(s) registered." + categories: ClassVar[list[str]] = ["bfd"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bfd peers detail", revision=1)] + + class Input(AntaTest.Input): + """Input model for the VerifyBFDPeersRegProtocols test.""" + + bfd_peers: list[BFDPeer] + """List of IPv4 BFD peers.""" + + class BFDPeer(BaseModel): + """Model for an IPv4 BFD peer.""" + + peer_address: IPv4Address + """IPv4 address of a BFD peer.""" + vrf: str = "default" + """Optional VRF for BFD peer. 
If not provided, it defaults to `default`.""" + protocols: list[BfdProtocol] + """List of protocols to be verified.""" + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifyBFDPeersRegProtocols.""" + # Initialize failure messages + failures: dict[Any, Any] = {} + + # Iterating over BFD peers, extract the parameters and command output + for bfd_peer in self.inputs.bfd_peers: + peer = str(bfd_peer.peer_address) + vrf = bfd_peer.vrf + protocols = bfd_peer.protocols + bfd_output = get_value( + self.instance_commands[0].json_output, + f"vrfs..{vrf}..ipv4Neighbors..{peer}..peerStats..", + separator="..", + ) + + # Check if BFD peer configured + if not bfd_output: + failures[peer] = {vrf: "Not Configured"} + continue + + # Check registered protocols + difference = set(protocols) - set(get_value(bfd_output, "peerStatsDetail.apps")) + + if difference: + failures[peer] = {vrf: sorted(difference)} + + if not failures: + self.result.is_success() + else: + self.result.is_failure(f"The following BFD peers are not configured or have non-registered protocol(s):\n{failures}") diff --git a/anta/tests/routing/bgp.py b/anta/tests/routing/bgp.py index a392538a9..d6e970414 100644 --- a/anta/tests/routing/bgp.py +++ b/anta/tests/routing/bgp.py @@ -685,6 +685,8 @@ def test(self) -> None: class VerifyBGPPeerMPCaps(AntaTest): """Verifies the multiprotocol capabilities of a BGP peer in a specified VRF. + Supports `strict: True` to verify that only the specified capabilities are configured, requiring an exact match. + Expected Results ---------------- * Success: The test will pass if the BGP peer's multiprotocol capabilities are advertised, received, and enabled in the specified VRF. @@ -699,6 +701,7 @@ class VerifyBGPPeerMPCaps(AntaTest): bgp_peers: - peer_address: 172.30.11.1 vrf: default + strict: False capabilities: - ipv4Unicast ``` @@ -722,6 +725,8 @@ class BgpPeer(BaseModel): """IPv4 address of a BGP peer.""" vrf: str = "default" """Optional VRF for BGP peer. If not provided, it defaults to `default`.""" + strict: bool = False + """If True, requires exact matching of provided capabilities. Defaults to False.""" capabilities: list[MultiProtocolCaps] """List of multiprotocol capabilities to be verified.""" @@ -730,14 +735,14 @@ def test(self) -> None: """Main test function for VerifyBGPPeerMPCaps.""" failures: dict[str, Any] = {"bgp_peers": {}} - # Iterate over each bgp peer + # Iterate over each bgp peer. for bgp_peer in self.inputs.bgp_peers: peer = str(bgp_peer.peer_address) vrf = bgp_peer.vrf capabilities = bgp_peer.capabilities failure: dict[str, dict[str, dict[str, Any]]] = {"bgp_peers": {peer: {vrf: {}}}} - # Check if BGP output exists + # Check if BGP output exists. if ( not (bgp_output := get_value(self.instance_commands[0].json_output, f"vrfs.{vrf}.peerList")) or (bgp_output := get_item(bgp_output, "peerAddress", peer)) is None @@ -746,8 +751,17 @@ def test(self) -> None: failures = deep_update(failures, failure) continue - # Check each capability + # Fetching the capabilities output. bgp_output = get_value(bgp_output, "neighborCapabilities.multiprotocolCaps") + + if bgp_peer.strict and sorted(capabilities) != sorted(bgp_output): + failure["bgp_peers"][peer][vrf] = { + "status": f"Expected only `{', '.join(capabilities)}` capabilities should be listed but found `{', '.join(bgp_output)}` instead." 
+ } + failures = deep_update(failures, failure) + continue + + # Check each capability for capability in capabilities: capability_output = bgp_output.get(capability) diff --git a/anta/tests/security.py b/anta/tests/security.py index 4eb4d6415..ae5b9bebd 100644 --- a/anta/tests/security.py +++ b/anta/tests/security.py @@ -820,3 +820,37 @@ def test(self) -> None: self.result.is_failure( f"IPv4 security connection `source:{source_input} destination:{destination_input} vrf:{vrf}` for peer `{peer}` is not found." ) + + +class VerifyHardwareEntropy(AntaTest): + """ + Verifies hardware entropy generation is enabled on device. + + Expected Results + ---------------- + * Success: The test will pass if hardware entropy generation is enabled. + * Failure: The test will fail if hardware entropy generation is not enabled. + + Examples + -------- + ```yaml + anta.tests.security: + - VerifyHardwareEntropy: + ``` + """ + + name = "VerifyHardwareEntropy" + description = "Verifies hardware entropy generation is enabled on device." + categories: ClassVar[list[str]] = ["security"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show management security")] + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifyHardwareEntropy.""" + command_output = self.instance_commands[0].json_output + + # Check if hardware entropy generation is enabled. + if not command_output.get("hardwareEntropyEnabled"): + self.result.is_failure("Hardware entropy generation is disabled.") + else: + self.result.is_success() diff --git a/anta/tests/snmp.py b/anta/tests/snmp.py index ac98bfd2f..c7329b6d7 100644 --- a/anta/tests/snmp.py +++ b/anta/tests/snmp.py @@ -11,6 +11,7 @@ from anta.custom_types import PositiveInteger from anta.models import AntaCommand, AntaTest +from anta.tools import get_value if TYPE_CHECKING: from anta.models import AntaTemplate @@ -183,8 +184,12 @@ class Input(AntaTest.Input): @AntaTest.anta_test def test(self) -> None: """Main test function for VerifySnmpLocation.""" - location = self.instance_commands[0].json_output["location"]["location"] + # Verifies the SNMP location is configured. + if not (location := get_value(self.instance_commands[0].json_output, "location.location")): + self.result.is_failure("SNMP location is not configured.") + return + # Verifies the expected SNMP location. if location != self.inputs.location: self.result.is_failure(f"Expected `{self.inputs.location}` as the location, but found `{location}` instead.") else: @@ -222,8 +227,12 @@ class Input(AntaTest.Input): @AntaTest.anta_test def test(self) -> None: """Main test function for VerifySnmpContact.""" - contact = self.instance_commands[0].json_output["contact"]["contact"] + # Verifies the SNMP contact is configured. + if not (contact := get_value(self.instance_commands[0].json_output, "contact.contact")): + self.result.is_failure("SNMP contact is not configured.") + return + # Verifies the expected SNMP contact. 
if contact != self.inputs.contact: self.result.is_failure(f"Expected `{self.inputs.contact}` as the contact, but found `{contact}` instead.") else: diff --git a/anta/tests/system.py b/anta/tests/system.py index 49d2dd25d..486e5e1ed 100644 --- a/anta/tests/system.py +++ b/anta/tests/system.py @@ -8,10 +8,14 @@ from __future__ import annotations import re +from ipaddress import IPv4Address from typing import TYPE_CHECKING, ClassVar -from anta.custom_types import PositiveInteger +from pydantic import BaseModel, Field + +from anta.custom_types import Hostname, PositiveInteger from anta.models import AntaCommand, AntaTest +from anta.tools import get_failed_logs, get_value if TYPE_CHECKING: from anta.models import AntaTemplate @@ -299,3 +303,93 @@ def test(self) -> None: else: data = command_output.split("\n")[0] self.result.is_failure(f"The device is not synchronized with the configured NTP server(s): '{data}'") + + +class VerifyNTPAssociations(AntaTest): + """Verifies the Network Time Protocol (NTP) associations. + + Expected Results + ---------------- + * Success: The test will pass if the Primary NTP server (marked as preferred) has the condition 'sys.peer' and + all other NTP servers have the condition 'candidate'. + * Failure: The test will fail if the Primary NTP server (marked as preferred) does not have the condition 'sys.peer' or + if any other NTP server does not have the condition 'candidate'. + + Examples + -------- + ```yaml + anta.tests.system: + - VerifyNTPAssociations: + ntp_servers: + - server_address: 1.1.1.1 + preferred: True + stratum: 1 + - server_address: 2.2.2.2 + stratum: 2 + - server_address: 3.3.3.3 + stratum: 2 + ``` + """ + + name = "VerifyNTPAssociations" + description = "Verifies the Network Time Protocol (NTP) associations." + categories: ClassVar[list[str]] = ["system"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show ntp associations")] + + class Input(AntaTest.Input): + """Input model for the VerifyNTPAssociations test.""" + + ntp_servers: list[NTPServer] + """List of NTP servers.""" + + class NTPServer(BaseModel): + """Model for a NTP server.""" + + server_address: Hostname | IPv4Address + """The NTP server address as an IPv4 address or hostname. The NTP server name defined in the running configuration + of the device may change during DNS resolution, which is not handled in ANTA. Please provide the DNS-resolved server name. + For example, 'ntp.example.com' in the configuration might resolve to 'ntp3.example.com' in the device output.""" + preferred: bool = False + """Optional preferred for NTP server. If not provided, it defaults to `False`.""" + stratum: int = Field(ge=0, le=16) + """NTP stratum level (0 to 15) where 0 is the reference clock and 16 indicates unsynchronized. + Values should be between 0 and 15 for valid synchronization and 16 represents an out-of-sync state.""" + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifyNTPAssociations.""" + failures: str = "" + + if not (peer_details := get_value(self.instance_commands[0].json_output, "peers")): + self.result.is_failure("None of NTP peers are not configured.") + return + + # Iterate over each NTP server. + for ntp_server in self.inputs.ntp_servers: + server_address = str(ntp_server.server_address) + preferred = ntp_server.preferred + stratum = ntp_server.stratum + + # Check if NTP server details exists. 
+ if (peer_detail := get_value(peer_details, server_address, separator="..")) is None: + failures += f"NTP peer {server_address} is not configured.\n" + continue + + # Collecting the expected NTP peer details. + expected_peer_details = {"condition": "candidate", "stratum": stratum} + if preferred: + expected_peer_details["condition"] = "sys.peer" + + # Collecting the actual NTP peer details. + actual_peer_details = {"condition": get_value(peer_detail, "condition"), "stratum": get_value(peer_detail, "stratumLevel")} + + # Collecting failures logs if any. + failure_logs = get_failed_logs(expected_peer_details, actual_peer_details) + if failure_logs: + failures += f"For NTP peer {server_address}:{failure_logs}\n" + + # Check if there are any failures. + if not failures: + self.result.is_success() + else: + self.result.is_failure(failures) diff --git a/asynceapi/aio_portcheck.py b/asynceapi/aio_portcheck.py index fd8e7aee2..79f4562fa 100644 --- a/asynceapi/aio_portcheck.py +++ b/asynceapi/aio_portcheck.py @@ -33,7 +33,7 @@ # ----------------------------------------------------------------------------- -async def port_check_url(url: URL, timeout: int = 5) -> bool: # noqa: ASYNC109 +async def port_check_url(url: URL, timeout: int = 5) -> bool: """ Open the port designated by the URL given the timeout in seconds. diff --git a/asynceapi/device.py b/asynceapi/device.py index ca206d3e4..394abe40d 100644 --- a/asynceapi/device.py +++ b/asynceapi/device.py @@ -271,10 +271,11 @@ async def jsonrpc_exec(self, jsonrpc: dict[str, Any]) -> list[dict[str, Any] | s len_data = len(cmd_data) err_at = len_data - 1 err_msg = err_data["message"] + failed_cmd = commands[err_at] raise EapiCommandError( passed=[get_output(cmd_data[cmd_i]) for cmd_i, cmd in enumerate(commands[:err_at])], - failed=commands[err_at]["cmd"], + failed=failed_cmd["cmd"] if isinstance(failed_cmd, dict) else failed_cmd, errors=cmd_data[err_at]["errors"], errmsg=err_msg, not_exec=commands[err_at + 1 :], diff --git a/docs/cli/debug.md b/docs/cli/debug.md index d290fe118..b0b8a164f 100644 --- a/docs/cli/debug.md +++ b/docs/cli/debug.md @@ -52,8 +52,6 @@ Options: ANTA_DISABLE_CACHE] -i, --inventory FILE Path to the inventory YAML file. [env var: ANTA_INVENTORY; required] - --tags TEXT List of tags using comma as separator: - tag1,tag2,tag3. [env var: ANTA_TAGS] --ofmt [json|text] EOS eAPI format to use. can be text or json -v, --version [1|latest] EOS eAPI version -r, --revision INTEGER eAPI command revision @@ -97,8 +95,9 @@ Usage: anta debug run-template [OPTIONS] PARAMS... Takes a list of arguments (keys followed by a value) to build a dictionary used as template parameters. - Example: ------- anta debug run-template -d leaf1a -t 'show vlan {vlan_id}' - vlan_id 1 + Example + ------- + anta debug run-template -d leaf1a -t 'show vlan {vlan_id}' vlan_id 1 Options: -u, --username TEXT Username to connect to EOS [env var: @@ -125,8 +124,6 @@ Options: ANTA_DISABLE_CACHE] -i, --inventory FILE Path to the inventory YAML file. [env var: ANTA_INVENTORY; required] - --tags TEXT List of tags using comma as separator: - tag1,tag2,tag3. [env var: ANTA_TAGS] --ofmt [json|text] EOS eAPI format to use. 
can be text or json -v, --version [1|latest] EOS eAPI version -r, --revision INTEGER eAPI command revision diff --git a/docs/cli/nrfu.md b/docs/cli/nrfu.md index 26935c2b7..579fbdeef 100644 --- a/docs/cli/nrfu.md +++ b/docs/cli/nrfu.md @@ -45,7 +45,7 @@ Options `--device` and `--test` can be used to target one or multiple devices an ### Hide results -Option `--hide` can be used to hide test results in the output based on their status. The option can be repeated. Example: `anta nrfu --hide error --hide skipped`. +Option `--hide` can be used to hide test results in the output or report file based on their status. The option can be repeated. Example: `anta nrfu --hide error --hide skipped`. ## Performing NRFU with text rendering @@ -120,7 +120,7 @@ anta nrfu --test VerifyZeroTouch table ## Performing NRFU with JSON rendering -The JSON rendering command in NRFU testing is useful in generating a JSON output that can subsequently be passed on to another tool for reporting purposes. +The JSON rendering command in NRFU testing will generate an output of all test results in JSON format. ### Command overview @@ -131,12 +131,12 @@ Usage: anta nrfu json [OPTIONS] ANTA command to check network state with JSON result. Options: - -o, --output FILE Path to save report as a file [env var: + -o, --output FILE Path to save report as a JSON file [env var: ANTA_NRFU_JSON_OUTPUT] --help Show this message and exit. ``` -The `--output` option allows you to save the JSON report as a file. +The `--output` option allows you to save the JSON report as a file. If specified, no output will be displayed in the terminal. This is useful for further processing or integration with other tools. ### Example @@ -167,6 +167,29 @@ Options: ![anta nrfu csv results](../imgs/anta_nrfu_csv.png){ loading=lazy width="1600" } +## Performing NRFU and saving results in a Markdown file + +The `md-report` command in NRFU testing generates a comprehensive Markdown report containing various sections, including detailed statistics for devices and test categories. + +### Command overview + +```bash +anta nrfu md-report --help + +Usage: anta nrfu md-report [OPTIONS] + + ANTA command to check network state with Markdown report. + +Options: + --md-output FILE Path to save the report as a Markdown file [env var: + ANTA_NRFU_MD_REPORT_MD_OUTPUT; required] + --help Show this message and exit. +``` + +### Example + +![anta nrfu md-report results](../imgs/anta-nrfu-md-report-output.png){ loading=lazy width="1600" } + ## Performing NRFU with custom reports ANTA offers a CLI option for creating custom reports. This leverages the Jinja2 template system, allowing you to tailor reports to your specific needs. diff --git a/docs/imgs/anta-nrfu-md-report-output.png b/docs/imgs/anta-nrfu-md-report-output.png new file mode 100644 index 000000000..984e76b5c Binary files /dev/null and b/docs/imgs/anta-nrfu-md-report-output.png differ diff --git a/docs/snippets/anta_nrfu_help.txt b/docs/snippets/anta_nrfu_help.txt index 365da0474..cb23fa7ed 100644 --- a/docs/snippets/anta_nrfu_help.txt +++ b/docs/snippets/anta_nrfu_help.txt @@ -53,7 +53,8 @@ Options: Commands: csv ANTA command to check network state with CSV report. - json ANTA command to check network state with JSON result. - table ANTA command to check network states with table result. - text ANTA command to check network states with text result. + json ANTA command to check network state with JSON results. + md-report ANTA command to check network state with Markdown report. 
+  table       ANTA command to check network state with table results. +  text        ANTA command to check network state with text results. tpl-report  ANTA command to check network state with templated report. diff --git a/docs/usage-inventory-catalog.md b/docs/usage-inventory-catalog.md index fd6aec320..d8a032f26 100644 --- a/docs/usage-inventory-catalog.md +++ b/docs/usage-inventory-catalog.md @@ -309,7 +309,7 @@ Once you run `anta nrfu table`, you will see following output: ### Example script to merge catalogs -The following script reads all the files in `intended/test_catalogs/` with names `-catalog.yml` and merge them together inside one big catalog `anta-catalog.yml`. +The following script reads all the files in `intended/test_catalogs/` with names ending in `-catalog.yml` and merges them into one big catalog `anta-catalog.yml` using the new `AntaCatalog.merge_catalogs()` class method. ```python #!/usr/bin/env python from pathlib import Path from anta.models import AntaTest -CATALOG_SUFFIX = '-catalog.yml' -CATALOG_DIR = 'intended/test_catalogs/' +CATALOG_SUFFIX = "-catalog.yml" +CATALOG_DIR = "intended/test_catalogs/" if __name__ == "__main__": -    catalog = AntaCatalog() -    for file in Path(CATALOG_DIR).glob('*'+CATALOG_SUFFIX): -        c = AntaCatalog.parse(file) +    catalogs = [] +    for file in Path(CATALOG_DIR).glob("*" + CATALOG_SUFFIX): device = str(file).removesuffix(CATALOG_SUFFIX).removeprefix(CATALOG_DIR) -        print(f"Merging test catalog for device {device}") -        # Apply filters to all tests for this device -        for test in c.tests: -            test.inputs.filters = AntaTest.Input.Filters(tags=[device]) -        catalog = catalog.merge(c) +        print(f"Loading test catalog for device {device}") +        catalog = AntaCatalog.parse(file) +        # Add the device name as a tag to all tests in the catalog +        for test in catalog.tests: +            test.inputs.filters = AntaTest.Input.Filters(tags={device}) +        catalogs.append(catalog) + +    # Merge all catalogs +    merged_catalog = AntaCatalog.merge_catalogs(catalogs) + +    # Save the merged catalog to a file with open(Path('anta-catalog.yml'), "w") as f: -        f.write(catalog.dump().yaml()) +        f.write(merged_catalog.dump().yaml()) ``` +!!! warning +    The `AntaCatalog.merge()` method is deprecated and will be removed in ANTA v2.0. Please use the `AntaCatalog.merge_catalogs()` class method instead. 
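For a quick side-by-side of the two APIs, here is a minimal sketch (the two catalog file names are placeholders for any valid ANTA catalog files):

```python
import warnings

from anta.catalog import AntaCatalog

# Placeholder file names: any two valid ANTA catalog YAML files would do.
catalog_a = AntaCatalog.parse("catalog-a.yml")
catalog_b = AntaCatalog.parse("catalog-b.yml")

# New class method: merges any number of catalogs in a single call.
merged = AntaCatalog.merge_catalogs([catalog_a, catalog_b])

# Deprecated instance method: still works, but now emits a DeprecationWarning.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    legacy = catalog_a.merge(catalog_b)

assert any(issubclass(w.category, DeprecationWarning) for w in caught)
assert len(merged.tests) == len(legacy.tests)
```

Both calls yield an equivalent merged catalog; the class method simply avoids the warning and accepts any number of catalogs at once.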
diff --git a/examples/tests.yaml b/examples/tests.yaml index c4248cf75..f5a5ca46b 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -83,6 +83,13 @@ anta.tests.bfd: multiplier: 3 - VerifyBFDPeersHealth: down_threshold: 2 + - VerifyBFDPeersRegProtocols: + bfd_peers: + - peer_address: 192.0.255.8 + vrf: default + protocols: + - bgp + - isis anta.tests.configuration: - VerifyZeroTouch: @@ -347,6 +354,7 @@ anta.tests.security: destination_address: 100.64.2.2 - source_address: 172.18.3.2 destination_address: 172.18.2.2 + - VerifyHardwareEntropy: anta.tests.services: - VerifyHostname: @@ -437,6 +445,15 @@ anta.tests.system: - VerifyMemoryUtilization: - VerifyFileSystemUtilization: - VerifyNTP: + - VerifyNTPAssociations: + ntp_servers: + - server_address: 1.1.1.1 + preferred: True + stratum: 1 + - server_address: 2.2.2.2 + stratum: 1 + - server_address: 3.3.3.3 + stratum: 1 anta.tests.vlan: - VerifyVlanInternalPolicy: @@ -529,6 +546,7 @@ anta.tests.routing: bgp_peers: - peer_address: 172.30.11.1 vrf: default + strict: False capabilities: - ipv4Unicast - VerifyBGPPeerASNCap: diff --git a/pyproject.toml b/pyproject.toml index e1f4a303d..60c005d60 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ description = "Arista Network Test Automation (ANTA) Framework" license = { file = "LICENSE" } dependencies = [ "aiocache>=0.12.2", - "asyncssh>=2.13.2", + "asyncssh>=2.16", "cvprac>=1.3.1", "eval-type-backport>=0.1.3", # Support newer typing features in older Python versions (required until Python 3.9 support is removed) "Jinja2>=3.1.2", @@ -69,6 +69,7 @@ dev = [ "pytest-cov>=4.1.0", "pytest-dependency", "pytest-html>=3.2.0", + "pytest-httpx>=0.30.0", "pytest-metadata>=3.0.0", "pytest>=7.4.0", "ruff>=0.5.4,<0.7.0", @@ -181,7 +182,8 @@ filterwarnings = [ [tool.coverage.run] branch = true -source = ["anta"] +# https://community.sonarsource.com/t/python-coverage-analysis-warning/62629/7 +include = ["anta/*", "asynceapi/*"] parallel = true relative_files = true diff --git a/tests/data/test_md_report.md b/tests/data/test_md_report.md new file mode 100644 index 000000000..9360dbc74 --- /dev/null +++ b/tests/data/test_md_report.md @@ -0,0 +1,79 @@ +# ANTA Report + +**Table of Contents:** + +- [ANTA Report](#anta-report) + - [Test Results Summary](#test-results-summary) + - [Summary Totals](#summary-totals) + - [Summary Totals Device Under Test](#summary-totals-device-under-test) + - [Summary Totals Per Category](#summary-totals-per-category) + - [Test Results](#test-results) + +## Test Results Summary + +### Summary Totals + +| Total Tests | Total Tests Success | Total Tests Skipped | Total Tests Failure | Total Tests Error | +| ----------- | ------------------- | ------------------- | ------------------- | ------------------| +| 30 | 7 | 2 | 19 | 2 | + +### Summary Totals Device Under Test + +| Device Under Test | Total Tests | Tests Success | Tests Skipped | Tests Failure | Tests Error | Categories Skipped | Categories Failed | +| ------------------| ----------- | ------------- | ------------- | ------------- | ----------- | -------------------| ------------------| +| DC1-SPINE1 | 15 | 2 | 2 | 10 | 1 | MLAG, VXLAN | AAA, BFD, BGP, Connectivity, Routing, SNMP, STP, Services, Software, System | +| DC1-LEAF1A | 15 | 5 | 0 | 9 | 1 | - | AAA, BFD, BGP, Connectivity, SNMP, STP, Services, Software, System | + +### Summary Totals Per Category + +| Test Category | Total Tests | Tests Success | Tests Skipped | Tests Failure | Tests Error | +| ------------- | ----------- | ------------- | 
------------- | ------------- | ----------- | +| AAA | 2 | 0 | 0 | 2 | 0 | +| BFD | 2 | 0 | 0 | 2 | 0 | +| BGP | 2 | 0 | 0 | 2 | 0 | +| Connectivity | 4 | 0 | 0 | 2 | 2 | +| Interfaces | 2 | 2 | 0 | 0 | 0 | +| MLAG | 2 | 1 | 1 | 0 | 0 | +| Routing | 2 | 1 | 0 | 1 | 0 | +| SNMP | 2 | 0 | 0 | 2 | 0 | +| STP | 2 | 0 | 0 | 2 | 0 | +| Security | 2 | 2 | 0 | 0 | 0 | +| Services | 2 | 0 | 0 | 2 | 0 | +| Software | 2 | 0 | 0 | 2 | 0 | +| System | 2 | 0 | 0 | 2 | 0 | +| VXLAN | 2 | 1 | 1 | 0 | 0 | + +## Test Results + +| Device Under Test | Categories | Test | Description | Custom Field | Result | Messages | +| ----------------- | ---------- | ---- | ----------- | ------------ | ------ | -------- | +| DC1-LEAF1A | BFD | VerifyBFDSpecificPeers | Verifies the IPv4 BFD peer's sessions and remote disc in the specified VRF. | - | failure | Following BFD peers are not configured, status is not up or remote disc is zero: {'192.0.255.8': {'default': 'Not Configured'}, '192.0.255.7': {'default': 'Not Configured'}} | +| DC1-LEAF1A | BGP | VerifyBGPPeerCount | Verifies the count of BGP peers. | - | failure | Failures: [{'afi': 'ipv4', 'safi': 'unicast', 'vrfs': {'PROD': 'Expected: 2, Actual: 1'}}, {'afi': 'ipv4', 'safi': 'multicast', 'vrfs': {'DEV': 'Expected: 3, Actual: 0'}}] | +| DC1-LEAF1A | Software | VerifyEOSVersion | Verifies the EOS version of the device. | - | failure | device is running version "4.31.1F-34554157.4311F (engineering build)" not in expected versions: ['4.25.4M', '4.26.1F'] | +| DC1-LEAF1A | Services | VerifyHostname | Verifies the hostname of a device. | - | failure | Expected 's1-spine1' as the hostname, but found 'DC1-LEAF1A' instead. | +| DC1-LEAF1A | Interfaces | VerifyInterfaceUtilization | Verifies that the utilization of interfaces is below a certain threshold. | - | success | - | +| DC1-LEAF1A | Connectivity | VerifyLLDPNeighbors | Verifies that the provided LLDP neighbors are connected properly. | - | failure | Wrong LLDP neighbor(s) on port(s): Ethernet1 DC1-SPINE1_Ethernet1 Ethernet2 DC1-SPINE2_Ethernet1 Port(s) not configured: Ethernet7 | +| DC1-LEAF1A | MLAG | VerifyMlagStatus | Verifies the health status of the MLAG configuration. | - | success | - | +| DC1-LEAF1A | System | VerifyNTP | Verifies if NTP is synchronised. | - | failure | The device is not synchronized with the configured NTP server(s): 'NTP is disabled.' | +| DC1-LEAF1A | Connectivity | VerifyReachability | Test the network reachability to one or many destination IP(s). | - | error | ping vrf MGMT 1.1.1.1 source Management1 repeat 2 has failed: No source interface Management1 | +| DC1-LEAF1A | Routing | VerifyRoutingTableEntry | Verifies that the provided routes are present in the routing table of a specified VRF. | - | success | - | +| DC1-LEAF1A | STP | VerifySTPMode | Verifies the configured STP mode for a provided list of VLAN(s). | - | failure | Wrong STP mode configured for the following VLAN(s): [10, 20] | +| DC1-LEAF1A | SNMP | VerifySnmpStatus | Verifies if the SNMP agent is enabled. | - | failure | SNMP agent disabled in vrf default | +| DC1-LEAF1A | AAA | VerifyTacacsSourceIntf | Verifies TACACS source-interface for a specified VRF. | - | failure | Source-interface Management0 is not configured in VRF default | +| DC1-LEAF1A | Security | VerifyTelnetStatus | Verifies if Telnet is disabled in the default VRF. | - | success | - | +| DC1-LEAF1A | VXLAN | VerifyVxlan1Interface | Verifies the Vxlan1 interface status. 
| - | success | - | +| DC1-SPINE1 | BFD | VerifyBFDSpecificPeers | Verifies the IPv4 BFD peer's sessions and remote disc in the specified VRF. | - | failure | Following BFD peers are not configured, status is not up or remote disc is zero: {'192.0.255.8': {'default': 'Not Configured'}, '192.0.255.7': {'default': 'Not Configured'}} | +| DC1-SPINE1 | BGP | VerifyBGPPeerCount | Verifies the count of BGP peers. | - | failure | Failures: [{'afi': 'ipv4', 'safi': 'unicast', 'vrfs': {'PROD': 'Not Configured', 'default': 'Expected: 3, Actual: 4'}}, {'afi': 'ipv4', 'safi': 'multicast', 'vrfs': {'DEV': 'Not Configured'}}, {'afi': 'evpn', 'vrfs': {'default': 'Expected: 2, Actual: 4'}}] | +| DC1-SPINE1 | Software | VerifyEOSVersion | Verifies the EOS version of the device. | - | failure | device is running version "4.31.1F-34554157.4311F (engineering build)" not in expected versions: ['4.25.4M', '4.26.1F'] | +| DC1-SPINE1 | Services | VerifyHostname | Verifies the hostname of a device. | - | failure | Expected 's1-spine1' as the hostname, but found 'DC1-SPINE1' instead. | +| DC1-SPINE1 | Interfaces | VerifyInterfaceUtilization | Verifies that the utilization of interfaces is below a certain threshold. | - | success | - | +| DC1-SPINE1 | Connectivity | VerifyLLDPNeighbors | Verifies that the provided LLDP neighbors are connected properly. | - | failure | Wrong LLDP neighbor(s) on port(s): Ethernet1 DC1-LEAF1A_Ethernet1 Ethernet2 DC1-LEAF1B_Ethernet1 Port(s) not configured: Ethernet7 | +| DC1-SPINE1 | MLAG | VerifyMlagStatus | Verifies the health status of the MLAG configuration. | - | skipped | MLAG is disabled | +| DC1-SPINE1 | System | VerifyNTP | Verifies if NTP is synchronised. | - | failure | The device is not synchronized with the configured NTP server(s): 'NTP is disabled.' | +| DC1-SPINE1 | Connectivity | VerifyReachability | Test the network reachability to one or many destination IP(s). | - | error | ping vrf MGMT 1.1.1.1 source Management1 repeat 2 has failed: No source interface Management1 | +| DC1-SPINE1 | Routing | VerifyRoutingTableEntry | Verifies that the provided routes are present in the routing table of a specified VRF. | - | failure | The following route(s) are missing from the routing table of VRF default: ['10.1.0.2'] | +| DC1-SPINE1 | STP | VerifySTPMode | Verifies the configured STP mode for a provided list of VLAN(s). | - | failure | STP mode 'rapidPvst' not configured for the following VLAN(s): [10, 20] | +| DC1-SPINE1 | SNMP | VerifySnmpStatus | Verifies if the SNMP agent is enabled. | - | failure | SNMP agent disabled in vrf default | +| DC1-SPINE1 | AAA | VerifyTacacsSourceIntf | Verifies TACACS source-interface for a specified VRF. | - | failure | Source-interface Management0 is not configured in VRF default | +| DC1-SPINE1 | Security | VerifyTelnetStatus | Verifies if Telnet is disabled in the default VRF. | - | success | - | +| DC1-SPINE1 | VXLAN | VerifyVxlan1Interface | Verifies the Vxlan1 interface status. 
| - | skipped | Vxlan1 interface is not configured | diff --git a/tests/data/test_md_report_results.json b/tests/data/test_md_report_results.json new file mode 100644 index 000000000..b9ecc0c57 --- /dev/null +++ b/tests/data/test_md_report_results.json @@ -0,0 +1,378 @@ +[ + { + "name": "DC1-SPINE1", + "test": "VerifyTacacsSourceIntf", + "categories": [ + "AAA" + ], + "description": "Verifies TACACS source-interface for a specified VRF.", + "result": "failure", + "messages": [ + "Source-interface Management0 is not configured in VRF default" + ], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifyLLDPNeighbors", + "categories": [ + "Connectivity" + ], + "description": "Verifies that the provided LLDP neighbors are connected properly.", + "result": "failure", + "messages": [ + "Wrong LLDP neighbor(s) on port(s):\n Ethernet1\n DC1-LEAF1A_Ethernet1\n Ethernet2\n DC1-LEAF1B_Ethernet1\nPort(s) not configured:\n Ethernet7" + ], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifyBGPPeerCount", + "categories": [ + "BGP" + ], + "description": "Verifies the count of BGP peers.", + "result": "failure", + "messages": [ + "Failures: [{'afi': 'ipv4', 'safi': 'unicast', 'vrfs': {'PROD': 'Not Configured', 'default': 'Expected: 3, Actual: 4'}}, {'afi': 'ipv4', 'safi': 'multicast', 'vrfs': {'DEV': 'Not Configured'}}, {'afi': 'evpn', 'vrfs': {'default': 'Expected: 2, Actual: 4'}}]" + ], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifySTPMode", + "categories": [ + "STP" + ], + "description": "Verifies the configured STP mode for a provided list of VLAN(s).", + "result": "failure", + "messages": [ + "STP mode 'rapidPvst' not configured for the following VLAN(s): [10, 20]" + ], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifySnmpStatus", + "categories": [ + "SNMP" + ], + "description": "Verifies if the SNMP agent is enabled.", + "result": "failure", + "messages": [ + "SNMP agent disabled in vrf default" + ], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifyRoutingTableEntry", + "categories": [ + "Routing" + ], + "description": "Verifies that the provided routes are present in the routing table of a specified VRF.", + "result": "failure", + "messages": [ + "The following route(s) are missing from the routing table of VRF default: ['10.1.0.2']" + ], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifyInterfaceUtilization", + "categories": [ + "Interfaces" + ], + "description": "Verifies that the utilization of interfaces is below a certain threshold.", + "result": "success", + "messages": [], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifyMlagStatus", + "categories": [ + "MLAG" + ], + "description": "Verifies the health status of the MLAG configuration.", + "result": "skipped", + "messages": [ + "MLAG is disabled" + ], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifyVxlan1Interface", + "categories": [ + "VXLAN" + ], + "description": "Verifies the Vxlan1 interface status.", + "result": "skipped", + "messages": [ + "Vxlan1 interface is not configured" + ], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifyBFDSpecificPeers", + "categories": [ + "BFD" + ], + "description": "Verifies the IPv4 BFD peer's sessions and remote disc in the specified VRF.", + "result": "failure", + "messages": [ + "Following BFD peers are not configured, status is not up or remote disc is zero:\n{'192.0.255.8': {'default': 'Not 
Configured'}, '192.0.255.7': {'default': 'Not Configured'}}" + ], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifyNTP", + "categories": [ + "System" + ], + "description": "Verifies if NTP is synchronised.", + "result": "failure", + "messages": [ + "The device is not synchronized with the configured NTP server(s): 'NTP is disabled.'" + ], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifyReachability", + "categories": [ + "Connectivity" + ], + "description": "Test the network reachability to one or many destination IP(s).", + "result": "error", + "messages": [ + "ping vrf MGMT 1.1.1.1 source Management1 repeat 2 has failed: No source interface Management1" + ], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifyTelnetStatus", + "categories": [ + "Security" + ], + "description": "Verifies if Telnet is disabled in the default VRF.", + "result": "success", + "messages": [], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifyEOSVersion", + "categories": [ + "Software" + ], + "description": "Verifies the EOS version of the device.", + "result": "failure", + "messages": [ + "device is running version \"4.31.1F-34554157.4311F (engineering build)\" not in expected versions: ['4.25.4M', '4.26.1F']" + ], + "custom_field": null + }, + { + "name": "DC1-SPINE1", + "test": "VerifyHostname", + "categories": [ + "Services" + ], + "description": "Verifies the hostname of a device.", + "result": "failure", + "messages": [ + "Expected `s1-spine1` as the hostname, but found `DC1-SPINE1` instead." + ], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifyTacacsSourceIntf", + "categories": [ + "AAA" + ], + "description": "Verifies TACACS source-interface for a specified VRF.", + "result": "failure", + "messages": [ + "Source-interface Management0 is not configured in VRF default" + ], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifyLLDPNeighbors", + "categories": [ + "Connectivity" + ], + "description": "Verifies that the provided LLDP neighbors are connected properly.", + "result": "failure", + "messages": [ + "Wrong LLDP neighbor(s) on port(s):\n Ethernet1\n DC1-SPINE1_Ethernet1\n Ethernet2\n DC1-SPINE2_Ethernet1\nPort(s) not configured:\n Ethernet7" + ], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifyBGPPeerCount", + "categories": [ + "BGP" + ], + "description": "Verifies the count of BGP peers.", + "result": "failure", + "messages": [ + "Failures: [{'afi': 'ipv4', 'safi': 'unicast', 'vrfs': {'PROD': 'Expected: 2, Actual: 1'}}, {'afi': 'ipv4', 'safi': 'multicast', 'vrfs': {'DEV': 'Expected: 3, Actual: 0'}}]" + ], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifySTPMode", + "categories": [ + "STP" + ], + "description": "Verifies the configured STP mode for a provided list of VLAN(s).", + "result": "failure", + "messages": [ + "Wrong STP mode configured for the following VLAN(s): [10, 20]" + ], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifySnmpStatus", + "categories": [ + "SNMP" + ], + "description": "Verifies if the SNMP agent is enabled.", + "result": "failure", + "messages": [ + "SNMP agent disabled in vrf default" + ], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifyRoutingTableEntry", + "categories": [ + "Routing" + ], + "description": "Verifies that the provided routes are present in the routing table of a specified VRF.", + "result": "success", + "messages": [], + 
"custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifyInterfaceUtilization", + "categories": [ + "Interfaces" + ], + "description": "Verifies that the utilization of interfaces is below a certain threshold.", + "result": "success", + "messages": [], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifyMlagStatus", + "categories": [ + "MLAG" + ], + "description": "Verifies the health status of the MLAG configuration.", + "result": "success", + "messages": [], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifyVxlan1Interface", + "categories": [ + "VXLAN" + ], + "description": "Verifies the Vxlan1 interface status.", + "result": "success", + "messages": [], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifyBFDSpecificPeers", + "categories": [ + "BFD" + ], + "description": "Verifies the IPv4 BFD peer's sessions and remote disc in the specified VRF.", + "result": "failure", + "messages": [ + "Following BFD peers are not configured, status is not up or remote disc is zero:\n{'192.0.255.8': {'default': 'Not Configured'}, '192.0.255.7': {'default': 'Not Configured'}}" + ], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifyNTP", + "categories": [ + "System" + ], + "description": "Verifies if NTP is synchronised.", + "result": "failure", + "messages": [ + "The device is not synchronized with the configured NTP server(s): 'NTP is disabled.'" + ], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifyReachability", + "categories": [ + "Connectivity" + ], + "description": "Test the network reachability to one or many destination IP(s).", + "result": "error", + "messages": [ + "ping vrf MGMT 1.1.1.1 source Management1 repeat 2 has failed: No source interface Management1" + ], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifyTelnetStatus", + "categories": [ + "Security" + ], + "description": "Verifies if Telnet is disabled in the default VRF.", + "result": "success", + "messages": [], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifyEOSVersion", + "categories": [ + "Software" + ], + "description": "Verifies the EOS version of the device.", + "result": "failure", + "messages": [ + "device is running version \"4.31.1F-34554157.4311F (engineering build)\" not in expected versions: ['4.25.4M', '4.26.1F']" + ], + "custom_field": null + }, + { + "name": "DC1-LEAF1A", + "test": "VerifyHostname", + "categories": [ + "Services" + ], + "description": "Verifies the hostname of a device.", + "result": "failure", + "messages": [ + "Expected `s1-spine1` as the hostname, but found `DC1-LEAF1A` instead." 
+ ], + "custom_field": null + } +] diff --git a/tests/lib/fixture.py b/tests/lib/fixture.py index b0205b8bb..92210acfa 100644 --- a/tests/lib/fixture.py +++ b/tests/lib/fixture.py @@ -5,8 +5,10 @@ from __future__ import annotations +import json import logging import shutil +from pathlib import Path from typing import TYPE_CHECKING, Any, Callable from unittest.mock import patch @@ -23,12 +25,15 @@ if TYPE_CHECKING: from collections.abc import Iterator - from pathlib import Path from anta.models import AntaCommand logger = logging.getLogger(__name__) +DATA_DIR: Path = Path(__file__).parent.parent.resolve() / "data" + +JSON_RESULTS = "test_md_report_results.json" + DEVICE_HW_MODEL = "pytest" DEVICE_NAME = "pytest" COMMAND_OUTPUT = "retrieved" @@ -154,6 +159,31 @@ def _factory(number: int = 0) -> ResultManager: return _factory +@pytest.fixture +def result_manager() -> ResultManager: + """Return a ResultManager with 30 random tests loaded from a JSON file. + + Devices: DC1-SPINE1, DC1-LEAF1A + + - Total tests: 30 + - Success: 7 + - Skipped: 2 + - Failure: 19 + - Error: 2 + + See `tests/data/test_md_report_results.json` and `tests/data/test_md_report_all_tests.md` for details. + """ + manager = ResultManager() + + with (DATA_DIR / JSON_RESULTS).open("r", encoding="utf-8") as f: + results = json.load(f) + + for result in results: + manager.add(TestResult(**result)) + + return manager + + # tests.units.cli fixtures @pytest.fixture def temp_env(tmp_path: Path) -> dict[str, str | None]: diff --git a/tests/units/anta_tests/routing/test_bgp.py b/tests/units/anta_tests/routing/test_bgp.py index 47db8e60b..b76939bd5 100644 --- a/tests/units/anta_tests/routing/test_bgp.py +++ b/tests/units/anta_tests/routing/test_bgp.py @@ -2200,6 +2200,152 @@ ], }, }, + { + "name": "success-strict", + "test": VerifyBGPPeerMPCaps, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "172.30.11.1", + "neighborCapabilities": { + "multiprotocolCaps": { + "ipv4Unicast": { + "advertised": True, + "received": True, + "enabled": True, + }, + "ipv4MplsLabels": { + "advertised": True, + "received": True, + "enabled": True, + }, + } + }, + } + ] + }, + "MGMT": { + "peerList": [ + { + "peerAddress": "172.30.11.10", + "neighborCapabilities": { + "multiprotocolCaps": { + "ipv4Unicast": { + "advertised": True, + "received": True, + "enabled": True, + }, + "ipv4MplsVpn": { + "advertised": True, + "received": True, + "enabled": True, + }, + } + }, + } + ] + }, + } + } + ], + "inputs": { + "bgp_peers": [ + { + "peer_address": "172.30.11.1", + "vrf": "default", + "strict": True, + "capabilities": ["Ipv4 Unicast", "ipv4 Mpls labels"], + }, + { + "peer_address": "172.30.11.10", + "vrf": "MGMT", + "strict": True, + "capabilities": ["ipv4 Unicast", "ipv4 MplsVpn"], + }, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "failure-srict", + "test": VerifyBGPPeerMPCaps, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "172.30.11.1", + "neighborCapabilities": { + "multiprotocolCaps": { + "ipv4Unicast": { + "advertised": True, + "received": True, + "enabled": True, + }, + "ipv4MplsLabels": { + "advertised": True, + "received": True, + "enabled": True, + }, + } + }, + } + ] + }, + "MGMT": { + "peerList": [ + { + "peerAddress": "172.30.11.10", + "neighborCapabilities": { + "multiprotocolCaps": { + "ipv4Unicast": { + "advertised": True, + "received": True, + "enabled": True, + }, + "ipv4MplsVpn": { + "advertised": False, + "received": True, + "enabled": True, + }, + } + 
}, + } + ] + }, + } + } + ], + "inputs": { + "bgp_peers": [ + { + "peer_address": "172.30.11.1", + "vrf": "default", + "strict": True, + "capabilities": ["Ipv4 Unicast"], + }, + { + "peer_address": "172.30.11.10", + "vrf": "MGMT", + "strict": True, + "capabilities": ["ipv4MplsVpn", "L2vpnEVPN"], + }, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "Following BGP peer multiprotocol capabilities are not found or not ok:\n{'bgp_peers': {'172.30.11.1': " + "{'default': {'status': 'Expected only `ipv4Unicast` capabilities should be listed but found `ipv4Unicast, ipv4MplsLabels` instead.'}}," + " '172.30.11.10': {'MGMT': {'status': 'Expected only `ipv4MplsVpn, l2VpnEvpn` capabilities should be listed but found `ipv4Unicast, " + "ipv4MplsVpn` instead.'}}}}" + ], + }, + }, { "name": "success", "test": VerifyBGPPeerASNCap, diff --git a/tests/units/anta_tests/test_bfd.py b/tests/units/anta_tests/test_bfd.py index 54dc7a05e..3b1b8b86a 100644 --- a/tests/units/anta_tests/test_bfd.py +++ b/tests/units/anta_tests/test_bfd.py @@ -10,7 +10,7 @@ # pylint: disable=C0413 # because of the patch above -from anta.tests.bfd import VerifyBFDPeersHealth, VerifyBFDPeersIntervals, VerifyBFDSpecificPeers +from anta.tests.bfd import VerifyBFDPeersHealth, VerifyBFDPeersIntervals, VerifyBFDPeersRegProtocols, VerifyBFDSpecificPeers from tests.lib.anta import test # noqa: F401; pylint: disable=W0611 DATA: list[dict[str, Any]] = [ @@ -163,8 +163,8 @@ "result": "failure", "messages": [ "Following BFD peers are not configured or timers are not correct:\n" - "{'192.0.255.7': {'default': {'tx_interval': 1300000, 'rx_interval': 1200000, 'multiplier': 4}}, " - "'192.0.255.70': {'MGMT': {'tx_interval': 120000, 'rx_interval': 120000, 'multiplier': 5}}}" + "{'192.0.255.7': {'default': {'tx_interval': 1300, 'rx_interval': 1200, 'multiplier': 4}}, " + "'192.0.255.70': {'MGMT': {'tx_interval': 120, 'rx_interval': 120, 'multiplier': 5}}}" ], }, }, @@ -519,4 +519,133 @@ ], }, }, + { + "name": "success", + "test": VerifyBFDPeersRegProtocols, + "eos_data": [ + { + "vrfs": { + "default": { + "ipv4Neighbors": { + "192.0.255.7": { + "peerStats": { + "": { + "status": "up", + "remoteDisc": 108328132, + "peerStatsDetail": { + "role": "active", + "apps": ["ospf"], + }, + } + } + } + } + }, + "MGMT": { + "ipv4Neighbors": { + "192.0.255.70": { + "peerStats": { + "": { + "status": "up", + "remoteDisc": 108328132, + "peerStatsDetail": { + "role": "active", + "apps": ["bgp"], + }, + } + } + } + } + }, + } + } + ], + "inputs": { + "bfd_peers": [ + {"peer_address": "192.0.255.7", "vrf": "default", "protocols": ["ospf"]}, + {"peer_address": "192.0.255.70", "vrf": "MGMT", "protocols": ["bgp"]}, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "failure", + "test": VerifyBFDPeersRegProtocols, + "eos_data": [ + { + "vrfs": { + "default": { + "ipv4Neighbors": { + "192.0.255.7": { + "peerStats": { + "": { + "status": "up", + "peerStatsDetail": { + "role": "active", + "apps": ["ospf"], + }, + } + } + } + } + }, + "MGMT": { + "ipv4Neighbors": { + "192.0.255.70": { + "peerStats": { + "": { + "status": "up", + "remoteDisc": 0, + "peerStatsDetail": { + "role": "active", + "apps": ["bgp"], + }, + } + } + } + } + }, + } + } + ], + "inputs": { + "bfd_peers": [ + {"peer_address": "192.0.255.7", "vrf": "default", "protocols": ["isis"]}, + {"peer_address": "192.0.255.70", "vrf": "MGMT", "protocols": ["isis"]}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "The following BFD peers are not configured or have 
non-registered protocol(s):\n" + "{'192.0.255.7': {'default': ['isis']}, " + "'192.0.255.70': {'MGMT': ['isis']}}" + ], + }, + }, + { + "name": "failure-not-found", + "test": VerifyBFDPeersRegProtocols, + "eos_data": [ + { + "vrfs": { + "default": {}, + "MGMT": {}, + } + } + ], + "inputs": { + "bfd_peers": [ + {"peer_address": "192.0.255.7", "vrf": "default", "protocols": ["isis"]}, + {"peer_address": "192.0.255.70", "vrf": "MGMT", "protocols": ["isis"]}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "The following BFD peers are not configured or have non-registered protocol(s):\n" + "{'192.0.255.7': {'default': 'Not Configured'}, '192.0.255.70': {'MGMT': 'Not Configured'}}" + ], + }, + }, ] diff --git a/tests/units/anta_tests/test_security.py b/tests/units/anta_tests/test_security.py index 3a732bdaa..eabc40bd8 100644 --- a/tests/units/anta_tests/test_security.py +++ b/tests/units/anta_tests/test_security.py @@ -15,6 +15,7 @@ VerifyAPISSLCertificate, VerifyBannerLogin, VerifyBannerMotd, + VerifyHardwareEntropy, VerifyIPSecConnHealth, VerifyIPv4ACL, VerifySpecificIPSecConn, @@ -1213,4 +1214,18 @@ ], }, }, + { + "name": "success", + "test": VerifyHardwareEntropy, + "eos_data": [{"cpuModel": "2.20GHz", "cryptoModule": "Crypto Module v3.0", "hardwareEntropyEnabled": True, "blockedNetworkProtocols": []}], + "inputs": {}, + "expected": {"result": "success"}, + }, + { + "name": "failure", + "test": VerifyHardwareEntropy, + "eos_data": [{"cpuModel": "2.20GHz", "cryptoModule": "Crypto Module v3.0", "hardwareEntropyEnabled": False, "blockedNetworkProtocols": []}], + "inputs": {}, + "expected": {"result": "failure", "messages": ["Hardware entropy generation is disabled."]}, + }, ] diff --git a/tests/units/anta_tests/test_snmp.py b/tests/units/anta_tests/test_snmp.py index b4d31521e..64c44382e 100644 --- a/tests/units/anta_tests/test_snmp.py +++ b/tests/units/anta_tests/test_snmp.py @@ -99,6 +99,20 @@ "messages": ["Expected `New York` as the location, but found `Europe` instead."], }, }, + { + "name": "failure-details-not-configured", + "test": VerifySnmpLocation, + "eos_data": [ + { + "location": {"location": ""}, + } + ], + "inputs": {"location": "New York"}, + "expected": { + "result": "failure", + "messages": ["SNMP location is not configured."], + }, + }, { "name": "success", "test": VerifySnmpContact, @@ -124,4 +138,18 @@ "messages": ["Expected `Bob@example.com` as the contact, but found `Jon@example.com` instead."], }, }, + { + "name": "failure-details-not-configured", + "test": VerifySnmpContact, + "eos_data": [ + { + "contact": {"contact": ""}, + } + ], + "inputs": {"contact": "Bob@example.com"}, + "expected": { + "result": "failure", + "messages": ["SNMP contact is not configured."], + }, + }, ] diff --git a/tests/units/anta_tests/test_system.py b/tests/units/anta_tests/test_system.py index 6965461d6..54849b734 100644 --- a/tests/units/anta_tests/test_system.py +++ b/tests/units/anta_tests/test_system.py @@ -14,6 +14,7 @@ VerifyFileSystemUtilization, VerifyMemoryUtilization, VerifyNTP, + VerifyNTPAssociations, VerifyReloadCause, VerifyUptime, ) @@ -286,4 +287,186 @@ "inputs": None, "expected": {"result": "failure", "messages": ["The device is not synchronized with the configured NTP server(s): 'unsynchronised'"]}, }, + { + "name": "success", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "1.1.1.1": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + }, + "2.2.2.2": { + "condition": "candidate", + "peerIpAddr": "2.2.2.2", + 
"stratumLevel": 2, + }, + "3.3.3.3": { + "condition": "candidate", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 2, + }, + } + } + ], + "inputs": { + "ntp_servers": [ + {"server_address": "1.1.1.1", "preferred": True, "stratum": 1}, + {"server_address": "2.2.2.2", "stratum": 2}, + {"server_address": "3.3.3.3", "stratum": 2}, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "success-pool-name", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "1.ntp.networks.com": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + }, + "2.ntp.networks.com": { + "condition": "candidate", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 2, + }, + "3.ntp.networks.com": { + "condition": "candidate", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 2, + }, + } + } + ], + "inputs": { + "ntp_servers": [ + {"server_address": "1.ntp.networks.com", "preferred": True, "stratum": 1}, + {"server_address": "2.ntp.networks.com", "stratum": 2}, + {"server_address": "3.ntp.networks.com", "stratum": 2}, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "failure", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "1.1.1.1": { + "condition": "candidate", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 2, + }, + "2.2.2.2": { + "condition": "sys.peer", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 2, + }, + "3.3.3.3": { + "condition": "sys.peer", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 3, + }, + } + } + ], + "inputs": { + "ntp_servers": [ + {"server_address": "1.1.1.1", "preferred": True, "stratum": 1}, + {"server_address": "2.2.2.2", "stratum": 2}, + {"server_address": "3.3.3.3", "stratum": 2}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "For NTP peer 1.1.1.1:\nExpected `sys.peer` as the condition, but found `candidate` instead.\nExpected `1` as the stratum, but found `2` instead.\n" + "For NTP peer 2.2.2.2:\nExpected `candidate` as the condition, but found `sys.peer` instead.\n" + "For NTP peer 3.3.3.3:\nExpected `candidate` as the condition, but found `sys.peer` instead.\nExpected `2` as the stratum, but found `3` instead." 
+ ], + }, + }, + { + "name": "failure-no-peers", + "test": VerifyNTPAssociations, + "eos_data": [{"peers": {}}], + "inputs": { + "ntp_servers": [ + {"server_address": "1.1.1.1", "preferred": True, "stratum": 1}, + {"server_address": "2.2.2.2", "stratum": 1}, + {"server_address": "3.3.3.3", "stratum": 1}, + ] + }, + "expected": { + "result": "failure", + "messages": ["None of NTP peers are not configured."], + }, + }, + { + "name": "failure-one-peer-not-found", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "1.1.1.1": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + }, + "2.2.2.2": { + "condition": "candidate", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 1, + }, + } + } + ], + "inputs": { + "ntp_servers": [ + {"server_address": "1.1.1.1", "preferred": True, "stratum": 1}, + {"server_address": "2.2.2.2", "stratum": 1}, + {"server_address": "3.3.3.3", "stratum": 1}, + ] + }, + "expected": { + "result": "failure", + "messages": ["NTP peer 3.3.3.3 is not configured."], + }, + }, + { + "name": "failure-with-two-peers-not-found", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "1.1.1.1": { + "condition": "candidate", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + } + } + } + ], + "inputs": { + "ntp_servers": [ + {"server_address": "1.1.1.1", "preferred": True, "stratum": 1}, + {"server_address": "2.2.2.2", "stratum": 1}, + {"server_address": "3.3.3.3", "stratum": 1}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "For NTP peer 1.1.1.1:\nExpected `sys.peer` as the condition, but found `candidate` instead.\n" + "NTP peer 2.2.2.2 is not configured.\nNTP peer 3.3.3.3 is not configured." + ], + }, + }, ] diff --git a/tests/units/asynceapi/__init__.py b/tests/units/asynceapi/__init__.py new file mode 100644 index 000000000..d4282a31b --- /dev/null +++ b/tests/units/asynceapi/__init__.py @@ -0,0 +1,4 @@ +# Copyright (c) 2023-2024 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. +"""Unit tests for the asynceapi client package used by ANTA.""" diff --git a/tests/units/asynceapi/conftest.py b/tests/units/asynceapi/conftest.py new file mode 100644 index 000000000..812d5b9cd --- /dev/null +++ b/tests/units/asynceapi/conftest.py @@ -0,0 +1,20 @@ +# Copyright (c) 2023-2024 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. +"""Fixtures for the asynceapi client package.""" + +import pytest + +from asynceapi import Device + + +@pytest.fixture +def asynceapi_device() -> Device: + """Return an asynceapi Device instance.""" + return Device( + host="localhost", + username="admin", + password="admin", + proto="https", + port=443, + ) diff --git a/tests/units/asynceapi/test_data.py b/tests/units/asynceapi/test_data.py new file mode 100644 index 000000000..908d6084b --- /dev/null +++ b/tests/units/asynceapi/test_data.py @@ -0,0 +1,88 @@ +# Copyright (c) 2023-2024 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. 
+"""Unit tests data for the asynceapi client package.""" + +SUCCESS_EAPI_RESPONSE = { + "jsonrpc": "2.0", + "id": "EapiExplorer-1", + "result": [ + { + "mfgName": "Arista", + "modelName": "cEOSLab", + "hardwareRevision": "", + "serialNumber": "5E9D49D20F09DA471333DD835835FD1A", + "systemMacAddress": "00:1c:73:2e:7b:a3", + "hwMacAddress": "00:00:00:00:00:00", + "configMacAddress": "00:00:00:00:00:00", + "version": "4.31.1F-34554157.4311F (engineering build)", + "architecture": "i686", + "internalVersion": "4.31.1F-34554157.4311F", + "internalBuildId": "47114ca4-ae9f-4f32-8c1f-2864db93b7e8", + "imageFormatVersion": "1.0", + "imageOptimization": "None", + "cEosToolsVersion": "(unknown)", + "kernelVersion": "6.5.0-44-generic", + "bootupTimestamp": 1723429239.9352903, + "uptime": 1300202.749528885, + "memTotal": 65832112, + "memFree": 41610316, + "isIntlVersion": False, + }, + { + "utcTime": 1724729442.6863558, + "timezone": "EST", + "localTime": { + "year": 2024, + "month": 8, + "dayOfMonth": 26, + "hour": 22, + "min": 30, + "sec": 42, + "dayOfWeek": 0, + "dayOfYear": 239, + "daylightSavingsAdjust": 0, + }, + "clockSource": {"local": True}, + }, + ], +} +"""Successful eAPI JSON response.""" + +ERROR_EAPI_RESPONSE = { + "jsonrpc": "2.0", + "id": "EapiExplorer-1", + "error": { + "code": 1002, + "message": "CLI command 2 of 3 'bad command' failed: invalid command", + "data": [ + { + "mfgName": "Arista", + "modelName": "cEOSLab", + "hardwareRevision": "", + "serialNumber": "5E9D49D20F09DA471333DD835835FD1A", + "systemMacAddress": "00:1c:73:2e:7b:a3", + "hwMacAddress": "00:00:00:00:00:00", + "configMacAddress": "00:00:00:00:00:00", + "version": "4.31.1F-34554157.4311F (engineering build)", + "architecture": "i686", + "internalVersion": "4.31.1F-34554157.4311F", + "internalBuildId": "47114ca4-ae9f-4f32-8c1f-2864db93b7e8", + "imageFormatVersion": "1.0", + "imageOptimization": "None", + "cEosToolsVersion": "(unknown)", + "kernelVersion": "6.5.0-44-generic", + "bootupTimestamp": 1723429239.9352903, + "uptime": 1300027.2297976017, + "memTotal": 65832112, + "memFree": 41595080, + "isIntlVersion": False, + }, + {"errors": ["Invalid input (at token 1: 'bad')"]}, + ], + }, +} +"""Error eAPI JSON response.""" + +JSONRPC_REQUEST_TEMPLATE = {"jsonrpc": "2.0", "method": "runCmds", "params": {"version": 1, "cmds": [], "format": "json"}, "id": "EapiExplorer-1"} +"""Template for JSON-RPC eAPI request. `cmds` must be filled by the parametrize decorator.""" diff --git a/tests/units/asynceapi/test_device.py b/tests/units/asynceapi/test_device.py new file mode 100644 index 000000000..8a140ee3b --- /dev/null +++ b/tests/units/asynceapi/test_device.py @@ -0,0 +1,88 @@ +# Copyright (c) 2023-2024 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. 
+"""Unit tests the asynceapi.device module.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import pytest +from httpx import HTTPStatusError + +from asynceapi import Device, EapiCommandError + +from .test_data import ERROR_EAPI_RESPONSE, JSONRPC_REQUEST_TEMPLATE, SUCCESS_EAPI_RESPONSE + +if TYPE_CHECKING: + from pytest_httpx import HTTPXMock + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "cmds", + [ + (["show version", "show clock"]), + ([{"cmd": "show version"}, {"cmd": "show clock"}]), + ([{"cmd": "show version"}, "show clock"]), + ], + ids=["simple_commands", "complex_commands", "mixed_commands"], +) +async def test_jsonrpc_exec_success( + asynceapi_device: Device, + httpx_mock: HTTPXMock, + cmds: list[str | dict[str, Any]], +) -> None: + """Test the Device.jsonrpc_exec method with a successful response. Simple and complex commands are tested.""" + jsonrpc_request: dict[str, Any] = JSONRPC_REQUEST_TEMPLATE.copy() + jsonrpc_request["params"]["cmds"] = cmds + + httpx_mock.add_response(json=SUCCESS_EAPI_RESPONSE) + + result = await asynceapi_device.jsonrpc_exec(jsonrpc=jsonrpc_request) + + assert result == SUCCESS_EAPI_RESPONSE["result"] + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "cmds", + [ + (["show version", "bad command", "show clock"]), + ([{"cmd": "show version"}, {"cmd": "bad command"}, {"cmd": "show clock"}]), + ([{"cmd": "show version"}, {"cmd": "bad command"}, "show clock"]), + ], + ids=["simple_commands", "complex_commands", "mixed_commands"], +) +async def test_jsonrpc_exec_eapi_command_error( + asynceapi_device: Device, + httpx_mock: HTTPXMock, + cmds: list[str | dict[str, Any]], +) -> None: + """Test the Device.jsonrpc_exec method with an error response. Simple and complex commands are tested.""" + jsonrpc_request: dict[str, Any] = JSONRPC_REQUEST_TEMPLATE.copy() + jsonrpc_request["params"]["cmds"] = cmds + + error_eapi_response: dict[str, Any] = ERROR_EAPI_RESPONSE.copy() + httpx_mock.add_response(json=error_eapi_response) + + with pytest.raises(EapiCommandError) as exc_info: + await asynceapi_device.jsonrpc_exec(jsonrpc=jsonrpc_request) + + assert exc_info.value.passed == [error_eapi_response["error"]["data"][0]] + assert exc_info.value.failed == "bad command" + assert exc_info.value.errors == ["Invalid input (at token 1: 'bad')"] + assert exc_info.value.errmsg == "CLI command 2 of 3 'bad command' failed: invalid command" + assert exc_info.value.not_exec == [jsonrpc_request["params"]["cmds"][2]] + + +@pytest.mark.asyncio +async def test_jsonrpc_exec_http_status_error(asynceapi_device: Device, httpx_mock: HTTPXMock) -> None: + """Test the Device.jsonrpc_exec method with an HTTPStatusError.""" + jsonrpc_request: dict[str, Any] = JSONRPC_REQUEST_TEMPLATE.copy() + jsonrpc_request["params"]["cmds"] = ["show version"] + + httpx_mock.add_response(status_code=500, text="Internal Server Error") + + with pytest.raises(HTTPStatusError): + await asynceapi_device.jsonrpc_exec(jsonrpc=jsonrpc_request) diff --git a/tests/units/cli/nrfu/test__init__.py b/tests/units/cli/nrfu/test__init__.py index 83369f344..7227a699f 100644 --- a/tests/units/cli/nrfu/test__init__.py +++ b/tests/units/cli/nrfu/test__init__.py @@ -120,3 +120,9 @@ def test_disable_cache(click_runner: CliRunner) -> None: if "disable_cache" in line: assert "True" in line assert result.exit_code == ExitCode.OK + + +def test_hide(click_runner: CliRunner) -> None: + """Test the `--hide` option of the `anta nrfu` command.""" + result = click_runner.invoke(anta, ["nrfu", 
"--hide", "success", "text"]) + assert "SUCCESS" not in result.output diff --git a/tests/units/cli/nrfu/test_commands.py b/tests/units/cli/nrfu/test_commands.py index 8ad7745f4..27f01a78c 100644 --- a/tests/units/cli/nrfu/test_commands.py +++ b/tests/units/cli/nrfu/test_commands.py @@ -8,7 +8,7 @@ import json import re from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from unittest.mock import patch from anta.cli import anta @@ -90,6 +90,43 @@ def test_anta_nrfu_json(click_runner: CliRunner) -> None: assert res["result"] == "success" +def test_anta_nrfu_json_output(click_runner: CliRunner, tmp_path: Path) -> None: + """Test anta nrfu json with output file.""" + json_output = tmp_path / "test.json" + result = click_runner.invoke(anta, ["nrfu", "json", "--output", str(json_output)]) + + # Making sure the output is not printed to stdout + match = re.search(r"\[\n {2}{[\s\S]+ {2}}\n\]", result.output) + assert match is None + + assert result.exit_code == ExitCode.OK + assert "JSON results saved to" in result.output + assert json_output.exists() + + +def test_anta_nrfu_json_output_failure(click_runner: CliRunner, tmp_path: Path) -> None: + """Test anta nrfu json with output file.""" + json_output = tmp_path / "test.json" + + original_open = Path.open + + def mock_path_open(*args: Any, **kwargs: Any) -> Path: # noqa: ANN401 + """Mock Path.open only for the json_output file of this test.""" + if args[0] == json_output: + msg = "Simulated OSError" + raise OSError(msg) + + # If not the json_output file, call the original Path.open + return original_open(*args, **kwargs) + + with patch("pathlib.Path.open", mock_path_open): + result = click_runner.invoke(anta, ["nrfu", "json", "--output", str(json_output)]) + + assert result.exit_code == ExitCode.USAGE_ERROR + assert "Failed to save JSON results to" in result.output + assert not json_output.exists() + + def test_anta_nrfu_template(click_runner: CliRunner) -> None: """Test anta nrfu, catalog is given via env.""" result = click_runner.invoke(anta, ["nrfu", "tpl-report", "--template", str(DATA_DIR / "template.j2")]) @@ -114,3 +151,47 @@ def test_anta_nrfu_csv_failure(click_runner: CliRunner, tmp_path: Path) -> None: assert result.exit_code == ExitCode.USAGE_ERROR assert "Failed to save CSV report to" in result.output assert not csv_output.exists() + + +def test_anta_nrfu_md_report(click_runner: CliRunner, tmp_path: Path) -> None: + """Test anta nrfu md-report.""" + md_output = tmp_path / "test.md" + result = click_runner.invoke(anta, ["nrfu", "md-report", "--md-output", str(md_output)]) + assert result.exit_code == ExitCode.OK + assert "Markdown report saved to" in result.output + assert md_output.exists() + + +def test_anta_nrfu_md_report_failure(click_runner: CliRunner, tmp_path: Path) -> None: + """Test anta nrfu md-report failure.""" + md_output = tmp_path / "test.md" + with patch("anta.reporter.md_reporter.MDReportGenerator.generate", side_effect=OSError()): + result = click_runner.invoke(anta, ["nrfu", "md-report", "--md-output", str(md_output)]) + + assert result.exit_code == ExitCode.USAGE_ERROR + assert "Failed to save Markdown report to" in result.output + assert not md_output.exists() + + +def test_anta_nrfu_md_report_with_hide(click_runner: CliRunner, tmp_path: Path) -> None: + """Test anta nrfu md-report with the `--hide` option.""" + md_output = tmp_path / "test.md" + result = click_runner.invoke(anta, ["nrfu", "--hide", "success", "md-report", "--md-output", str(md_output)]) + + assert 
result.exit_code == ExitCode.OK + assert "Markdown report saved to" in result.output + assert md_output.exists() + + with md_output.open("r", encoding="utf-8") as f: + content = f.read() + + # Use regex to find the "Total Tests Success" value + match = re.search(r"\| (\d+) \| (\d+) \| \d+ \| \d+ \| \d+ \|", content) + + assert match is not None + + total_tests = int(match.group(1)) + total_tests_success = int(match.group(2)) + + assert total_tests == 0 + assert total_tests_success == 0 diff --git a/tests/units/reporter/test__init__.py b/tests/units/reporter/test__init__.py index 2fc62ce92..f0e44b41a 100644 --- a/tests/units/reporter/test__init__.py +++ b/tests/units/reporter/test__init__.py @@ -13,9 +13,9 @@ from anta import RICH_COLOR_PALETTE from anta.reporter import ReportJinja, ReportTable +from anta.result_manager.models import AntaTestStatus if TYPE_CHECKING: - from anta.custom_types import TestStatus from anta.result_manager import ResultManager @@ -73,15 +73,14 @@ def test__build_headers(self, headers: list[str]) -> None: @pytest.mark.parametrize( ("status", "expected_status"), [ - pytest.param("unknown", "unknown", id="unknown status"), - pytest.param("unset", "[grey74]unset", id="unset status"), - pytest.param("skipped", "[bold orange4]skipped", id="skipped status"), - pytest.param("failure", "[bold red]failure", id="failure status"), - pytest.param("error", "[indian_red]error", id="error status"), - pytest.param("success", "[green4]success", id="success status"), + pytest.param(AntaTestStatus.UNSET, "[grey74]unset", id="unset status"), + pytest.param(AntaTestStatus.SKIPPED, "[bold orange4]skipped", id="skipped status"), + pytest.param(AntaTestStatus.FAILURE, "[bold red]failure", id="failure status"), + pytest.param(AntaTestStatus.ERROR, "[indian_red]error", id="error status"), + pytest.param(AntaTestStatus.SUCCESS, "[green4]success", id="success status"), ], ) - def test__color_result(self, status: TestStatus, expected_status: str) -> None: + def test__color_result(self, status: AntaTestStatus, expected_status: str) -> None: """Test _build_headers.""" # pylint: disable=protected-access report = ReportTable() @@ -140,7 +139,7 @@ def test_report_summary_tests( new_results = [result.model_copy() for result in manager.results] for result in new_results: result.name = "test_device" - result.result = "failure" + result.result = AntaTestStatus.FAILURE report = ReportTable() kwargs = {"tests": [test] if test is not None else None, "title": title} @@ -175,7 +174,7 @@ def test_report_summary_devices( new_results = [result.model_copy() for result in manager.results] for result in new_results: result.name = dev or "test_device" - result.result = "failure" + result.result = AntaTestStatus.FAILURE manager.results = new_results report = ReportTable() diff --git a/tests/units/reporter/test_md_reporter.py b/tests/units/reporter/test_md_reporter.py new file mode 100644 index 000000000..a60773374 --- /dev/null +++ b/tests/units/reporter/test_md_reporter.py @@ -0,0 +1,54 @@ +# Copyright (c) 2023-2024 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. 
+"""Test anta.reporter.md_reporter.py.""" + +from __future__ import annotations + +from io import StringIO +from pathlib import Path + +import pytest + +from anta.reporter.md_reporter import MDReportBase, MDReportGenerator +from anta.result_manager import ResultManager + +DATA_DIR: Path = Path(__file__).parent.parent.parent.resolve() / "data" + + +def test_md_report_generate(tmp_path: Path, result_manager: ResultManager) -> None: + """Test the MDReportGenerator class.""" + md_filename = tmp_path / "test.md" + expected_report = "test_md_report.md" + + # Generate the Markdown report + MDReportGenerator.generate(result_manager, md_filename) + assert md_filename.exists() + + # Load the existing Markdown report to compare with the generated one + with (DATA_DIR / expected_report).open("r", encoding="utf-8") as f: + expected_content = f.read() + + # Check the content of the Markdown file + content = md_filename.read_text(encoding="utf-8") + + assert content == expected_content + + +def test_md_report_base() -> None: + """Test the MDReportBase class.""" + + class FakeMDReportBase(MDReportBase): + """Fake MDReportBase class.""" + + def generate_section(self) -> None: + pass + + results = ResultManager() + + with StringIO() as mock_file: + report = FakeMDReportBase(mock_file, results) + assert report.generate_heading_name() == "Fake MD Report Base" + + with pytest.raises(NotImplementedError, match="Subclasses should implement this method"): + report.generate_rows() diff --git a/tests/units/result_manager/test__init__.py b/tests/units/result_manager/test__init__.py index 02c694c05..802d4a4e3 100644 --- a/tests/units/result_manager/test__init__.py +++ b/tests/units/result_manager/test__init__.py @@ -6,15 +6,16 @@ from __future__ import annotations import json +import re from contextlib import AbstractContextManager, nullcontext from typing import TYPE_CHECKING, Callable import pytest from anta.result_manager import ResultManager, models +from anta.result_manager.models import AntaTestStatus if TYPE_CHECKING: - from anta.custom_types import TestStatus from anta.result_manager.models import TestResult @@ -55,7 +56,7 @@ def test_json(self, list_result_factory: Callable[[int], list[TestResult]]) -> N success_list = list_result_factory(3) for test in success_list: - test.result = "success" + test.result = AntaTestStatus.SUCCESS result_manager.results = success_list json_res = result_manager.json @@ -71,6 +72,27 @@ def test_json(self, list_result_factory: Callable[[int], list[TestResult]]) -> N assert test.get("custom_field") is None assert test.get("result") == "success" + def test_sorted_category_stats(self, list_result_factory: Callable[[int], list[TestResult]]) -> None: + """Test ResultManager.sorted_category_stats.""" + result_manager = ResultManager() + results = list_result_factory(4) + + # Modify the categories to have a mix of different acronym categories + results[0].categories = ["ospf"] + results[1].categories = ["bgp"] + results[2].categories = ["vxlan"] + results[3].categories = ["system"] + + result_manager.results = results + + # Check the current categories order and name format + expected_order = ["OSPF", "BGP", "VXLAN", "System"] + assert list(result_manager.category_stats.keys()) == expected_order + + # Check the sorted categories order and name format + expected_order = ["BGP", "OSPF", "System", "VXLAN"] + assert list(result_manager.sorted_category_stats.keys()) == expected_order + @pytest.mark.parametrize( ("starting_status", "test_status", "expected_status", "expected_raise"), [ @@ 
-119,29 +141,27 @@ def test_json(self, list_result_factory: Callable[[int], list[TestResult]]) -> None nullcontext(), id="failure, add success", ), - pytest.param( - "unset", "unknown", None, pytest.raises(ValueError, match="Input should be 'unset', 'success', 'failure', 'error' or 'skipped'"), id="wrong status" - ), + pytest.param("unset", "unknown", None, pytest.raises(ValueError, match="'unknown' is not a valid AntaTestStatus"), id="wrong status"), ], ) def test_add( self, test_result_factory: Callable[[], TestResult], - starting_status: TestStatus, - test_status: TestStatus, + starting_status: str, + test_status: str, expected_status: str, expected_raise: AbstractContextManager[Exception], ) -> None: # pylint: disable=too-many-arguments """Test ResultManager_update_status.""" result_manager = ResultManager() - result_manager.status = starting_status + result_manager.status = AntaTestStatus(starting_status) assert result_manager.error_status is False assert len(result_manager) == 0 test = test_result_factory() - test.result = test_status with expected_raise: + test.result = AntaTestStatus(test_status) result_manager.add(test) if test_status == "error": assert result_manager.error_status is True @@ -149,6 +169,91 @@ def test_add( assert result_manager.status == expected_status assert len(result_manager) == 1 + def test_add_clear_cache(self, result_manager: ResultManager, test_result_factory: Callable[[], TestResult]) -> None: + """Test ResultManager.add and make sure the cache is reset after adding a new test.""" + # Check the cache is empty + assert "results_by_status" not in result_manager.__dict__ + + # Access the cache + assert result_manager.get_total_results() == 30 + + # Check the cache is filled with the correct results count + assert "results_by_status" in result_manager.__dict__ + assert sum(len(v) for v in result_manager.__dict__["results_by_status"].values()) == 30 + + # Add a new test + result_manager.add(result=test_result_factory()) + + # Check the cache has been reset + assert "results_by_status" not in result_manager.__dict__ + + # Access the cache again + assert result_manager.get_total_results() == 31 + + # Check the cache is filled again with the correct results count + assert "results_by_status" in result_manager.__dict__ + assert sum(len(v) for v in result_manager.__dict__["results_by_status"].values()) == 31 + + def test_get_results(self, result_manager: ResultManager) -> None: + """Test ResultManager.get_results.""" + # Check for single status + success_results = result_manager.get_results(status={AntaTestStatus.SUCCESS}) + assert len(success_results) == 7 + assert all(r.result == "success" for r in success_results) + + # Check for multiple statuses + failure_results = result_manager.get_results(status={AntaTestStatus.FAILURE, AntaTestStatus.ERROR}) + assert len(failure_results) == 21 + assert all(r.result in {"failure", "error"} for r in failure_results) + + # Check all results + all_results = result_manager.get_results() + assert len(all_results) == 30 + + def test_get_results_sort_by(self, result_manager: ResultManager) -> None: + """Test ResultManager.get_results with sort_by.""" + # Check all results with sort_by result + all_results = result_manager.get_results(sort_by=["result"]) + assert len(all_results) == 30 + assert [r.result for r in all_results] == ["error"] * 2 + ["failure"] * 19 + ["skipped"] * 2 + ["success"] * 7 + + # Check all results with sort_by device (name) + all_results = result_manager.get_results(sort_by=["name"]) + assert len(all_results) ==
30 + assert all_results[0].name == "DC1-LEAF1A" + assert all_results[-1].name == "DC1-SPINE1" + + # Check multiple statuses with sort_by categories + success_skipped_results = result_manager.get_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.SKIPPED}, sort_by=["categories"]) + assert len(success_skipped_results) == 9 + assert success_skipped_results[0].categories == ["Interfaces"] + assert success_skipped_results[-1].categories == ["VXLAN"] + + # Check all results with bad sort_by + with pytest.raises( + ValueError, + match=re.escape( + "Invalid sort_by fields: ['bad_field']. Accepted fields are: ['name', 'test', 'categories', 'description', 'result', 'messages', 'custom_field']", + ), + ): + all_results = result_manager.get_results(sort_by=["bad_field"]) + + def test_get_total_results(self, result_manager: ResultManager) -> None: + """Test ResultManager.get_total_results.""" + # Test all results + assert result_manager.get_total_results() == 30 + + # Test single status + assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS}) == 7 + assert result_manager.get_total_results(status={AntaTestStatus.FAILURE}) == 19 + assert result_manager.get_total_results(status={AntaTestStatus.ERROR}) == 2 + assert result_manager.get_total_results(status={AntaTestStatus.SKIPPED}) == 2 + + # Test multiple statuses + assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE}) == 26 + assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE, AntaTestStatus.ERROR}) == 28 + assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE, AntaTestStatus.ERROR, AntaTestStatus.SKIPPED}) == 30 + @pytest.mark.parametrize( ("status", "error_status", "ignore_error", "expected_status"), [ @@ -159,7 +264,7 @@ def test_add( ) def test_get_status( self, - status: TestStatus, + status: AntaTestStatus, error_status: bool, ignore_error: bool, expected_status: str, @@ -177,28 +282,28 @@ def test_filter(self, test_result_factory: Callable[[], TestResult], list_result success_list = list_result_factory(3) for test in success_list: - test.result = "success" + test.result = AntaTestStatus.SUCCESS result_manager.results = success_list test = test_result_factory() - test.result = "failure" + test.result = AntaTestStatus.FAILURE result_manager.add(test) test = test_result_factory() - test.result = "error" + test.result = AntaTestStatus.ERROR result_manager.add(test) test = test_result_factory() - test.result = "skipped" + test.result = AntaTestStatus.SKIPPED result_manager.add(test) assert len(result_manager) == 6 - assert len(result_manager.filter({"failure"})) == 5 - assert len(result_manager.filter({"error"})) == 5 - assert len(result_manager.filter({"skipped"})) == 5 - assert len(result_manager.filter({"failure", "error"})) == 4 - assert len(result_manager.filter({"failure", "error", "skipped"})) == 3 - assert len(result_manager.filter({"success", "failure", "error", "skipped"})) == 0 + assert len(result_manager.filter({AntaTestStatus.FAILURE})) == 5 + assert len(result_manager.filter({AntaTestStatus.ERROR})) == 5 + assert len(result_manager.filter({AntaTestStatus.SKIPPED})) == 5 + assert len(result_manager.filter({AntaTestStatus.FAILURE, AntaTestStatus.ERROR})) == 4 + assert len(result_manager.filter({AntaTestStatus.FAILURE, AntaTestStatus.ERROR, AntaTestStatus.SKIPPED})) == 3 + assert len(result_manager.filter({AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE, AntaTestStatus.ERROR, 
AntaTestStatus.SKIPPED})) == 0 def test_get_by_tests(self, test_result_factory: Callable[[], TestResult], result_manager_factory: Callable[[int], ResultManager]) -> None: """Test ResultManager.get_by_tests.""" diff --git a/tests/units/result_manager/test_models.py b/tests/units/result_manager/test_models.py index 2276153f8..bc44ccfd8 100644 --- a/tests/units/result_manager/test_models.py +++ b/tests/units/result_manager/test_models.py @@ -9,6 +9,8 @@ import pytest +from anta.result_manager.models import AntaTestStatus + # Import as Result to avoid pytest collection from tests.data.json_data import TEST_RESULT_SET_STATUS from tests.lib.fixture import DEVICE_NAME @@ -45,7 +47,7 @@ def test__is_status_foo(self, test_result_factory: Callable[[int], Result], data assert data["message"] in testresult.messages # no helper for unset, testing _set_status if data["target"] == "unset": - testresult._set_status("unset", data["message"]) # pylint: disable=W0212 + testresult._set_status(AntaTestStatus.UNSET, data["message"]) # pylint: disable=W0212 assert testresult.result == data["target"] assert data["message"] in testresult.messages diff --git a/tests/units/test_catalog.py b/tests/units/test_catalog.py index 76358dd4a..13046f294 100644 --- a/tests/units/test_catalog.py +++ b/tests/units/test_catalog.py @@ -345,6 +345,17 @@ def test_get_tests_by_tags(self) -> None: tests = catalog.get_tests_by_tags(tags={"leaf", "spine"}, strict=True) assert len(tests) == 1 + def test_merge_catalogs(self) -> None: + """Test the merge_catalogs function.""" + # Load catalogs of different sizes + small_catalog = AntaCatalog.parse(DATA_DIR / "test_catalog.yml") + medium_catalog = AntaCatalog.parse(DATA_DIR / "test_catalog_medium.yml") + tagged_catalog = AntaCatalog.parse(DATA_DIR / "test_catalog_with_tags.yml") + + # Merge the catalogs and check the number of tests + final_catalog = AntaCatalog.merge_catalogs([small_catalog, medium_catalog, tagged_catalog]) + assert len(final_catalog.tests) == len(small_catalog.tests) + len(medium_catalog.tests) + len(tagged_catalog.tests) + def test_merge(self) -> None: """Test AntaCatalog.merge().""" catalog1: AntaCatalog = AntaCatalog.parse(DATA_DIR / "test_catalog.yml") @@ -354,11 +365,15 @@ def test_merge(self) -> None: catalog3: AntaCatalog = AntaCatalog.parse(DATA_DIR / "test_catalog_medium.yml") assert len(catalog3.tests) == 228 - assert len(catalog1.merge(catalog2).tests) == 2 + with pytest.deprecated_call(): + merged_catalog = catalog1.merge(catalog2) + assert len(merged_catalog.tests) == 2 assert len(catalog1.tests) == 1 assert len(catalog2.tests) == 1 - assert len(catalog2.merge(catalog3).tests) == 229 + with pytest.deprecated_call(): + merged_catalog = catalog2.merge(catalog3) + assert len(merged_catalog.tests) == 229 assert len(catalog2.tests) == 1 assert len(catalog3.tests) == 228
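
The test_catalog.py hunk above exercises the new AntaCatalog.merge_catalogs() classmethod and the DeprecationWarning now emitted by AntaCatalog.merge(). As a rough usage sketch (not part of the diff under review), the snippet below shows the same API from a caller's point of view; it assumes the repository root as the working directory so the tests/data catalogs referenced in the tests are reachable.

    from pathlib import Path
    import warnings

    from anta.catalog import AntaCatalog

    # Assumption: run from the repository root so tests/data exists on disk.
    DATA_DIR = Path("tests/data")

    catalog1 = AntaCatalog.parse(DATA_DIR / "test_catalog.yml")
    catalog2 = AntaCatalog.parse(DATA_DIR / "test_catalog_with_tags.yml")

    # Preferred API: merge any number of catalogs in a single call.
    merged = AntaCatalog.merge_catalogs([catalog1, catalog2])
    assert len(merged.tests) == len(catalog1.tests) + len(catalog2.tests)

    # Legacy API: still functional, but it now emits a DeprecationWarning.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        legacy = catalog1.merge(catalog2)
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)
    assert len(legacy.tests) == len(merged.tests)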