Skip to content

Commit

Permalink
test: remove error test cases in tests/units/anta_tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mtache committed Sep 25, 2024
1 parent 22143c3 commit edcb5a7
Show file tree
Hide file tree
Showing 18 changed files with 148 additions and 149 deletions.
37 changes: 35 additions & 2 deletions anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
"""Module that provides predefined types for AntaTest.Input instances."""

import re
from typing import Annotated, Literal
from typing import Annotated, Literal, Self, get_args

from pydantic import Field
from pydantic import BaseModel, Field, model_validator
from pydantic.functional_validators import AfterValidator, BeforeValidator

# Regular Expression definition
Expand Down Expand Up @@ -204,3 +204,36 @@ def validate_regex(value: str) -> str:
]
BgpUpdateError = Literal["inUpdErrWithdraw", "inUpdErrIgnore", "inUpdErrDisableAfiSafi", "disabledAfiSafi", "lastUpdErrTime"]
BfdProtocol = Literal["bgp", "isis", "lag", "ospf", "ospfv3", "pim", "route-input", "static-bfd", "static-route", "vrrp", "vxlan"]


class APISSLCertificate(BaseModel):
"""Model for an API SSL certificate."""

certificate_name: str
"""The name of the certificate to be verified."""
expiry_threshold: int
"""The expiry threshold of the certificate in days."""
common_name: str
"""The common subject name of the certificate."""
encryption_algorithm: EncryptionAlgorithm
"""The encryption algorithm of the certificate."""
key_size: RsaKeySize | EcdsaKeySize
"""The encryption algorithm key size of the certificate."""

@model_validator(mode="after")
def validate_inputs(self) -> Self:
"""Validate the key size provided to the APISSLCertificates class.
If encryption_algorithm is RSA then key_size should be in {2048, 3072, 4096}.
If encryption_algorithm is ECDSA then key_size should be in {256, 384, 521}.
"""
if self.encryption_algorithm == "RSA" and self.key_size not in get_args(RsaKeySize):
msg = f"`{self.certificate_name}` key size {self.key_size} is invalid for RSA encryption. Allowed sizes are {get_args(RsaKeySize)}."
raise ValueError(msg)

if self.encryption_algorithm == "ECDSA" and self.key_size not in get_args(EcdsaKeySize):
msg = f"`{self.certificate_name}` key size {self.key_size} is invalid for ECDSA encryption. Allowed sizes are {get_args(EcdsaKeySize)}."
raise ValueError(msg)

return self
2 changes: 1 addition & 1 deletion anta/tests/field_notices.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,4 @@ def test(self) -> None:
self.result.is_success("FN72 is mitigated")
return
# We should never hit this point
self.result.is_error("Error in running test - FixedSystemvrm1 not found")
self.result.is_failure("Error in running test - Component FixedSystemvrm1 not found in 'show version'")
4 changes: 2 additions & 2 deletions anta/tests/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def test(self) -> None:
if ((duplex := (interface := interfaces["interfaces"][intf]).get("duplex", None)) is not None and duplex != duplex_full) or (
(members := interface.get("memberInterfaces", None)) is not None and any(stats["duplex"] != duplex_full for stats in members.values())
):
self.result.is_error(f"Interface {intf} or one of its member interfaces is not Full-Duplex. VerifyInterfaceUtilization has not been implemented.")
self.result.is_failure(f"Interface {intf} or one of its member interfaces is not Full-Duplex. VerifyInterfaceUtilization has not been implemented.")
return

if (bandwidth := interfaces["interfaces"][intf]["bandwidth"]) == 0:
Expand Down Expand Up @@ -705,7 +705,7 @@ def test(self) -> None:
input_interface_detail = interface
break
else:
self.result.is_error(f"Could not find `{intf}` in the input interfaces. {GITHUB_SUGGESTION}")
self.result.is_failure(f"Could not find `{intf}` in the input interfaces. {GITHUB_SUGGESTION}")
continue

input_primary_ip = str(input_interface_detail.primary_ip)
Expand Down
5 changes: 1 addition & 4 deletions anta/tests/mlag.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,7 @@ class VerifyMlagConfigSanity(AntaTest):
def test(self) -> None:
"""Main test function for VerifyMlagConfigSanity."""
command_output = self.instance_commands[0].json_output
if (mlag_status := get_value(command_output, "mlagActive")) is None:
self.result.is_error(message="Incorrect JSON response - 'mlagActive' state was not found")
return
if mlag_status is False:
if command_output["mlagActive"] is False:
self.result.is_skipped("MLAG is disabled")
return
keys_to_verify = ["globalConfiguration", "interfaceConfiguration"]
Expand Down
10 changes: 5 additions & 5 deletions anta/tests/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from __future__ import annotations

from ipaddress import IPv4Address, IPv4Network, IPv6Address
from typing import Any, ClassVar
from typing import Any, ClassVar, Self

from pydantic import BaseModel, Field, PositiveInt, model_validator
from pydantic.v1.utils import deep_update
Expand Down Expand Up @@ -235,7 +235,7 @@ class BgpAfi(BaseModel):
"""Number of expected BGP peer(s)."""

@model_validator(mode="after")
def validate_inputs(self: BaseModel) -> BaseModel:
def validate_inputs(self) -> Self:
"""Validate the inputs provided to the BgpAfi class.
If afi is either ipv4 or ipv6, safi must be provided.
Expand Down Expand Up @@ -375,7 +375,7 @@ class BgpAfi(BaseModel):
"""

@model_validator(mode="after")
def validate_inputs(self: BaseModel) -> BaseModel:
def validate_inputs(self) -> Self:
"""Validate the inputs provided to the BgpAfi class.
If afi is either ipv4 or ipv6, safi must be provided.
Expand Down Expand Up @@ -522,7 +522,7 @@ class BgpAfi(BaseModel):
"""List of BGP IPv4 or IPv6 peer."""

@model_validator(mode="after")
def validate_inputs(self: BaseModel) -> BaseModel:
def validate_inputs(self) -> Self:
"""Validate the inputs provided to the BgpAfi class.
If afi is either ipv4 or ipv6, safi must be provided and vrf must NOT be all.
Expand Down Expand Up @@ -1485,7 +1485,7 @@ class BgpPeer(BaseModel):
"""Outbound route map applied, defaults to None."""

@model_validator(mode="after")
def validate_inputs(self: BaseModel) -> BaseModel:
def validate_inputs(self) -> Self:
"""Validate the inputs provided to the BgpPeer class.
At least one of 'inbound' or 'outbound' route-map must be provided.
Expand Down
6 changes: 3 additions & 3 deletions anta/tests/routing/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from functools import cache
from ipaddress import IPv4Address, IPv4Interface
from typing import ClassVar, Literal
from typing import ClassVar, Literal, Self

from pydantic import model_validator

Expand Down Expand Up @@ -89,8 +89,8 @@ class Input(AntaTest.Input):
maximum: int
"""Expected maximum routing table size."""

@model_validator(mode="after") # type: ignore[misc]
def check_min_max(self) -> AntaTest.Input:
@model_validator(mode="after")
def check_min_max(self) -> Self:
"""Validate that maximum is greater than minimum."""
if self.minimum > self.maximum:
msg = f"Minimum {self.minimum} is greater than maximum {self.maximum}"
Expand Down
38 changes: 3 additions & 35 deletions anta/tests/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
from ipaddress import IPv4Address
from typing import ClassVar

from pydantic import BaseModel, Field, model_validator
from pydantic import BaseModel, Field

from anta.custom_types import EcdsaKeySize, EncryptionAlgorithm, PositiveInteger, RsaKeySize
from anta.custom_types import APISSLCertificate, PositiveInteger
from anta.models import AntaCommand, AntaTemplate, AntaTest
from anta.tools import get_failed_logs, get_item, get_value

Expand Down Expand Up @@ -47,7 +47,7 @@ def test(self) -> None:
try:
line = next(line for line in command_output.split("\n") if line.startswith("SSHD status"))
except StopIteration:
self.result.is_error("Could not find SSH status in returned output.")
self.result.is_failure("Could not find SSH status in returned output.")
return
status = line.split("is ")[1]

Expand Down Expand Up @@ -401,38 +401,6 @@ class Input(AntaTest.Input):
certificates: list[APISSLCertificate]
"""List of API SSL certificates."""

class APISSLCertificate(BaseModel):
"""Model for an API SSL certificate."""

certificate_name: str
"""The name of the certificate to be verified."""
expiry_threshold: int
"""The expiry threshold of the certificate in days."""
common_name: str
"""The common subject name of the certificate."""
encryption_algorithm: EncryptionAlgorithm
"""The encryption algorithm of the certificate."""
key_size: RsaKeySize | EcdsaKeySize
"""The encryption algorithm key size of the certificate."""

@model_validator(mode="after")
def validate_inputs(self: BaseModel) -> BaseModel:
"""Validate the key size provided to the APISSLCertificates class.
If encryption_algorithm is RSA then key_size should be in {2048, 3072, 4096}.
If encryption_algorithm is ECDSA then key_size should be in {256, 384, 521}.
"""
if self.encryption_algorithm == "RSA" and self.key_size not in RsaKeySize.__args__:
msg = f"`{self.certificate_name}` key size {self.key_size} is invalid for RSA encryption. Allowed sizes are {RsaKeySize.__args__}."
raise ValueError(msg)

if self.encryption_algorithm == "ECDSA" and self.key_size not in EcdsaKeySize.__args__:
msg = f"`{self.certificate_name}` key size {self.key_size} is invalid for ECDSA encryption. Allowed sizes are {EcdsaKeySize.__args__}."
raise ValueError(msg)

return self

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyAPISSLCertificate."""
Expand Down
3 changes: 0 additions & 3 deletions anta/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ class VerifyReloadCause(AntaTest):
def test(self) -> None:
"""Main test function for VerifyReloadCause."""
command_output = self.instance_commands[0].json_output
if "resetCauses" not in command_output:
self.result.is_error(message="No reload causes available")
return
if len(command_output["resetCauses"]) == 0:
# No reload causes
self.result.is_success()
Expand Down
2 changes: 2 additions & 0 deletions tests/benchmark/test_anta.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,5 @@ def bench() -> ResultManager:
"---------------------------------------"
)
logger.info(bench_info)
assert manager.get_total_results({AntaTestStatus.ERROR}) == 0
assert manager.get_total_results({AntaTestStatus.UNSET}) == 0
14 changes: 4 additions & 10 deletions tests/benchmark/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from typing import TYPE_CHECKING, Any

import httpx
from pydantic import ValidationError

from anta.catalog import AntaCatalog, AntaTestDefinition
from anta.models import AntaCommand, AntaTest
Expand Down Expand Up @@ -91,15 +90,10 @@ def import_test_modules() -> Generator[ModuleType, None, None]:
for test_data in module.DATA:
test = test_data["test"]
result_overwrite = AntaTest.Input.ResultOverwrite(custom_field=test_data["name"])
# Some unit tests purposely have invalid inputs, we just skip them
try:
if test_data["inputs"] is None:
inputs = test.Input(result_overwrite=result_overwrite)
else:
inputs = test.Input(**test_data["inputs"], result_overwrite=result_overwrite)
except ValidationError:
continue

if test_data["inputs"] is None:
inputs = test.Input(result_overwrite=result_overwrite)
else:
inputs = test.Input(**test_data["inputs"], result_overwrite=result_overwrite)
test_definition = AntaTestDefinition(
test=test,
inputs=inputs,
Expand Down
30 changes: 13 additions & 17 deletions tests/units/anta_tests/routing/test_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

from typing import Any

import pytest
from pydantic import ValidationError

from anta.tests.routing.generic import VerifyRoutingProtocolModel, VerifyRoutingTableEntry, VerifyRoutingTableSize
from tests.units.anta_tests import test

Expand Down Expand Up @@ -66,16 +69,6 @@
"inputs": {"minimum": 42, "maximum": 666},
"expected": {"result": "failure", "messages": ["routing-table has 1000 routes and not between min (42) and maximum (666)"]},
},
{
"name": "error-max-smaller-than-min",
"test": VerifyRoutingTableSize,
"eos_data": [{}],
"inputs": {"minimum": 666, "maximum": 42},
"expected": {
"result": "error",
"messages": ["Minimum 666 is greater than maximum 42"],
},
},
{
"name": "success",
"test": VerifyRoutingTableEntry,
Expand Down Expand Up @@ -310,11 +303,14 @@
"inputs": {"vrf": "default", "routes": ["10.1.0.1", "10.1.0.2"], "collect": "all"},
"expected": {"result": "failure", "messages": ["The following route(s) are missing from the routing table of VRF default: ['10.1.0.2']"]},
},
{
"name": "collect-input-error",
"test": VerifyRoutingTableEntry,
"eos_data": {},
"inputs": {"vrf": "default", "routes": ["10.1.0.1", "10.1.0.2"], "collect": "not-valid"},
"expected": {"result": "error", "messages": ["Inputs are not valid"]},
},
]


class TestVerifyRoutingTableSize: # pylint: disable=too-few-public-methods
"""Test VerifyRoutingTableSize."""

def test_inputs(self) -> None:
"""Test VerifyRoutingTableSize inputs."""
VerifyRoutingTableSize.Input(minimum=1, maximum=2)
with pytest.raises(ValidationError):
VerifyRoutingTableSize.Input(minimum=2, maximum=1)
10 changes: 0 additions & 10 deletions tests/units/anta_tests/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,4 @@
"inputs": {"regex_patterns": ["bla", "bleh"]},
"expected": {"result": "failure", "messages": ["Following patterns were not found: 'bla','bleh'"]},
},
{
"name": "failure-invalid-regex",
"test": VerifyRunningConfigLines,
"eos_data": ["enable password something\nsome other line"],
"inputs": {"regex_patterns": ["["]},
"expected": {
"result": "error",
"messages": ["1 validation error for Input\nregex_patterns.0\n Value error, Invalid regex: unterminated character set at position 0"],
},
},
]
4 changes: 2 additions & 2 deletions tests/units/anta_tests/test_field_notices.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@
],
"inputs": None,
"expected": {
"result": "error",
"messages": ["Error in running test - FixedSystemvrm1 not found"],
"result": "failure",
"messages": ["Error in running test - Component FixedSystemvrm1 not found in 'show version'"],
},
},
]
4 changes: 2 additions & 2 deletions tests/units/anta_tests/test_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@
],
"inputs": {"threshold": 70.0},
"expected": {
"result": "error",
"result": "failure",
"messages": ["Interface Ethernet1/1 or one of its member interfaces is not Full-Duplex. VerifyInterfaceUtilization has not been implemented."],
},
},
Expand Down Expand Up @@ -797,7 +797,7 @@
],
"inputs": {"threshold": 70.0},
"expected": {
"result": "error",
"result": "failure",
"messages": ["Interface Port-Channel31 or one of its member interfaces is not Full-Duplex. VerifyInterfaceUtilization has not been implemented."],
},
},
Expand Down
11 changes: 0 additions & 11 deletions tests/units/anta_tests/test_mlag.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,6 @@
"inputs": None,
"expected": {"result": "skipped", "messages": ["MLAG is disabled"]},
},
{
"name": "error",
"test": VerifyMlagConfigSanity,
"eos_data": [
{
"dummy": False,
},
],
"inputs": None,
"expected": {"result": "error", "messages": ["Incorrect JSON response - 'mlagActive' state was not found"]},
},
{
"name": "failure-global",
"test": VerifyMlagConfigSanity,
Expand Down
Loading

0 comments on commit edcb5a7

Please sign in to comment.